Dask bag crashing in xp65 env (but wasn't in hh5)

Dear NRI help,

I am using dask bag to read multiple files in parallel. This code was working on the hh5 environment, however, crashes on the xp65 env.

Here are the pertinent fragments of code:

from dask import compute, delayed
import dask.bag as db

#Create key:value pairs of keys and directory paths to files
...

def parallel_read_acs_data(arglist):
#@delayed
#def parallel_read_acs_data(key_val,dirpath):
    key_val, dirpath = arglist
    #Read the data in the directory
    infiles = sorted(glob.glob(dirpath+"/*.nc")) #Get the infiles from directory. A list.
 
    # Chunks to length of individual files (1 year)
    value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True) 
    data_dict[key_val]=value #Need to extract 0th value as key is list.
    return data_dict

data_dict={}
#def parallel_read_function(argshort):
def parallel_read_function(arglist):
    bag=db.from_sequence(arglist)
    bag=bag.map(parallel_read_acs_data)
    data_dict = bag.compute()
    return data_dict

“The process submits and then I get the following error message:
KilledWorker: Attempted to run task (‘from_sequence-parallel_read_acs_data-8bc8c0317571e91cbdada5fce7c5daa1’, 69) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.69:43559. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.”

I think it has to do with dask dependencies mimatch.

Cheers,
Justin

Hi Justin

Which module exactly are you loading (e.g. conda/analysis3-25.03 ?) ?

This error message is a flow-on effect from the operation failing.

Are their other error messages? e.g. maybe there is a log message from a python exception written by the worker directly.

I ran this test code :


import xarray as xr
import glob
from dask.distributed import Client

client = Client(threads_per_worker = 1)
client

infiles = sorted(glob.glob('/g/data/ik11/outputs/access-om2/1deg_era5_ryf/output00*/ice/OUTPUT/iceh*-daily.nc'))

ds = xr.open_mfdataset(infiles, parallel=True)

ds

and replicated your problem on conda:analysis3-25.03. It worked fine on conda:analysis3-25.01 and conda:analysis3-25.04, so I would try using 25.04. (@CharlesTurner - maybe that is something you have seen before?)

A couple of thoughts:

  • are you starting a dask client? normally we use this:
from dask.distributed import Client

client = Client(threads_per_worker = 1)
client

The threads_per_worker is important here, it helps avoid some dugs which existing with parallel access to files.

  • the use of dask in the code snippet is a bit odd. xr.open_mfdataset uses dask under the hood, and should return a “lazy loaded” xarray dataset already, so you should not need to use a @delayed decorator or interact with the dask bag directly. e.g. these lines:
    key_val, dirpath = arglist
    #Read the data in the directory
    infiles = sorted(glob.glob(dirpath+"/*.nc")) #Get the infiles from directory. A list.
 
    # Chunks to length of individual files (1 year)
    value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True) 
    data_dict[key_val]=value 

will already return a “lazy-loaded” dataset which using dask under the hood.

If my suggestion doesn’t work would try and break down the problem into something smaller - e.g. can you make a minimal-example of what fails.

Oh - never mind my comments about starting the dask cluster - i see you have that handled (Using dask_jobqueue in the new xp65 environment)

Not sure I can add much useful in the way of debugging steps beyond what @anton has already suggested, but I would also caution against doing anything like this:

@dask.delayed
def foo():
    xr.open_mfdataset()

Whilst xarray passes around locks to (attempt to) ensure that files are opened in a thread-safe fashion, creating locks is not thread safe.

This means that if you use a xr.open_mfdataset call (which creates the relevant locks) within an already threaded section of code, eg. an @dask.delayed task, these threads are unlikely to work properly.

For some reason, this only started to become an issue recently, presumably due to some transitive dependency changes. I didn’t experience it in the analysis3-25.02 environment or earlier, so I’d recommend trying with the analysis3-25.02 environment & seeing if that solves your issue.

