Hi all.
Hopefully someone has PBS clusters working with xp65.
I'm unable to do so. With analysis3-25.03 or 25.04 I get warnings about mismatched versions (the details vary a bit):
2025-05-02 16:39:06,372 - distributed.worker - WARNING - Mismatched versions found
+-------------+---------------------------------------------+-----------+---------+
| Package     | Worker-c4acc418-d28f-4266-9626-0b51cf0bd8c1 | Scheduler | Workers |
+-------------+---------------------------------------------+-----------+---------+
| cloudpickle | 3.1.0                                       | 3.1.1     | 3.1.0   |
| tornado     | 6.4.1                                       | 6.4.2     | 6.4.1   |
+-------------+---------------------------------------------+-----------+---------+
and then failures to deserialize, or errors about unexpected keyword arguments, which makes sense if the versions are incompatible, I guess.
2025-05-02 16:39:06,514 - distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\ \x94\x8c\x0copen_dataset\x94\x93\x94]\x94XB\x01\x00\x00/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc\x94ah x86\x94b.'
Traceback (most recent call last):
File "/g/data/xp65/public/apps/med_conda/envs/analysis3-24.07/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 92, in loads
return pickle.loads(x)
^^^^^^^^^^^^^^^
AttributeError: 'Task' object has no attribute '_data_producer'
When I run this in a notebook it just hangs: the cluster closes and open_mfdataset
then just sits there waiting forever.
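For what it's worth, when the cluster does come up I can at least interrogate the versions from the client side. This is only a diagnostic sketch using the standard dask.distributed API, and it assumes the client and cluster objects created by the script at the end of this post:

import sys

# Which Python is the client-side script running under? Compare this
# against the python= path handed to PBSCluster below.
print("client python:", sys.executable)

# Which Python did each worker actually launch with?
print("worker pythons:", client.run(lambda: sys.executable))

# Raises ValueError if client / scheduler / worker package versions
# (cloudpickle, tornado, ...) disagree, i.e. the warning above.
client.get_versions(check=True)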
With 24.07 I get a different set of errors: a whole lot of Engine loading failures, all seemingly related to libmpi:
File "/g/data/xp65/public/apps/med_conda/envs/analysis3-24.07/lib/python3.11/site-packages/h5py/__init__.py", line 25, in <module>
from . import _errors
^^^^^^^^^^^^^^^^^
ImportError: libmpi.so.40: cannot open shared object file: No such file or directory
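The only other idea I have for the 24.07 case is to look at what dask_jobqueue actually submits and what environment the workers end up in. Again just a sketch, assuming the cluster and client from the script below, and that at least one worker stays alive long enough to answer:

import os

# The PBS job script dask_jobqueue generates: check that the module
# lines from job_script_prologue and the worker Python path look right.
print(cluster.job_script())

# If a worker does come up, see what its library path contains, since
# my guess is that libmpi.so.40 simply isn't findable in that job.
print(client.run(lambda: os.environ.get("LD_LIBRARY_PATH")))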
I'm at a loss in terms of debugging; I think I need someone who knows how these things work.
Can I upload my minimal test script here somehow? (The upload button above says no to .py format.) It's pasted below.
Thanks for suggestions.
Kind regards,
Aurel Griesser
import xarray as xr
from dask.distributed import Client
from dask_jobqueue import PBSCluster


def my_start_PBS_dask_cluster(
    cores=4,
    memory="9GB",
    processes=4,
    walltime='1:00:00',
    storages='gdata/ai05+gdata/xp65+gdata/tp28+gdata/ia39+gdata/mn51+gdata/ob53+gdata/py18+gdata/dk92',
):
    print("Starting Dask...\r", end="")
    cluster = PBSCluster(
        walltime=str(walltime), cores=cores, memory=str(memory), processes=processes,
        job_extra_directives=['-q normalbw',
                              '-l ncpus=' + str(cores),
                              '-l mem=' + str(memory),
                              '-l storage=' + storages,
                              '-l jobfs=10GB',
                              '-P ai05'],
        job_script_prologue=['module unload conda/analysis3-25.03', 'module load conda/analysis3-24.07'],
        job_directives_skip=["select"],
        # python="/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/bin/python",  # Why not this? 'which python' gives me this path, but Scott provided the one on the next line.
        # python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-25.04.d/bin/python",
        python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-24.07.d/bin/python",
    )
    cluster.scale(jobs=1)  # Scale the cluster to this many PBS jobs
    client = Client(cluster)
    print(f"Dask Client started. Dashboard URL: {client.dashboard_link}")
    return client, cluster
def my_load_data():
    file_list = [
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20160101-20161231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20170101-20171231.nc",
    ]
    ds = xr.open_mfdataset(file_list, parallel=True, engine='h5netcdf')
    return ds
if __name__ == '__main__':
    client = cluster = None
    try:
        # Initialise a Dask cluster at this stage, since our inputs are probably OK.
        client, cluster = my_start_PBS_dask_cluster()
        # Load data.
        ds = my_load_data()
        # .compute() forces the lazy mean to actually be evaluated on the cluster.
        print(f"Mean value: {ds['tasmaxAdjust'].mean().compute()}")
    finally:
        # Only close what was actually started.
        if cluster is not None:
            cluster.close()
        if client is not None:
            client.close()
        print("Closed Dask cluster")
    print("All done!")
I just run that with python test_dask.py
in a terminal in an ARE browser window.
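Two tweaks I'm planning to try in that script, in case they narrow things down: waiting for a worker with a timeout so the run fails fast instead of hanging, and opening a single file without the cluster to rule out the data and the h5netcdf engine themselves. Roughly (client and file_list as in the script above, timeout value arbitrary):

# Fail fast with a TimeoutError if no worker ever connects, rather than
# letting open_mfdataset sit there forever. 600 s is an arbitrary guess
# to allow for the normalbw queue wait.
client.wait_for_workers(n_workers=1, timeout=600)

# Open one file serially (no cluster involved) to confirm the file and
# the h5netcdf engine are fine outside the PBS workers.
ds_one = xr.open_dataset(file_list[0], engine='h5netcdf')
print(ds_one['tasmaxAdjust'])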