resample and boolean indexing with dask-arrays #356

@rpnaut

Description

I want to use the resample_iterations_idx functionality to bootstrap evaluation metrics of hindcasts. The challenge with huge datasets is the memory needed to store all the iteration samples.
I started with the mean squared error skill score and tried to understand its memory consumption. The following small example script demonstrates the metric and its memory footprint:

import numpy
import xskillscore as xskill
import xarray as xr
from memory_profiler import profile

@profile
def AMSEss():
  alpha = 0.10
  coordtime = "time"
  dimbs = "iteration"
  # synthetic hindcast, reference and benchmark fields on a 100 x 100 grid with 150 time steps
  dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150), dims=["x", "y", "time"])
  dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150), dims=["x", "y", "time"])
  dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150), dims=["x", "y", "time"])

  # bootstrap the error fields: 500 resamples with replacement along "time"
  bsp1 = xskill.resample_iterations_idx(dsproof - dsref, iterations=500, dim=coordtime, replace=True)
  bsp2 = xskill.resample_iterations_idx(dsskill - dsref, iterations=500, dim=coordtime, replace=True)

  # MSE ratio per iteration and the (asymmetric) MSE skill score
  p2divp1 = numpy.square(bsp2).mean(dim=coordtime) / \
    numpy.square(bsp1).mean(dim=coordtime)
  amsessf = xr.where((p2divp1 - 1.0) > 0, (-1.0) / p2divp1 + 1.0,
    p2divp1 - 1.0)  # could be that NaNs are not preserved
  # two-sided (1 - alpha) confidence bounds over the bootstrap dimension
  amsesslb = amsessf.quantile(alpha / 2.0, dim=dimbs)  # , keep_attrs=True)
  amsessub = amsessf.quantile(1.0 - alpha / 2.0, dim=dimbs)  # , keep_attrs=True)
  del bsp1, bsp2, amsessf
  # classify: +1 where the lower bound is positive, -1 where the upper bound
  # is non-positive, 0 where the interval spans zero, NaN where the input is NaN
  BSms = xr.where(amsesslb > 0, amsesslb * 0.0 + 1.0, amsesslb * 0.0)
  BSms = BSms.where(amsessub > 0, -1.0)
  BSms = BSms.where((1 - (amsesslb <= 0)) == (1 - (amsessub <= 0)), 0.0)
  BSms = BSms.where(amsesslb.notnull().data)
  BSms.to_netcdf("bsms_chunked.nc")

if __name__ == '__main__':
  AMSEss()
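
The line-by-line figures below come from memory_profiler; one way to produce them is (the script name amsess.py is assumed here):

python -m memory_profiler amsess.py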

Running the script from the Linux console gives the following:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     7    189.4 MiB    189.4 MiB           1   @profile
     8                                         def AMSEss():
     9    189.4 MiB      0.0 MiB           1     alpha=0.10
    10    189.4 MiB      0.0 MiB           1     coordtime="time"
    11    189.4 MiB      0.0 MiB           1     dimbs="iteration"
    12    201.0 MiB     11.6 MiB           1     dsskill = xr.DataArray(data=15 + 20 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
    13    212.5 MiB     11.4 MiB           1     dsref   = xr.DataArray(data=15 + 15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
    14    223.9 MiB     11.4 MiB           1     dsproof = xr.DataArray(data=15 + 10 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
    15                                         
    16   5946.6 MiB   5722.7 MiB           1     bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
    17  11680.0 MiB   5733.4 MiB           1     bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)
    18                                         
    19  11718.2 MiB      0.0 MiB           2     p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
    20  11756.3 MiB     38.1 MiB           1       ( numpy.square( bsp1 ) ).mean(dim=coordtime)
    21  11756.4 MiB      0.0 MiB           2     amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0, 
    22  11794.5 MiB     38.1 MiB           1       p2divp1 - 1.0 ) # could be that nan's are not preserved
    23  11756.4 MiB      0.0 MiB           1     amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
    24  11756.4 MiB      0.0 MiB           1     amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
    25    274.2 MiB -11482.2 MiB           1     del bsp1, bsp2, amsessf
    26    274.2 MiB      0.0 MiB           1     BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
    27    274.2 MiB      0.0 MiB           1     BSms = BSms.where( (amsessub)>0, -1.0 )
    28    274.2 MiB      0.0 MiB           1     BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
    29    274.2 MiB      0.0 MiB           1     BSms = BSms.where( amsesslb.notnull().data )

Thus, we need 5.7 GB to store each of the iteration samples bsp1 and bsp2. That is consistent with the size of the arrays: 100 × 100 × 150 values × 500 iterations × 8 bytes (float64) ≈ 6.0 × 10⁹ bytes ≈ 5722 MiB. However, climate datasets are often much larger, so I started working with dask arrays and changed the three lines:

  dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
  dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
  dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})

Now the dask scheduler collects all operations and performs the computation only when the netCDF file is written. However, resample_iterations_idx seems to generate indices that refer to the full, unchunked fields, while each chunk only holds part of the time axis:

  File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/optimization.py", line 963, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/zmaw/m221054/.local/lib/python3.8/site-packages/xskillscore/core/resampling.py", line 193, in select_bootstrap_indices_ufunc
    return np.moveaxis(x.squeeze()[idx.squeeze().transpose()], 0, -1)
IndexError: index 144 is out of bounds for axis 0 with size 10
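
The out-of-bounds index (144) addresses the full time axis of length 150, while the reported size (10) is the chunk length. A minimal sketch of the underlying problem in plain dask, not xskillscore's exact code path: when fancy indexing is applied blockwise, each block only sees its own slice of the axis, but the resampling indices span the whole axis.

import numpy as np
import dask.array as da

x = da.random.random(150, chunks=10)       # axis split into blocks of 10
idx = np.random.randint(0, 150, size=150)  # resampling indices span 0..149

# Each block holds only 10 elements along axis 0, so any index >= 10
# is out of bounds, exactly as in the traceback above.
x.map_blocks(lambda block: block[idx], dtype=x.dtype).compute()
# IndexError: index ... is out of bounds for axis 0 with size 10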

Is there a way to use the resampling functionality on dask arrays to save memory? It is not clear to me whether this is really parallelizable. As @dougiesquire already commented in issue #221 (#221 (comment)), there is a problem with boolean/fancy indexing when numpy-style indexing is applied to dask arrays.
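
One workaround that might help, though I have not verified it against the xskillscore internals: keep the resampled dimension in a single chunk and chunk over a different dimension instead, so that the blockwise indexing always sees the full time axis. The choice of chunking over x below is an assumption, not a tested recipe:

  dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150), dims=["x", "y", "time"]).chunk({'x': 10, 'time': -1})  # 'time': -1 keeps "time" in one chunk
  dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150), dims=["x", "y", "time"]).chunk({'x': 10, 'time': -1})
  dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150), dims=["x", "y", "time"]).chunk({'x': 10, 'time': -1})

If I read the xskillscore API correctly, there is also resample_iterations (without _idx), which is described as slower but more memory-friendly; it may behave better with dask input here.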