2 posts were split to a new topic: Using dask jobqueue with xp65 conda env

@ag0498, I hope you don’t mind I’ve move your question into it’s own topic here: Using dask jobqueue with xp65 conda env.

1 Like

I tried using the analysis-25.02 env, however, the same thing happened.

I use the dask decorator on top of xarray.open_mfdataset as it enables me to read all the combinations of input files quickly and associate key:value pairs to the xarray datasets.

Hi Anton,

Note that this is using analysis-25.02 as suggested by @CharlesTurner

Even if I use the following:

test_dir = 'path_to_nc_files'
infiles = sorted(glob.glob(test_dir+"/*.nc"))
value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True)

I get the same error message. However, if I change to using parallel=False, the code completed.

Also, if I use the dask bag decorator (using parallel=False) it still doesn’t work as it used to in the hh5 analysis env.

I get the error message:

KilledWorker: Attempted to run task 'open_dataset-c6093310-9440-4b9d-a582-db183141f49f' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.27:41293. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

Thanks Justin

I assume you are using PBSCluster ?

There should be a dask-worker.e* file in the directory you are running python from. Could you have a look and see what it says please

Hi Anton,

I have included portions of the dask-worker.e* file below. Note that this is using analysis-25.02 and using the minimal example above (i.e. only using xr.open_mfdatasest without dask bag, so only reading the files in one directory.

Lots of lines starting the workers:
2025-05-09 11:24:08,452 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:08,481 - distributed.worker - INFO - Start worker at: tcp://10.6.129.66:38669
2025-05-09 11:24:08,481 - distributed.worker - INFO - Listening to: tcp://10.6.129.66:38669
2025-05-09 11:24:08,481 - distributed.worker - INFO - Worker name: PBSCluster-0-0
2025-05-09 11:24:08,481 - distributed.worker - INFO - dashboard at: 10.6.129.66:45943
2025-05-09 11:24:08,481 - distributed.worker - INFO - Waiting to connect to: tcp://10.6.121.3:39283
2025-05-09 11:24:08,481 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:08,481 - distributed.worker - INFO - Threads: 1
2025-05-09 11:24:08,481 - distributed.worker - INFO - Memory: 8.51 GiB
2025-05-09 11:24:08,482 - distributed.worker - INFO - Local Directory: /jobfs/140693214.gadi-pbs/dask-scratch-space/worker-uiojq498

Followed by lots of lines connecting the workers:
2025-05-09 11:24:08,482 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:09,872 - distributed.worker - INFO - Starting Worker plugin shuffle
2025-05-09 11:24:09,872 - distributed.worker - INFO - Registered to: tcp://10.6.121.3:39283
2025-05-09 11:24:09,872 - distributed.worker - INFO - -------------------------------------------------

Then (probably the important one?), the workers get killed:
2025-05-09 11:25:54,887 - distributed.nanny - INFO - Worker process 51656 was killed by signal 11
2025-05-09 11:25:54,896 - distributed.nanny - INFO - Worker process 51635 was killed by signal 11
2025-05-09 11:25:54,898 - distributed.nanny - INFO - Worker process 51626 was killed by signal 11

I hope that makes sense.

Cheers,
Justin

Signal 11 is normally a system termination, e.g. because too much memory or jobfs was used. What is the PBS footer in the dask-worker.o* file?

This is the output of the dask-worker.o* file:

======================================================================================
                  Resource Usage on 2025-05-09 11:28:17:
   Job Id:             140693214.gadi-pbs
   Project:            mn51
   Exit Status:        271 (Linux Signal 15 SIGTERM Termination)
   Service Units:      2.52
   NCPUs Requested:    28                     NCPUs Used: 28
                                           CPU Time Used: 00:08:08
   Memory Requested:   256.0GB               Memory Used: 6.15GB
   Walltime requested: 08:00:00            Walltime Used: 00:04:15
   JobFS requested:    200.0GB                JobFS used: 0B
======================================================================================

I’m not sure why that came out all bold. However, there doesn’t appear to be memory or jobfs exceedences.

1 Like

I have made an edit to use a code block which is the best way to include fixed format text like this.

1 Like

Also I just tried running the minimal example above using the latest kernel (analysis3-25.04): i.e.

test_dir = 'path_to_nc_files'
infiles = sorted(glob.glob(test_dir+"/*.nc"))
value = xr.open_mfdataset(infiles,parallel=True)

And I get the following in the dask-worker.e* file (something about mismatched versions):

±------------±--------------------------------------------±----------------±----------------+
| Package | Worker-b35d7060-44c0-4993-bfe6-7ccfe933a976 | Scheduler | Workers |
±------------±--------------------------------------------±----------------±----------------+
| dask | 2025.1.0 | 2025.3.0 | 2025.1.0 |
| distributed | 2025.1.0 | 2025.3.0 | 2025.1.0 |
| python | 3.11.11.final.0 | 3.11.12.final.0 | 3.11.11.final.0 |
| toolz | 0.12.1 | 1.0.0 | 0.12.1 |
±------------±--------------------------------------------±----------------±----------------+
2025-05-09 12:14:12,039 - distributed.worker - INFO - Starting Worker plugin shuffle
2025-05-09 12:14:12,040 - distributed.worker - INFO - Registered to: tcp://10.6.121.24:36167
2025-05-09 12:14:12,040 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 12:14:12,040 - distributed.core - INFO - Starting established connection to tcp://10.6.121.24:36167
2025-05-09 12:14:12,041 - distributed.worker - WARNING - Mismatched versions found

Also, the following later in the .e* file:
2025-05-09 12:15:53,543 - distributed.protocol.pickle - INFO - Failed to deserialize b’\x80\x05\x95\xa2\x02\x00\x00\x00\x00\x00\x00\x8c\x0fdask._task_spec\x94\x8c\x04Task\x94\x93\x94)\x81\x94(\x89(\x91\x94NNN\x8c\x13xarray.backends.api\x94\x8c\x0copen_dataset\x94\x93\x94]\x94XL\x01\x00\x00/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/CSIRO/CESM2/historical/r11i1p1f1/CCAM-v2203-SN/v1-r1-ACS-MRNBC-BARRAR2-1980-2022/day/prAdjust/v20241216/prAdjust_AUST-05i_CESM2_historical_r11i1p1f1_CSIRO_CCAM-v2203-SN_v1-r1-ACS-MRNBC-BARRAR2-1980-2022_day_19840101-19841231.nc\x94ah\x02)\x81\x94(\x89h\x04NNNh\x02)\x81\x94(\x89h\x04NNN]\x94(\x8c\x06engine\x94Neh\x02)\x81\x94(\x89h\x04NNN\x8c\x06chunks\x94h\x02)\x81\x94(\x89h\x04NNN]\x94\x85\x94\x8c\x08builtins\x94\x8c\x04dict\x94\x93\x94N}\x94t\x94b\x86\x94h\x00\x8c\x0e_identity_cast\x94\x93\x94N}\x94\x8c\x03typ\x94h\x13\x8c\x04list\x94\x93\x94st\x94b\x86\x94h\x1aN}\x94h\x1ch\x1est\x94b\x85\x94h\x15N}\x94t\x94b\x87\x94\x8c\ndask.utils\x94\x8c\x05apply\x94\x93\x94\x8c1open_dataset-58cd6c78-1560-4734-bbf6-b9e6dab15d37\x94}\x94t\x94b.’
Traceback (most recent call last):
File “/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/lib/python3.11/site-packages/distributed/protocol/pickle.py”, line 92, in loads
return pickle.loads(x)
^^^^^^^^^^^^^^^
_pickle.UnpicklingError: state is not a dictionary

Hi Justin

Just confirming you are setting the python path to PBS cluster and the kernel version consistently?

These two lines have different version