I am using dask bag to read multiple files in parallel. This code was working in the hh5 environment, but it crashes in the xp65 env.
Here are the pertinent fragments of code:
from dask import compute, delayed
import dask.bag as db
import glob
import xarray as xr

# Create key:value pairs of keys and directory paths to files
...

# @delayed
# def parallel_read_acs_data(key_val, dirpath):
def parallel_read_acs_data(arglist):
    key_val, dirpath = arglist
    # Read the data in the directory
    infiles = sorted(glob.glob(dirpath + "/*.nc"))  # Get the infiles from the directory. A list.
    # Chunk to the length of individual files (1 year)
    value = xr.open_mfdataset(infiles, chunks={'time': -1}, parallel=True)
    data_dict[key_val] = value  # Need to extract 0th value as key is list.
    return data_dict

data_dict = {}

# def parallel_read_function(argshort):
def parallel_read_function(arglist):
    bag = db.from_sequence(arglist)
    bag = bag.map(parallel_read_acs_data)
    data_dict = bag.compute()
    return data_dict
The process submits and then I get the following error message:
KilledWorker: Attempted to run task ('from_sequence-parallel_read_acs_data-8bc8c0317571e91cbdada5fce7c5daa1', 69) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.69:43559. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
I think it has to do with a dask dependency mismatch.
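(For reference, distributed can compare the package versions reported by the client, scheduler and workers, and can also pull the worker logs that the error message points to back into the notebook. A sketch, assuming a running Client named client:)

from dask.distributed import Client

client = Client(threads_per_worker=1)

# Compares package versions across the client, scheduler and workers;
# with check=True it raises if they disagree
client.get_versions(check=True)

# Fetch recent log lines from each worker without digging through the
# dask-worker.e* files by hand
client.get_worker_logs()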
I replicated your problem on conda:analysis3-25.03. It worked fine on conda:analysis3-25.01 and conda:analysis3-25.04, so I would try using 25.04. (@CharlesTurner - maybe that is something you have seen before?)
A couple of thoughts:
Are you starting a dask client? Normally we use this:
from dask.distributed import Client
client = Client(threads_per_worker=1)
client
The threads_per_worker setting is important here; it helps avoid some bugs that exist with parallel access to files.
The use of dask in the code snippet is a bit odd. xr.open_mfdataset uses dask under the hood and should return a 'lazy-loaded' xarray dataset already, so you should not need to use a @delayed decorator or interact with a dask bag directly. E.g. these lines:
key_val, dirpath = arglist
# Read the data in the directory
infiles = sorted(glob.glob(dirpath + "/*.nc"))  # Get the infiles from the directory. A list.
# Chunk to the length of individual files (1 year)
value = xr.open_mfdataset(infiles, chunks={'time': -1}, parallel=True)
data_dict[key_val] = value
will already return a 'lazy-loaded' dataset which uses dask under the hood.
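For example, a minimal sketch of the same key:value mapping without dask bag or @delayed (assuming arglist is the same sequence of (key, directory path) pairs as in your snippet):

import glob
import xarray as xr

data_dict = {}
for key_val, dirpath in arglist:
    infiles = sorted(glob.glob(dirpath + "/*.nc"))
    # Each dataset is lazy and dask-backed; nothing is read into memory
    # until you call .compute()/.load() on it
    data_dict[key_val] = xr.open_mfdataset(infiles, chunks={'time': -1}, parallel=True)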
If my suggestion doesn't work, I would try to break the problem down into something smaller - e.g. can you make a minimal example of what fails?
Not sure I can add much useful in the way of debugging steps beyond what @anton has already suggested, but I would also caution against doing anything like this:
@dask.delayed
def foo():
    xr.open_mfdataset()
Whilst xarray passes around locks to (attempt to) ensure that files are opened in a thread-safe fashion, creating locks is not thread safe.
This means that if you use an xr.open_mfdataset call (which creates the relevant locks) within an already-threaded section of code, e.g. an @dask.delayed task, these threads are unlikely to work properly.
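Put differently, a sketch of the distinction (the directory path is a placeholder):

import glob
import dask
import xarray as xr

dirpath = "path_to_nc_files"  # placeholder directory of netCDF files

# Risky pattern: open_mfdataset ends up creating its file locks inside an
# already-threaded @dask.delayed task
@dask.delayed
def open_all(path):
    return xr.open_mfdataset(sorted(glob.glob(path + "/*.nc")))

# Safer pattern: call open_mfdataset from the client process and let
# parallel=True dispatch the per-file opens to the workers as dask tasks
ds = xr.open_mfdataset(sorted(glob.glob(dirpath + "/*.nc")),
                       chunks={'time': -1}, parallel=True)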
For some reason, this only started to become an issue recently, presumably due to some transitive dependency changes. I didn't experience it in the analysis3-25.02 environment or earlier, so I'd recommend trying with the analysis3-25.02 environment & seeing if that solves your issue.
I tried using the analysis3-25.02 env; however, the same thing happened.
I use the dask decorator on top of xarray.open_mfdataset as it enables me to read all the combinations of input files quickly and associate key:value pairs with the xarray datasets.
Note that this is using analysis3-25.02, as suggested by @CharlesTurner.
Even if I use the following:
test_dir = 'path_to_nc_files'
infiles = sorted(glob.glob(test_dir+"/*.nc"))
value = xr.open_mfdataset(infiles,chunks={'time':-1},parallel=True)
I get the same error message. However, if I change to parallel=False, the code completes.
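i.e. this variant runs to completion for me (the dataset is still lazy and chunked; only the per-file opens happen serially, reusing infiles from above):

value = xr.open_mfdataset(infiles, chunks={'time': -1}, parallel=False)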
Also, if I use the dask bag version (with parallel=False), it still doesn't work as it used to in the hh5 analysis env.
I get the error message:
KilledWorker: Attempted to run task 'open_dataset-c6093310-9440-4b9d-a582-db183141f49f' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.122.27:41293. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
I have included portions of the dask-worker.e* file below. Note that this is using analysis3-25.02 and the minimal example above (i.e. only using xr.open_mfdataset without dask bag, so only reading the files in one directory).
Lots of lines starting the workers:
2025-05-09 11:24:08,452 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:08,481 - distributed.worker - INFO - Start worker at: tcp://10.6.129.66:38669
2025-05-09 11:24:08,481 - distributed.worker - INFO - Listening to: tcp://10.6.129.66:38669
2025-05-09 11:24:08,481 - distributed.worker - INFO - Worker name: PBSCluster-0-0
2025-05-09 11:24:08,481 - distributed.worker - INFO - dashboard at: 10.6.129.66:45943
2025-05-09 11:24:08,481 - distributed.worker - INFO - Waiting to connect to: tcp://10.6.121.3:39283
2025-05-09 11:24:08,481 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:08,481 - distributed.worker - INFO - Threads: 1
2025-05-09 11:24:08,481 - distributed.worker - INFO - Memory: 8.51 GiB
2025-05-09 11:24:08,482 - distributed.worker - INFO - Local Directory: /jobfs/140693214.gadi-pbs/dask-scratch-space/worker-uiojq498
Followed by lots of lines connecting the workers:
2025-05-09 11:24:08,482 - distributed.worker - INFO - -------------------------------------------------
2025-05-09 11:24:09,872 - distributed.worker - INFO - Starting Worker plugin shuffle
2025-05-09 11:24:09,872 - distributed.worker - INFO - Registered to: tcp://10.6.121.3:39283
2025-05-09 11:24:09,872 - distributed.worker - INFO - -------------------------------------------------
Then (probably the important one?), the workers get killed:
2025-05-09 11:25:54,887 - distributed.nanny - INFO - Worker process 51656 was killed by signal 11
2025-05-09 11:25:54,896 - distributed.nanny - INFO - Worker process 51635 was killed by signal 11
2025-05-09 11:25:54,898 - distributed.nanny - INFO - Worker process 51626 was killed by signal 11
Also, the following appears later in the .e* file:
2025-05-09 12:15:53,543 - distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95\xa2\x02\x00\x00\x00\x00\x00\x00\x8c\x0fdask._task_spec\x94\x8c\x04Task\x94\x93\x94)\x81\x94(\x89(\x91\x94NNN\x8c\x13xarray.backends.api\x94\x8c\x0copen_dataset\x94\x93\x94]\x94XL\x01\x00\x00/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/CSIRO/CESM2/historical/r11i1p1f1/CCAM-v2203-SN/v1-r1-ACS-MRNBC-BARRAR2-1980-2022/day/prAdjust/v20241216/prAdjust_AUST-05i_CESM2_historical_r11i1p1f1_CSIRO_CCAM-v2203-SN_v1-r1-ACS-MRNBC-BARRAR2-1980-2022_day_19840101-19841231.nc\x94ah\x02)\x81\x94(\x89h\x04NNNh\x02)\x81\x94(\x89h\x04NNN]\x94(\x8c\x06engine\x94Neh\x02)\x81\x94(\x89h\x04NNN\x8c\x06chunks\x94h\x02)\x81\x94(\x89h\x04NNN]\x94\x85\x94\x8c\x08builtins\x94\x8c\x04dict\x94\x93\x94N}\x94t\x94b\x86\x94h\x00\x8c\x0e_identity_cast\x94\x93\x94N}\x94\x8c\x03typ\x94h\x13\x8c\x04list\x94\x93\x94st\x94b\x86\x94h\x1aN}\x94h\x1ch\x1est\x94b\x85\x94h\x15N}\x94t\x94b\x87\x94\x8c\ndask.utils\x94\x8c\x05apply\x94\x93\x94\x8c1open_dataset-58cd6c78-1560-4734-bbf6-b9e6dab15d37\x94}\x94t\x94b.'
Traceback (most recent call last):
File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 92, in loads
return pickle.loads(x)
^^^^^^^^^^^^^^^
_pickle.UnpicklingError: state is not a dictionary
Hi Anton,
It appears that was a problem; however, the issue still occurred when I reran it.
I found out that the correct kernel was not being loaded by my notebook. It prompted me for the kernel I wished to use from a pop-up menu. I loaded it, but it appears that it wasn't loaded properly. I had to 'reload' the correct kernel from the drop-down menu at the top-right of the Jupyter notebook (i.e. select the kernel twice).
I still have to be careful to load the correct Python path in the PBSCluster call, as Scott suggested.
My code now runs as it did on hh5.
Note that I have tested this with a few kernels, including analysis3-25.04 and analysis3-25.05.
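For anyone hitting the same thing, a rough sketch of the two checks that mattered for me (the cluster settings are placeholders; PBSCluster's python argument is the interpreter used to launch the workers):

import sys
from dask.distributed import Client
from dask_jobqueue import PBSCluster

# Confirm the notebook kernel really is the environment you expect
print(sys.executable)

# Point the workers at the same interpreter as the notebook kernel so the
# client, scheduler and workers all see identical package versions
cluster = PBSCluster(
    cores=1,
    memory="8GiB",
    python=sys.executable,  # or the explicit path to the xp65 env python
)
cluster.scale(jobs=4)  # placeholder number of PBS jobs
client = Client(cluster)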
Oh good. As Charles noted - we've had a few oddities with parallel access to netCDF files recently. It's still a bit unresolved, but glad you have something working.