Using dask jobqueue with xp65 conda env

Hi all.

Hopefully someone has PBS clusters working with xp65.

I'm unable to get it working. With analysis3-25.03 or 25.04 I get warnings about mismatched versions (the details vary a bit):

2025-05-02 16:39:06,372 - distributed.worker - WARNING - Mismatched versions found
+-------------+---------------------------------------------+-----------+---------+
| Package     | Worker-c4acc418-d28f-4266-9626-0b51cf0bd8c1 | Scheduler | Workers |
+-------------+---------------------------------------------+-----------+---------+
| cloudpickle | 3.1.0                                       | 3.1.1     | 3.1.0   |
| tornado     | 6.4.1                                       | 6.4.2     | 6.4.1   |
+-------------+---------------------------------------------+-----------+---------+

and then failures to deserialize, or errors involving unexpected keyword arguments, which I guess makes sense if the versions are incompatible.

2025-05-02 16:39:06,514 - distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\ \x94\x8c\x0copen_dataset\x94\x93\x94]\x94XB\x01\x00\x00/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc\x94ah x86\x94b.'
Traceback (most recent call last):
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-24.07/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 92, in loads
    return pickle.loads(x)
           ^^^^^^^^^^^^^^^
AttributeError: 'Task' object has no attribute '_data_producer'

When run in a notebook it just hangs: the cluster closes and open_mfdataset then sits there waiting forever.

With 24.07 I get a different error stream: a whole lot of engine loading failures, all seemingly related to libmpi:

  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-24.07/lib/python3.11/site-packages/h5py/__init__.py", line 25, in <module>
    from . import _errors
    ^^^^^^^^^^^^^^^^^
ImportError: libmpi.so.40: cannot open shared object file: No such file or directory

I'm at a loss in terms of debugging; I think I need someone who knows how these things work.

Can I upload my minimal test script here somehow? (The upload button above says no to the .py format.) It's pasted below.

Thanks for suggestions.

Kind regards,
Aurel Griesser

import xarray as xr

from dask.distributed import Client
from dask_jobqueue import PBSCluster


def my_start_PBS_dask_cluster(  
    cores=4,
    memory="9GB",
    processes=4,
    walltime = '1:00:00',
    storages = 'gdata/ai05+gdata/xp65+gdata/tp28+gdata/ia39+gdata/mn51+gdata/ob53+gdata/py18+gdata/py18+gdata/dk92'
):
    
    print("Starting Dask...\r", end="")
    
    cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory), processes=processes,
                         job_extra_directives=['-q normalbw',
                                               '-l ncpus='+str(cores),
                                               '-l mem='+str(memory),
                                               '-l storage='+storages,
                                               '-l jobfs=10GB',
                                               '-P ai05'],
                         job_script_prologue=['module unload conda/analysis3-25.03', 'module load conda/analysis3-24.07'],
                         job_directives_skip=["select"],
                         # python="/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/bin/python",  # Why not this? 'which python' gives me this path, but Scott provided the one on the next line.
                         # python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-25.04.d/bin/python",
                         python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-24.07.d/bin/python",
                        )
    
    cluster.scale(jobs=1)  # Scale the resource to this many nodes
    client = Client(cluster)
    print(f"Dask Client started. Dashboard URL: {client.dashboard_link}")
    return client, cluster


def my_load_data():
    file_list = [
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20160101-20161231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20170101-20171231.nc"
    ]
    
    ds = xr.open_mfdataset(file_list, parallel=True, engine='h5netcdf')
    
    return ds


if __name__ == '__main__':

    try:
        # Initialise a Dask cluster at this stage, since our inputs are probably OK.
        client, cluster = my_start_PBS_dask_cluster()
    
        # Load data.
        ds = my_load_data()
        print(f"Mean value: {ds['tasmaxAdjust'].mean()}")
        
    finally:
        cluster.close()
        client.close()
        print("Closed Dask cluster")

    print("All done!")

I just run that with python test_dask.py in a terminal in an ARE browser window.

It’s fine to use the in-built forum code box as above, but if you want to share longer snippets or programs there are some suggestions in this topic:


Hi Aurel,

a couple of debugging steps:

  1. Can you add openmpi to your requirements? I've been testing this with the following modules and I don't get any libmpi.so errors.

    I'm pretty sure that error is basically telling you that the MPI library can't be found. I also added it in the cluster startup:
...
    job_script_prologue=['module load conda/analysis3-25.02', 'module load openmpi'],
...
  2. I can stop the hanging by changing
    ds = xr.open_mfdataset(file_list, parallel=True, engine='h5netcdf')
    
    to
    ds = xr.open_mfdataset(file_list, parallel=False, engine='h5netcdf')
    
    I'm pretty confident that what's happening in this instance is that parallel=True causes xarray to try to create locks inside the dask cluster, which then compete and cause the computation to hang. EDIT: ^ This turns out to be wrong - instead, it's accidentally creating a local cluster, and the tasks aren't being passed to the right scheduler. A quick way to check which scheduler dask is actually using is sketched just below this list.
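For reference, here's a quick diagnostic to see what dask would actually hand work to at that point. This is just a sketch using standard dask/distributed calls, nothing specific to the xp65 setup:

```python
import dask
from dask.distributed import Client

# Which scheduler would a .compute() use right now? With distributed,
# this is the default Client if one has registered itself; otherwise
# it falls back to a local threaded/synchronous scheduler.
print(dask.base.get_scheduler())

# Client.current() returns the client registered as the default in this
# process, or raises ValueError if there isn't one.
try:
    print(Client.current())
except ValueError:
    print("No distributed client registered as the default")
```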

I’m playing around with whether we can pass a lock in explicitly, but I haven’t quite got that working yet. This should provide some performance benefit over parallel=False but still avoid the deadlocks. If (hopefully when) I get that working I’ll post a follow up.

Okay, I’ve dug into the xarray source code, and I suspect I understand the source of the issue.

In the xarray source code, calling open_mfdataset() with parallel=True will create a bunch of dask tasks, one associated with opening each dataset (using open_dataset). These are then computed & combined.
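Roughly speaking, parallel=True does something like the following. This is a simplified sketch of the idea, not the literal xarray implementation, and the file paths are placeholders:

```python
import dask
import xarray as xr

# Placeholder paths for illustration only
file_list = ["file_2015.nc", "file_2016.nc", "file_2017.nc"]

# Each open_dataset call is wrapped as a delayed task
open_tasks = [dask.delayed(xr.open_dataset)(f, engine="h5netcdf") for f in file_list]

# The tasks run on whatever scheduler/client dask currently considers the
# default -- a distributed Client if one has registered itself, otherwise
# a local scheduler
datasets = dask.compute(*open_tasks)

# The opened datasets are then combined into a single dataset
ds = xr.combine_by_coords(datasets)
```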

There doesn't appear to be an argument that lets you tell xarray which dask cluster/client to use; from experience, xarray is supposed to 'just know' which client you're trying to send jobs to (normally whichever distributed Client has registered itself as the default).

For example, the following works when run in a notebook, with the jobs of opening the datasets correctly sent off to the dask workers:

from dask.distributed import Client
import xarray as xr

client = Client(threads_per_worker=1)


def my_load_data():
    print("Defining files to open")
    file_list = [
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20150101-20151231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20160101-20161231.nc",
        "/g/data/ia39/australian-climate-service/release/CORDEX/output-Adjust/CMIP6/bias-adjusted-output/AUST-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1-ACS-MRNBC-AGCDv1-1960-2022/day/tasmaxAdjust/v20241216/tasmaxAdjust_AUST-05i_ACCESS-CM2_ssp370_r4i1p1f1_BOM_BARPA-R_v1-r1-ACS-MRNBC-AGCDv1-1960-2022_day_20170101-20171231.nc",
    ]

    print("Opening files...")
    ds = xr.open_mfdataset(file_list, parallel=True, engine="h5netcdf")

    print("Dataset opened, returning")
    return ds

ds = my_load_data()
print(f"Mean value: {ds['tasmaxAdjust'].mean()}")
Defining files to open
Opening files...
Dataset opened, returning
Mean value: <xarray.DataArray 'tasmaxAdjust' ()> Size: 4B
dask.array<mean_agg-aggregate, shape=(), dtype=float32, chunksize=(), chunktype=numpy.ndarray>

It seems that, for whatever reason, in your example xarray is not correctly identifying that it needs to send jobs to the dask cluster you've spun up, and is instead (I assume) sending them off to some sort of in-process scheduler which doesn't exist or can't process them.

I haven’t been able to find any documentation on how to specify a dask cluster for xarray to use manually, but it appears for whatever reason in this script it’s just not being picked up correctly. The dask progress graph shows the jobs (open_dataset) being waited on indefinitely.

Note also that removing parallel=True from the script you provided, as I mentioned above, means the file opens aren't sent to your dask workers, which is far from ideal (especially with larger datasets).
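One thing that might be worth trying (I haven't tested it against your PBS setup, so treat this as a sketch) is making the client explicitly "current" around the open call, which should force the delayed opens through that client rather than whatever dask picks up by default:

```python
import xarray as xr
from dask.distributed import Client

client = Client()  # or the client returned by my_start_PBS_dask_cluster() in your script

file_list = ["file_2015.nc", "file_2016.nc"]  # placeholder paths for illustration

# Untested sketch: make this client the "current" one while open_mfdataset
# builds and computes its open_dataset tasks
with client.as_current():
    ds = xr.open_mfdataset(file_list, parallel=True, engine="h5netcdf")
```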

I’m not sure if anyone has any experience coercing xarray into using the correct dask client - maybe @dougiesquire?


@ag0498, what makes you think things aren’t working? I am able to successfully run your script in analysis-25.02, analysis-25.04 and analysis-25.05. Bear in mind that dask jobqueue submits PBS scripts for each of the dask workers you request, and work will not start until these jobs begin. Could you be interpreting this delay as the job “hanging”? You can see the status of the worker jobs by running qstat -u <username>.

Are you sure you were setting the python path correctly in your PBSCluster initialisation? I would also make sure you don’t have anything in your ~/.bashrc or ~/.bash_profile that could cause issues (e.g. module load commands).
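One quick sanity check (a sketch only; cluster here refers to the PBSCluster your script creates) is to print which interpreter the scheduler side is running under and ask distributed to compare versions across the scheduler and workers:

```python
import sys
from dask.distributed import Client

# 'cluster' is assumed to be the PBSCluster created in your script
client = Client(cluster)

# Which python is the scheduler-side (i.e. this) process actually running under?
print(sys.executable)

# Compare package versions across client, scheduler and workers;
# check=True raises an error if important packages are mismatched.
print(client.get_versions(check=True))
```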

I haven’t tried to recreate this given that analysis-25.02, analysis-25.04 and analysis-25.05 worked for me. Do you need 24.07 for any particular reason or can you use one of the more recent ones?

Hmmm, there does appear to be an issue with analysis-25.03. I get KilledWorker errors when trying to run @ag0498's script in this environment. Ping @rbeucher, @CharlesTurner

Thank you all for your help so far!

I've added the openmpi module and the job_script_prologue, but I am still getting version-mismatch warnings (as per the 'dask-worker.eXXX' files).

2025-05-05 13:28:53,652 - distributed.worker - WARNING - Mismatched versions found

+-------------+---------------------------------------------+-----------------+-----------------+
| Package     | Worker-d4cb978b-6744-4425-9a79-0317a41e356a | Scheduler       | Workers         |
+-------------+---------------------------------------------+-----------------+-----------------+
| dask        | 2025.3.0                                    | 2025.1.0        | 2025.3.0        |
| distributed | 2025.3.0                                    | 2025.1.0        | 2025.3.0        |
| python      | 3.11.12.final.0                             | 3.11.11.final.0 | 3.11.12.final.0 |
| toolz       | 1.0.0                                       | 0.12.1          | 1.0.0           |
+-------------+---------------------------------------------+-----------------+-----------------+

Ultimately the failure is

2025-05-05 13:49:10,050 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x149ad7fb0d90>>, <Task finished name='Task-6' coro=<Worker.handle_scheduler() done, defined at /g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py:220> exception=TypeError("ComputeTaskEvent.__init__() got an unexpected keyword argument 'duration'")>)
Traceback (most recent call last):
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/tornado/ioloop.py", line 750, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
    future.result()
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py", line 223, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py", line 1319, in handle_scheduler
    await self.handle_stream(comm)
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/core.py", line 917, in handle_stream
    handler(**merge(extra, msg))
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py", line 1940, in _
    event = cls(**kwargs)
            ^^^^^^^^^^^^^
TypeError: ComputeTaskEvent.__init__() got an unexpected keyword argument 'duration'
2025-05-05 13:49:11,914 - distributed.nanny - ERROR - Worker process died unexpectedly
2025-05-05 13:49:12,262 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.6.121.12:45693'. Reason: nanny-close-gracefully

I can see with qstat that the cluster starts, but then dies and the Python process seems to be left waiting for a return which will never arrive.

module list includes "3) conda/analysis3-25.04(analysis3)" and I am using python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-25.04.d/bin/python" in my PBSCluster construction. I assume these dates/versions should match.

My bashrc only includes module load pbs as far as modules are concerned.

Thank you again all.

Regards,
Aurel.

I was using 24.07 to start with because all the later environments failed with HDF errors in my Jupyter notebook testing. I'm now working with 25.04. I don't know why the HDF errors aren't happening now - perhaps they'll reappear once the dask issue is dealt with. Sorry, that's a little confusing…

Potentially the same issue as this: Xp65 conda/analysis3 environment + regional_mom6 versioning - #9 by CharlesTurner.


Okay, given that things worked for me, my guess is that your worker PBS jobs are possibly dying for some simple reason, e.g. a missing storage flag. Can you please take a look at the output logs for the failing jobs and let us know what they say? IIRC, by default the logs should be going to /scratch/ai05/<username>/tmp/logs?
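If they're not there, you can also tell dask-jobqueue explicitly where to put the worker job logs via PBSCluster's log_directory option. A minimal sketch (the path is just an example):

```python
from dask_jobqueue import PBSCluster

# Sketch only: collect the PBS job stdout/stderr in one known place,
# keeping the rest of the options from the script above.
cluster = PBSCluster(
    cores=4,
    memory="9GB",
    processes=4,
    walltime="1:00:00",
    log_directory="/scratch/ai05/ag0498/dask_logs",  # example path only
    # ... job_extra_directives, job_script_prologue, python=... as before
)
```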

Hi.

Sure.

Early on, it seems each worker is telling me about mismatched versions:

2025-05-05 14:20:30,729 - distributed.worker - WARNING - Mismatched versions found

+-------------+---------------------------------------------+-----------------+-----------------+
| Package     | Worker-c01609f2-66c5-4266-90e5-c61235902306 | Scheduler       | Workers         |
+-------------+---------------------------------------------+-----------------+-----------------+
| dask        | 2025.3.0                                    | 2025.1.0        | 2025.3.0        |
| distributed | 2025.3.0                                    | 2025.1.0        | 2025.3.0        |
| python      | 3.11.12.final.0                             | 3.11.11.final.0 | 3.11.12.final.0 |
| toolz       | 1.0.0                                       | 0.12.1          | 1.0.0           |
+-------------+---------------------------------------------+-----------------+-----------------+

Later, I get a series of repeated errors as below:

2025-05-05 14:20:31,661 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x14db8a5acd90>>, <Task finished name='Task-6' coro=<Worker.handle_scheduler() done, defined at /g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py:220> exception=TypeError("ComputeTaskEvent.__init__() got an unexpected keyword argument 'duration'")>)
Traceback (most recent call last):
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/tornado/ioloop.py", line 750, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
    future.result()
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py", line 223, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py", line 1319, in handle_scheduler
    await self.handle_stream(comm)
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/core.py", line 917, in handle_stream
    handler(**merge(extra, msg))
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.04/lib/python3.11/site-packages/distributed/worker.py", line 1940, in _
    event = cls(**kwargs)
            ^^^^^^^^^^^^^
TypeError: ComputeTaskEvent.__init__() got an unexpected keyword argument 'duration'

After a few of these it seems that dask gives up (reasonable):

2025-05-05 14:20:31,717 - distributed._signals - INFO - Received signal SIGTERM (15)
2025-05-05 14:20:31,717 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.6.121.8:45761'. Reason: signal-15
2025-05-05 14:20:31,717 - distributed.nanny - INFO - Nanny asking worker to close. Reason: signal-15

The above errors happen when I open a terminal in an ARE session and run my script. Based on the mismatched version warnings, I think it's to do with conda and containers. So…

If I run conda deactivate, and then run my script, it works! I can even open the dask monitoring interface in the Jupyter environment and see the workers doing their thing.

(Off-topic here, but: I can't activate conda again in the same terminal because of an error "The following argument was not expected: shell.posix".)

Is this even plausible? Is this something very simple which I could perhaps have known all along? I have no idea how conda interacts with modules, so it didn't occur to me to try… :blush:

Thanks for your assistance everyone!

Ah yes, this is the sort of thing I was trying to get at with:

The xp65 analysis modules use conda so you definitely want to deactivate any other conda instances before using them.

The thing is, I am not explicitly activating any conda environments myself: the conda/analysis3 module seems to do this itself. I have specified "conda/analysis3" in the "Modules" box when starting an ARE session, but nothing in either the "Python or Conda virtual environment base" or "Conda environment" boxes.

I presume xp65 implements the conda environment modules differently to the way hh5 did it.

I'll just have to remember to deactivate conda before running these sorts of scripts, if indeed this is the solution.

Ta, Aurel.

I am unable to recreate this. In my ARE set up I have:

  • Module directories: /g/data/xp65/public/modules
  • Modules: conda/analysis3-25.05

I’ve modified your script as follows and can run it successfully in a terminal on the ARE.

@@ -9,7 +9,7 @@ def my_start_PBS_dask_cluster(
     memory="9GB",
     processes=4,
     walltime = '1:00:00',
-    storages = 'gdata/ai05+gdata/xp65+gdata/tp28+gdata/ia39+gdata/mn51+gdata/ob53+gdata/py18+gdata/py18+gdata/dk92'
+    storages = "gdata/xp65+gdata/ia39"
 ):
     
     print("Starting Dask...\r", end="")
@@ -20,12 +20,10 @@ def my_start_PBS_dask_cluster(
                                                '-l mem='+str(memory),
                                                '-l storage='+storages,
                                                '-l jobfs=10GB',
-                                               '-P ai05'],
-                         job_script_prologue=['module unload conda/analysis3-25.03', 'module load conda/analysis3-24.07'],
+                                               '-P tm70'],
+                         job_script_prologue=['module load conda/analysis3-25.05'],
                          job_directives_skip=["select"],
-                         # python="/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/bin/python",  # Why not this? 'which python' gives me this path, but Scott provided the one on the next line.
-                         # python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-25.04.d/bin/python",
-                         python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-24.07.d/bin/python",
+                         python="/g/data/xp65/public/apps/med_conda_scripts/analysis3-25.05.d/bin/python"
                         )
     
     cluster.scale(jobs=1)  # Scale the resource to this many nodes

What do which python and conda info return for you in your ARE terminal (without calling conda deactivate)?

conda info:

Singularity> conda info
/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/lib/python3.11/site-packages/conda/base/context.py:202: FutureWarning: Adding 'defaults' to channel list implicitly is deprecated and will be removed in 25.3.

To remove this warning, please choose a default channel explicitly with conda's regular configuration system, e.g. by adding 'defaults' to the list of channels:

conda config --add channels defaults

For more information see Using the .condarc conda configuration file — conda 25.3.1 documentation

deprecated.topic(

     active environment : base
    active env location : /g/data/xp65/public/apps/med_conda/envs/analysis3-25.03
            shell level : 2
       user config file : /home/548/ag0498/.condarc
 populated config files :
          conda version : 25.1.1
    conda-build version : 25.1.2
         python version : 3.11.11.final.0
                 solver : libmamba (default)
       virtual packages : __archspec=1=broadwell
                          __conda=25.1.1=0
                          __glibc=2.28=0
                          __linux=4.18.0=0
                          __unix=0=0
       base environment : /g/data/xp65/public/apps/med_conda/envs/analysis3-25.03 (read only)
      conda av data dir : /g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/etc/conda
  conda av metadata url : None
           channel URLs : main/linux-64
                          main/noarch
                          r/linux-64
                          r/noarch
          package cache : /g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/pkgs
                          /home/548/ag0498/.conda/pkgs
       envs directories : /home/548/ag0498/.conda/envs
                          /g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/envs

and which python:

Singularity> which python
/g/data/xp65/public/apps/med_conda/envs/analysis3-25.03/bin/python

What can we glean from that?

It looks like you are trying to use analysis3-25.03 which, as I mentioned above, does appear to have an issue. Can you please try with 25.05?