Dask bag crashing in xp65 env (but wasn't in hh5)

Dear NRI help,

I am using dask bag to read multiple files in parallel. This code was working in the hh5 environment, but it crashes in the xp65 environment.

Here are the pertinent fragments of code:

import glob

import xarray as xr
import dask.bag as db
from dask import compute, delayed

#Create key:value pairs of keys and directory paths to files
...

def parallel_read_acs_data(arglist):
#@delayed
#def parallel_read_acs_data(key_val,dirpath):
    key_val, dirpath = arglist
    #Read the data in the directory
    infiles = sorted(glob.glob(dirpath+"/*.nc")) #Get the infiles from directory. A list.
 
    # Chunks to length of individual files (1 year)
    value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True) 
    data_dict[key_val]=value #Need to extract 0th value as key is list.
    return data_dict

data_dict={}
#def parallel_read_function(argshort):
def parallel_read_function(arglist):
    bag=db.from_sequence(arglist)
    bag=bag.map(parallel_read_acs_data)
    data_dict = bag.compute()
    return data_dict

The process submits and then I get the following error message:
KilledWorker: Attempted to run task (‘from_sequence-parallel_read_acs_data-8bc8c0317571e91cbdada5fce7c5daa1’, 69) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.69:43559. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

I think it has to do with a dask dependency mismatch.

Cheers,
Justin

Hi Justin

Which module exactly are you loading (e.g. conda/analysis3-25.03)?

This error message is a flow-on effect from the operation failing.

Are there other error messages? E.g. maybe there is a log message from a python exception written by the worker directly.
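If it helps, here is a minimal sketch (assuming you still have a connected Client from the failing run) of pulling recent worker-side log lines directly, instead of digging through the PBS output files:

from dask.distributed import Client

client = Client(threads_per_worker=1)  # or re-use the client you already have

# get_worker_logs() returns a dict mapping each worker address to its
# recent (level, message) log entries
logs = client.get_worker_logs()
for worker, entries in logs.items():
    print(worker)
    for level, message in entries:
        print(f"  {level}: {message}")

I think client.get_worker_logs(nanny=True) fetches the nanny logs instead, which is usually where out-of-memory restarts show up.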

I ran this test code :


import xarray as xr
import glob
from dask.distributed import Client

client = Client(threads_per_worker = 1)
client

infiles = sorted(glob.glob('/g/data/ik11/outputs/access-om2/1deg_era5_ryf/output00*/ice/OUTPUT/iceh*-daily.nc'))

ds = xr.open_mfdataset(infiles, parallel=True)

ds

and replicated your problem on conda:analysis3-25.03. It worked fine on conda:analysis3-25.01 and conda:analysis3-25.04, so I would try using 25.04. (@CharlesTurner - maybe that is something you have seen before?)

A couple of thoughts:

  • are you starting a dask client? normally we use this:
from dask.distributed import Client

client = Client(threads_per_worker = 1)
client

The threads_per_worker is important here; it helps avoid some bugs which exist with parallel access to files.

  • the use of dask in the code snippet is a bit odd. xr.open_mfdataset uses dask under the hood, and should return a “lazy loaded” xarray dataset already, so you should not need to use a @delayed decorator or interact with the dask bag directly. e.g. these lines:
    key_val, dirpath = arglist
    #Read the data in the directory
    infiles = sorted(glob.glob(dirpath+"/*.nc")) #Get the infiles from directory. A list.
 
    # Chunks to length of individual files (1 year)
    value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True) 
    data_dict[key_val]=value 

will already return a “lazy-loaded” dataset which uses dask under the hood; a sketch of how the whole read could look without dask bag is below.
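A minimal sketch (assuming the same key-to-directory mapping as in your snippet) of collecting the datasets with a plain loop and letting open_mfdataset do the parallel file opening:

import glob

import xarray as xr

def read_acs_data(key_dirpath_pairs):
    data_dict = {}
    for key_val, dirpath in key_dirpath_pairs:
        # Get the input files from the directory
        infiles = sorted(glob.glob(dirpath + "/*.nc"))
        # Chunk to the length of individual files (1 year)
        data_dict[key_val] = xr.open_mfdataset(infiles, chunks={'time': -1}, parallel=True)
    return data_dict

Each dataset stays lazy, so the actual reads still run on the dask cluster when you eventually compute on it.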

If my suggestion doesn’t work I would try to break the problem down into something smaller - e.g. can you make a minimal example of what fails?

Oh - never mind my comments about starting the dask cluster - I see you have that handled (Using dask_jobqueue in the new xp65 environment).

Not sure I can add much useful in the way of debugging steps beyond what @anton has already suggested, but I would also caution against doing anything like this:

@dask.delayed
def foo():
    xr.open_mfdataset()

Whilst xarray passes around locks to (attempt to) ensure that files are opened in a thread-safe fashion, creating locks is not thread safe.

This means that if you use an xr.open_mfdataset call (which creates the relevant locks) within an already threaded section of code, e.g. an @dask.delayed task, these threads are unlikely to work properly.

For some reason, this only started to become an issue recently, presumably due to some transitive dependency changes. I didn’t experience it in the analysis3-25.02 environment or earlier, so I’d recommend trying with the analysis3-25.02 environment & seeing if that solves your issue.
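As a quick way to narrow things down, here is a minimal sketch (assuming a client is already connected) of asking distributed to compare package versions across the client, scheduler and workers:

from dask.distributed import Client

client = Client(threads_per_worker=1)  # or re-use your existing client

# check=True raises if the client, scheduler and workers report
# mismatched versions of the required packages
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"])

If that raises, the environments on the two sides of the cluster are genuinely out of sync, independent of anything xarray is doing.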

Hi all.

Hopefully someone has PBS clusters working with xp65.

I’m unable to do so. With analysis3-25.03 or 25.04 I get warnings about mismatched versions (it varies a bit though):

2025-05-02 16:39:06,372 - distributed.worker - WARNING - Mismatched versions found
+-------------+---------------------------------------------+-----------+---------+
| Package     | Worker-c4acc418-d28f-4266-9626-0b51cf0bd8c1 | Scheduler | Workers |
+-------------+---------------------------------------------+-----------+---------+
| cloudpickle | 3.1.0                                       | 3.1.1     | 3.1.0   |
| tornado     | 6.4.1                                       | 6.4.2     | 6.4.1   |
+-------------+---------------------------------------------+-----------+---------+

and then failures to deserialize, or errors involving unexpected keyword arguments, which makes sense if the versions are incompatible I guess.

2025-05-02 16:39:06,514 - distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\ \x94\x8c\x0copen_dataset\x94\x93\x94]\x94XB\x01\x00\x00/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc\x94ah x86\x94b.'
Traceback (most recent call last):
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-24.07/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 92, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
AttributeError: 'Task' object has no attribute '_data_producer'

When run in a notebook it just hangs: the cluster closes and open_mfdataset then sits there waiting forever.

With 24.07 I get a different error stream, a whole lot of Engine loading failures, all seemingly related to libmpi:

  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-24.07/lib/python3.11/site-packages/h5py/__init__.py", line 25, in <module>
    from . import _errors
    ^^^^^^^^^^^^^^^^^
ImportError: libmpi.so.40: cannot open shared object file: No such file or directory

I’m at a loss in terms of debugging; I think I need someone who knows how these things work.
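One check I can think of (a sketch only, using the my_start_PBS_dask_cluster helper from the script below) is to ask each worker which python and package versions it is actually running, to compare against the client:

import sys

def worker_env():
    # runs on each worker: report its python executable and key versions
    import sys, cloudpickle, tornado
    return {
        "python": sys.executable,
        "cloudpickle": cloudpickle.__version__,
        "tornado": tornado.__version__,
    }

client, cluster = my_start_PBS_dask_cluster()  # helper defined in the script below
print("client python:", sys.executable)
print(client.run(worker_env))  # dict: worker address -> that worker's environment info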

Can I upload my minimal test script here somehow? (Upload button above says no to .py format). Pasted below.

Thanks for suggestions.

Kind regards,
Aurel Griesser

import xarray as xr

from dask.distributed import Client
from dask_jobqueue import PBSCluster


def my_start_PBS_dask_cluster(  
    cores=4,
    memory="9GB",
    processes=4,
    walltime = '1:00:00',
    storages = 'gdata/ai05+gdata/xp65+gdata/tp28+gdata/ia39+gdata/mn51+gdata/ob53+gdata/py18+gdata/py18+gdata/dk92'
):
    
    print("Starting Dask...\r", end="")
    
    cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory), processes=processes,
                         job_extra_directives=['-q normalbw',
                                               '-l ncpus='+str(cores),
                                               '-l mem='+str(memory),
                                               '-l storage='+storages,
                                               '-l jobfs=10GB',
                                               '-P ai05'],
                         job_script_prologue=['module unload conda/analysis3-25.03', 'module load conda/analysis3-24.07'],
                         job_directives_skip=["select"],
                         # python="/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/bin/python",  # Why not this? 'which python' gives me this path, but Scott provided the one on the next line.
                         # python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-25.04.d/bin/python",
                         python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-24.07.d/bin/python",
                        )
    
    cluster.scale(jobs=1)  # Scale the resource to this many nodes
    client = Client(cluster)
    print(f"Dask Client started. Dashboard URL: {client.dashboard_link}")
    return client, cluster


def my_load_data():
    file_list = [
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20160101-20161231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20170101-20171231.nc"
    ]
    
    ds = xr.open_mfdataset(file_list, parallel=True, engine='h5netcdf')
    
    return ds


if __name__ == '__main__':

    try:
        # Initialise a Dask cluster at this stage, since our inputs are probably OK.
        client, cluster = my_start_PBS_dask_cluster()
    
        # Load data.
        ds = my_load_data()
        print(f"Mean value: {ds['tasmaxAdjust'].mean()}")
        
    finally:
        cluster.close()
        client.close()
        print("Closed Dask cluster")

    print("All done!")

I just run that with python test_dask.py in a terminal in an ARE browser window.

It’s fine to use the in-built forum code box as above, but if you want to share longer snippets or programs there are some suggestions in this topic: