Lazy sorting with dask (bootstrapping problem)

This is something that might be useful to anyone who does bootstrapping. I have a 2D (141 x 173) array of observations (obs, with lat and lon dimensions) and a 3D (141 x 173 x 1000) array of bootstrapped (resampled) statistics (resamples, with lat, lon and iteration dimensions).

A common thing we want to compute is the p-value of some test. To do this, we need to calculate the quantile at which the observed value sits within the bootstrap samples.

Here this means, for a particular lat and lon, finding the quantile of obs along the iteration dimension of resamples.

I have a function that does this on a 1D array:

import xarray as xr
import numpy as np

obs = xr.open_mfdataset("/g/data/w42/dr6273/tmp/obs.nc")["obs"]
samples = xr.open_mfdataset("/g/data/w42/dr6273/tmp/bootstraps.nc")["ds"]

def get_quantile(a, v):
    """
    Returns the quantile of a in v
    
    a: float or int
    v: array
    """
    # Fraction of bootstrap samples strictly below a
    # (np.searchsorted defaults to side='left', i.e. a strict count)
    return np.searchsorted(np.sort(v), a) / len(v)
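
For instance, on a plain in-memory array (made-up data, just to illustrate the expected output):

rng = np.random.default_rng(0)
v = rng.normal(size=1000)    # stand-in for one grid cell's bootstrap samples
print(get_quantile(0.0, v))  # roughly 0.5 for a standard normal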

This takes two arguments. To the best of my knowledge, to make this work with xr.apply_ufunc, I have to first stack both arrays and group over the stacked points:

obs_stack = obs.stack(point=("lat", "lon")).groupby('point')
samples_stack = samples.stack(point=("lat", "lon")).groupby('point')

quantiles = xr.apply_ufunc(
    get_quantile,
    obs_stack,
    samples_stack,
    input_core_dims=[[], ['iteration']],
    output_core_dims=[[]],
    dask='allowed'
)

This returns a warning and an error:

/g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/dask/array/core.py:1712: FutureWarning: The `numpy.sort` function is not implemented by Dask array. You may want to use the da.map_blocks function or something similar to silence this warning. Your code may stop working in a future release.
  warnings.warn(
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <timed exec>:1

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/computation.py:1189, in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, *args)
   1173 if any(isinstance(a, GroupBy) for a in args):
   1174     this_apply = functools.partial(
   1175         apply_ufunc,
   1176         func,
   (...)
   1187         dask_gufunc_kwargs=dask_gufunc_kwargs,
   1188     )
-> 1189     return apply_groupby_func(this_apply, *args)
   1190 # feed datasets apply_variable_ufunc through apply_dataset_vfunc
   1191 elif any(is_dict_like(a) for a in args):

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/computation.py:557, in apply_groupby_func(func, *args)
    554     iterators.append(iterator)
    556 applied = (func(*zipped_args) for zipped_args in zip(*iterators))
--> 557 applied_example, applied = peek_at(applied)
    558 combine = first_groupby._combine
    559 if isinstance(applied_example, tuple):

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/utils.py:186, in peek_at(iterable)
    182 """Returns the first value from iterable, as well as a new iterator with
    183 the same content as the original iterable
    184 """
    185 gen = iter(iterable)
--> 186 peek = next(gen)
    187 return peek, itertools.chain([peek], gen)

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/computation.py:556, in <genexpr>(.0)
    553         iterator = itertools.repeat(arg)
    554     iterators.append(iterator)
--> 556 applied = (func(*zipped_args) for zipped_args in zip(*iterators))
    557 applied_example, applied = peek_at(applied)
    558 combine = first_groupby._combine

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/computation.py:1204, in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, *args)
   1202 # feed DataArray apply_variable_ufunc through apply_dataarray_vfunc
   1203 elif any(isinstance(a, DataArray) for a in args):
-> 1204     return apply_dataarray_vfunc(
   1205         variables_vfunc,
   1206         *args,
   1207         signature=signature,
   1208         join=join,
   1209         exclude_dims=exclude_dims,
   1210         keep_attrs=keep_attrs,
   1211     )
   1212 # feed Variables directly through apply_variable_ufunc
   1213 elif any(isinstance(a, Variable) for a in args):

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/computation.py:315, in apply_dataarray_vfunc(func, signature, join, exclude_dims, keep_attrs, *args)
    310 result_coords, result_indexes = build_output_coords_and_indexes(
    311     args, signature, exclude_dims, combine_attrs=keep_attrs
    312 )
    314 data_vars = [getattr(a, "variable", a) for a in args]
--> 315 result_var = func(*data_vars)
    317 out: tuple[DataArray, ...] | DataArray
    318 if signature.num_outputs > 1:

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/xarray/core/computation.py:771, in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, vectorize, keep_attrs, dask_gufunc_kwargs, *args)
    766     if vectorize:
    767         func = _vectorize(
    768             func, signature, output_dtypes=output_dtypes, exclude_dims=exclude_dims
    769         )
--> 771 result_data = func(*input_data)
    773 if signature.num_outputs == 1:
    774     result_data = (result_data,)

Cell In[241], line 8, in get_quantile(a, v)
      1 def get_quantile(a, v):
      2     """
      3     Returns the quantile of a in v
      4     
      5     a: float or int
      6     v: array
      7     """
----> 8     return np.searchsorted(np.sort(v), a) / len(v)

File <__array_function__ internals>:180, in searchsorted(*args, **kwargs)

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/dask/array/core.py:1760, in Array.__array_function__(self, func, types, args, kwargs)
   1757 if has_keyword(da_func, "like"):
   1758     kwargs["like"] = self
-> 1760 return da_func(*args, **kwargs)

File /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/dask/array/routines.py:809, in searchsorted(a, v, side, sorter)
    806 @derived_from(np)
    807 def searchsorted(a, v, side="left", sorter=None):
    808     if a.ndim != 1:
--> 809         raise ValueError("Input array a must be one dimensional")
    811     if sorter is not None:
    812         raise NotImplementedError(
    813             "da.searchsorted with a sorter argument is not supported"
    814         )

ValueError: Input array a must be one dimensional

The warning implies I should be using a different function than np.sort, one that works with lazy arrays. As the error comes from the same code path, I assume the two are related. The warning and error are not thrown when I load the arrays into memory first, but these arrays are sometimes very large and loading them would take far too long.

My attempts to write a new function using map_blocks, as the warning suggests, have failed miserably. Can anyone help?

Hi @dougrichardson. I think the easiest and fastest way to get the quantiles is using something like this:

quantiles = (samples < obs).mean("iteration")
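
This is equivalent to the quantile your function computes: (samples < obs) is True wherever a bootstrap sample falls below the observation, so its mean over iteration is the fraction of samples strictly below the observed value, which is exactly what np.searchsorted(np.sort(v), a) / len(v) returns (searchsorted with the default side='left' also counts the elements strictly less than a). And because the comparison and the mean are native xarray operations, everything stays lazy on dask-backed arrays. A quick check on plain NumPy arrays with made-up data:

import numpy as np

rng = np.random.default_rng(42)
v = rng.normal(size=1000)  # bootstrap samples for one grid cell
a = 0.3                    # observed value

frac_below = (v < a).mean()                  # the (samples < obs).mean() approach
q = np.searchsorted(np.sort(v), a) / len(v)  # the original get_quantile
assert frac_below == q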

If you’re really wanting to use your get_quantile function, the easiest way to wrap a function that works on 1D arrays is to use the vectorize argument on apply_ufunc. E.g. in your case:

quantiles = xr.apply_ufunc(
    get_quantile,
    obs,
    samples,
    input_core_dims=[[], ['iteration']],
    output_core_dims=[[]],
    vectorize=True,
    dask='parallelized'
)

Note, vectorize is provided primarily for convenience, not performance: under the hood it is essentially just a loop over the non-core dimensions (via np.vectorize). It would be possible to rewrite your function so that you could use apply_ufunc without vectorize=True, but I'm not sure it's worth going into that here as I think my first suggestion is probably better.
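
For completeness, a sketch of what such a rewrite could look like (get_quantile_nd is just an illustrative name, and mathematically it is the same computation as the one-liner above): have the function reduce over the iteration axis itself, so apply_ufunc can hand it whole blocks without looping. Note that with dask='parallelized' the iteration core dimension must sit in a single chunk:

def get_quantile_nd(a, v):
    """
    Quantile of each a within the corresponding v,
    computed along the last axis of v.

    a: array of shape (...)
    v: array of shape (..., n_iterations)
    """
    # Fraction of samples strictly below a, broadcast over the leading axes
    return (v < a[..., np.newaxis]).mean(axis=-1)

quantiles = xr.apply_ufunc(
    get_quantile_nd,
    obs,
    samples,
    input_core_dims=[[], ['iteration']],
    output_core_dims=[[]],
    dask='parallelized',
    output_dtypes=[float]
)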

Note, I also think your stacking approach should work if you remove the groupbys.

Thanks, this worked perfectly and is so obvious once you see it!

And just noting in case others come across this: removing the groupby only works if vectorize=True in apply_ufunc. Both versions (groupby with vectorize=False, versus no groupby with vectorize=True) seem to have similar performance.
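
For reference, that stacked variant would look something like this (a sketch, not benchmarked here; unstack restores the lat and lon dimensions):

obs_stack = obs.stack(point=("lat", "lon"))
samples_stack = samples.stack(point=("lat", "lon"))

quantiles = xr.apply_ufunc(
    get_quantile,
    obs_stack,
    samples_stack,
    input_core_dims=[[], ['iteration']],
    output_core_dims=[[]],
    vectorize=True,
    dask='parallelized'
).unstack("point")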
