Dask bag crashing in xp65 env (but not in hh5)

Dear NRI help,

I am using dask bag to read multiple files in parallel. This code was working in the hh5 environment but crashes in the xp65 environment.

Here are the pertinent fragments of code:

import glob

import xarray as xr
from dask import compute, delayed
import dask.bag as db

#Create key:value pairs of keys and directory paths to files
...

def parallel_read_acs_data(arglist):
#@delayed
#def parallel_read_acs_data(key_val,dirpath):
    key_val, dirpath = arglist
    #Read the data in the directory
    infiles = sorted(glob.glob(dirpath+"/*.nc")) #Get the infiles from directory. A list.
 
    # Chunks to length of individual files (1 year)
    value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True) 
    data_dict[key_val]=value #Need to extract 0th value as key is list.
    return data_dict

data_dict={}
#def parallel_read_function(argshort):
def parallel_read_function(arglist):
    bag=db.from_sequence(arglist)
    bag=bag.map(parallel_read_acs_data)
    data_dict = bag.compute()
    return data_dict

The process submits and then I get the following error message:
KilledWorker: Attempted to run task ('from_sequence-parallel_read_acs_data-8bc8c0317571e91cbdada5fce7c5daa1', 69) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.69:43559. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

I think it has to do with a dask dependencies mismatch.

Cheers,
Justin

Hi Justin

Which module exactly are you loading (e.g. conda/analysis3-25.03)?

This error message is a flow-on effect from the operation failing.

Are there other error messages? E.g. maybe there is a log from a Python exception written directly by the worker.
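
If it helps, you can also pull recent worker logs straight back to the notebook; a minimal sketch, assuming client is your running distributed Client:

# Fetch the recent log records held by each worker (assumes `client` is an
# active dask.distributed Client)
logs = client.get_worker_logs()
for worker_addr, records in logs.items():
    print(worker_addr)
    for record in list(records)[-20:]:   # tail of each worker's log
        print("   ", record)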

I ran this test code:


import xarray as xr
import glob
from dask.distributed import Client

client = Client(threads_per_worker = 1)
client

infiles = sorted(glob.glob('/g/data/ik11/outputs/access-om2/1deg_era5_ryf/output00*/ice/OUTPUT/iceh*-daily.nc'))

ds = xr.open_mfdataset(infiles, parallel=True)

ds

and replicated your problem on conda:analysis3-25.03. It worked fine on conda:analysis3-25.01 and conda:analysis3-25.04, so I would try using 25.04. (@CharlesTurner - maybe that is something you have seen before?)

A couple of thoughts:

  • are you starting a dask client? Normally we use this:
from dask.distributed import Client

client = Client(threads_per_worker = 1)
client

The threads_per_worker setting is important here; it helps avoid some bugs that exist with parallel access to files.

  • the use of dask in the code snippet is a bit odd. xr.open_mfdataset uses dask under the hood and should already return a “lazy-loaded” xarray dataset, so you should not need to use a @delayed decorator or interact with a dask bag directly. E.g. these lines:
    key_val, dirpath = arglist
    #Read the data in the directory
    infiles = sorted(glob.glob(dirpath+"/*.nc")) #Get the infiles from directory. A list.
 
    # Chunks to length of individual files (1 year)
    value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True) 
    data_dict[key_val]=value 

will already return a “lazy-loaded” dataset that uses dask under the hood - see the sketch below.
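
To make that concrete, here is a rough sketch of replacing the bag with a plain loop, keeping your key_val / dirpath naming and assuming arglist is the same list of (key, directory path) pairs as in your code:

import glob

import xarray as xr

data_dict = {}
# arglist = [("some_key", "/path/to/dir"), ...]   (built as in your code above)
for key_val, dirpath in arglist:
    infiles = sorted(glob.glob(dirpath + "/*.nc"))
    # open_mfdataset is lazy: nothing is read into memory until you compute
    data_dict[key_val] = xr.open_mfdataset(infiles, chunks={'time': -1})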

If my suggestion doesn’t work, I would try to break the problem down into something smaller - e.g. can you make a minimal example of what fails?

Oh - never mind my comments about starting the dask cluster - I see you have that handled (Using dask_jobqueue in the new xp65 environment).

Not sure I can add much that’s useful in the way of debugging steps beyond what @anton has already suggested, but I would also caution against doing anything like this:

@dask.delayed
def foo():
    xr.open_mfdataset()

Whilst xarray passes locks around to (attempt to) ensure that files are opened in a thread-safe fashion, creating those locks is not itself thread safe.

This means that if you call xr.open_mfdataset (which creates the relevant locks) within an already-threaded section of code, e.g. inside an @dask.delayed task, those threads are unlikely to work properly.

For some reason, this only started to become an issue recently, presumably due to some transitive dependency changes. I didn’t experience it in the analysis3-25.02 environment or earlier, so I’d recommend trying the analysis3-25.02 environment and seeing if that solves your issue.

@ag0498, I hope you don’t mind that I’ve moved your question into its own topic here: Using dask jobqueue with xp65 conda env.

I tried using the analysis3-25.02 env; however, the same thing happened.

I use the dask decorator on top of xarray.open_mfdataset as it enables me to read all the combinations of input files quickly and to associate key:value pairs with the xarray datasets.

Hi Anton,

Note that this is using analysis3-25.02, as suggested by @CharlesTurner.

Even if I use the following:

test_dir = 'path_to_nc_files'
infiles = sorted(glob.glob(test_dir+"/*.nc"))
value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True)

I get the same error message. However, if I change to parallel=False, the code completes.

Also, if I use the dask bag approach (with parallel=False), it still doesn’t work as it used to in the hh5 analysis env.

I get the error message:

KilledWorker: Attempted to run task 'open_dataset-c6093310-9440-4b9d-a582-db183141f49f' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.27:41293. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

Thanks Justin

I assume you are using PBSCluster?

There should be a dask-worker.e* file in the directory you are running Python from. Could you have a look and see what it says, please?

Hi Anton,

I have included portions of the dask-worker.e* file below. Note that this is using analysis3-25.02 and the minimal example above (i.e. only using xr.open_mfdataset without dask bag, so only reading the files in one directory).

Lots of lines starting the workers:
2025-05-09 11:24:08,452 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:08,481 - distributed.worker - INFO - Start worker at: tcp://10.6.129.66:38669
2025-05-09 11:24:08,481 - distributed.worker - INFO - Listening to: tcp://10.6.129.66:38669
2025-05-09 11:24:08,481 - distributed.worker - INFO - Worker name: PBSCluster-0-0
2025-05-09 11:24:08,481 - distributed.worker - INFO - dashboard at: 10.6.129.66:45943
2025-05-09 11:24:08,481 - distributed.worker - INFO - Waiting to connect to: tcp://10.6.121.3:39283
2025-05-09 11:24:08,481 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:08,481 - distributed.worker - INFO - Threads: 1
2025-05-09 11:24:08,481 - distributed.worker - INFO - Memory: 8.51 GiB
2025-05-09 11:24:08,482 - distributed.worker - INFO - Local Directory: /jobfs/140693214.gadi-pbs/dask-scratch-space/worker-uiojq498

Followed by lots of lines connecting the workers:
2025-05-09 11:24:08,482 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:09,872 - distributed.worker - INFO - Starting Worker plugin shuffle
2025-05-09 11:24:09,872 - distributed.worker - INFO - Registered to: tcp://10.6.121.3:39283
2025-05-09 11:24:09,872 - distributed.worker - INFO - -------------------------------------------------

Then (probably the important one?), the workers get killed:
2025-05-09 11:25:54,887 - distributed.nanny - INFO - Worker process 51656 was killed by signal 11
2025-05-09 11:25:54,896 - distributed.nanny - INFO - Worker process 51635 was killed by signal 11
2025-05-09 11:25:54,898 - distributed.nanny - INFO - Worker process 51626 was killed by signal 11

I hope that makes sense.

Cheers,
Justin

Signal 11 is normally a system termination, e.g. because too much memory or jobfs was used. What is the PBS footer in the dask-worker.o* file?
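
As a quick cross-check you can also ask each worker what memory limit it thinks it has; a minimal sketch, assuming client is your running distributed Client:

# Report the thread count and memory limit each worker registered with the scheduler
info = client.scheduler_info()
for addr, w in info["workers"].items():
    print(addr, w["nthreads"], "threads,", round(w["memory_limit"] / 2**30, 1), "GiB limit")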

This is the output of the dask-worker.o* file:

======================================================================================
                  Resource Usage on 2025-05-09 11:28:17:
   Job Id:             140693214.gadi-pbs
   Project:            mn51
   Exit Status:        271 (Linux Signal 15 SIGTERM Termination)
   Service Units:      2.52
   NCPUs Requested:    28                     NCPUs Used: 28
                                           CPU Time Used: 00:08:08
   Memory Requested:   256.0GB               Memory Used: 6.15GB
   Walltime requested: 08:00:00            Walltime Used: 00:04:15
   JobFS requested:    200.0GB                JobFS used: 0B
======================================================================================

I’m not sure why that came out all bold. However, there don’t appear to be any memory or jobfs exceedances.

I have made an edit to use a code block, which is the best way to include fixed-format text like this.

Also, I just tried running the minimal example above using the latest kernel (analysis3-25.04), i.e.

test_dir = 'path_to_nc_files'
infiles = sorted(glob.glob(test_dir+"/*.nc"))
value = xr.open_mfdataset(infiles,parallel=True)

And I get the following in the dask-worker.e* file (something about mismatched versions):

+-------------+---------------------------------------------+-----------------+-----------------+
| Package     | Worker-b35d7060-44c0-4993-bfe6-7ccfe933a976 | Scheduler       | Workers         |
+-------------+---------------------------------------------+-----------------+-----------------+
| dask        | 2025.1.0                                    | 2025.3.0        | 2025.1.0        |
| distributed | 2025.1.0                                    | 2025.3.0        | 2025.1.0        |
| python      | 3.11.11.final.0                             | 3.11.12.final.0 | 3.11.11.final.0 |
| toolz       | 0.12.1                                      | 1.0.0           | 0.12.1          |
+-------------+---------------------------------------------+-----------------+-----------------+
2025-05-09 12:14:12,039 - distributed.worker - INFO - Starting Worker plugin shuffle
2025-05-09 12:14:12,040 - distributed.worker - INFO - Registered to: tcp://10.6.121.24:36167
2025-05-09 12:14:12,040 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 12:14:12,040 - distributed.core - INFO - Starting established connection to tcp://10.6.121.24:36167
2025-05-09 12:14:12,041 - distributed.worker - WARNING - Mismatched versions found

Also, the following later in the .e* file:
2025-05-09 12:15:53,543 - distributed.protocol.pickle - INFO - Failed to deserialize b’\x80\x05\x95\xa2\x02\x00\x00\x00\x00\x00\x00\x8c\x0fdask._task_spec\x94\x8c\x04Task\x94\x93\x94)\x81\x94(\x89(\x91\x94NNN\x8c\x13xarray.backends.api\x94\x8c\x0copen_dataset\x94\x93\x94]\x94XL\x01\x00\x00/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/CSIRO/CESM2/historical/r11i1p1f1/CCAM-v2203-SN/v1-r1-ACS-MRNBC-BARRAR2-1980-2022/day/prAdjust/v20241216/prAdjust_AUST-05i_CESM2_historical_r11i1p1f1_CSIRO_CCAM-v2203-SN_v1-r1-ACS-MRNBC-BARRAR2-1980-2022_day_19840101-19841231.nc\x94ah\x02)\x81\x94(\x89h\x04NNNh\x02)\x81\x94(\x89h\x04NNN]\x94(\x8c\x06engine\x94Neh\x02)\x81\x94(\x89h\x04NNN\x8c\x06chunks\x94h\x02)\x81\x94(\x89h\x04NNN]\x94\x85\x94\x8c\x08builtins\x94\x8c\x04dict\x94\x93\x94N}\x94t\x94b\x86\x94h\x00\x8c\x0e_identity_cast\x94\x93\x94N}\x94\x8c\x03typ\x94h\x13\x8c\x04list\x94\x93\x94st\x94b\x86\x94h\x1aN}\x94h\x1ch\x1est\x94b\x85\x94h\x15N}\x94t\x94b\x87\x94\x8c\ndask.utils\x94\x8c\x05apply\x94\x93\x94\x8c1open_dataset-58cd6c78-1560-4734-bbf6-b9e6dab15d37\x94}\x94t\x94b.’
Traceback (most recent call last):
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 92, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
_pickle.UnpicklingError: state is not a dictionary

Hi Justin

Just confirming: are you setting the Python path for the PBSCluster and the kernel version consistently?

These two lines have different versions.
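
To illustrate what I mean by keeping them consistent, here is a rough sketch (the module path, PBS options and resource numbers are placeholders, and parameter names vary a little between dask-jobqueue versions):

from dask.distributed import Client
from dask_jobqueue import PBSCluster

# Illustrative only: the point is that the worker jobs load the same
# analysis3 environment as the Jupyter kernel
cluster = PBSCluster(
    cores=28,
    memory="256GB",
    walltime="02:00:00",
    job_script_prologue=[
        "module use /g/data/xp65/public/modules",
        "module load conda/analysis3-25.04",
    ],
)
cluster.scale(jobs=1)
client = Client(cluster)

# Fails loudly if the client, scheduler and workers report different package versions
client.get_versions(check=True)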

Hi Anton,
It appears that was a problem; however, the issue still occurred when I reran it.

I found out that the correct kernel was not being loaded by my notebook. It prompted me for the kernel I wished to use from a pop-up menu. I loaded it, but it appears that it wasn’t loaded properly. I had to “reload” the correct kernel from the drop-down menu at the top right of the Jupyter notebook (i.e. select the kernel twice).

I still have to be careful to load the correct Python path in the PBSCluster call as Scott suggested.

My code now runs as it did on hh5.

Note that I have tested this with a few kernels, including analysis3-25.04 and analysis3-25.05.

I think this ticket is now resolved.

Thanks all for your help and suggestions.

Oh good. As Charles noted, we’ve had a few oddities with parallel access to netCDF files recently. It’s still a bit unresolved, but glad you have something working 🙂