Using dask with climpred

This demo shows climpred's capabilities with dask (https://docs.dask.org/en/latest/array.html), which enables out-of-memory and parallel computation for large datasets with climpred.

[2]:
import warnings

%matplotlib inline
import numpy as np
import xarray as xr
import dask
import climpred

warnings.filterwarnings("ignore")
[1]:
from dask.distributed import Client
import multiprocessing

ncpu = multiprocessing.cpu_count()
processes = False  # use threads within one process rather than separate worker processes
nworker = 2
threads = ncpu // nworker
print(
    f"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}, processes: {processes}",
)
# start a local cluster/client that the computations below will use
client = Client(
    processes=processes,
    threads_per_worker=threads,
    n_workers=nworker,
    memory_limit="64GB",
)
client
Number of CPUs: 4, number of threads: 2, number of workers: 2, processes: False
[1]:
<Client: local cluster with 2 workers, 4 cores, 34.36 GB memory>
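
Not part of the original setup, but often useful when watching the parallel computations below: the dask.distributed Client exposes a diagnostics dashboard. A minimal sketch using the client created above:

[ ]:
# print the URL of the local cluster's diagnostics dashboard
print(client.dashboard_link)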

Load data

[4]:
# generic synthetic data
ny, nx = 256, 220
nl, ni, nm = 20, 12, 10
init = xr.DataArray(
    np.random.random((nl, ni, nm, ny, nx)),
    dims=('lead', 'init', 'member', 'y', 'x'),
)
init.name = 'var'
init['init'] = np.arange(3000, 3300, 300 // ni)
init['lead'] = np.arange(1, 1 + init.lead.size)
control = xr.DataArray(np.random.random((300, ny, nx)), dims=('time', 'y', 'x'))
control.name = 'var'
control['time'] = np.arange(3000, 3300)

pm = climpred.PerfectModelEnsemble(init).add_control(control)

verify()

[5]:
kw = {'comparison': 'm2e', 'metric': 'rmse', 'dim': ['init', 'member']}

without dask

[6]:
%time s = pm.verify(**kw)
CPU times: user 12 s, sys: 7.08 s, total: 19.1 s
Wall time: 19.1 s
  • 2-core MacBook Pro 2018: CPU times: user 11.5 s, sys: 6.88 s, total: 18.4 s Wall time: 19.6 s

  • 24-core mistral node: CPU times: user 9.22 s, sys: 10.3 s, total: 19.6 s Wall time: 19.5 s

with dask

In order to use dask efficiently, we need to chunk the data appropriately. Processing chunks of data lazily with dask adds a small overhead per chunk, so chunking mostly pays off for large datasets.

It is important that the data is chunked along a dimension other than the dim argument passed to verify()!
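
As a rough rule of thumb, chunk along a dimension that verify() does not reduce over, aiming for about one chunk per worker. A minimal sketch, assuming the init DataArray and nworker from the cells above:

[ ]:
# chunk along 'y', which is not among dim=['init', 'member'] reduced by verify()
init_lazy = init.chunk({'y': init.sizes['y'] // nworker})
print(init_lazy.chunks)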

[15]:
chunked_dim = 'y'
chunks = {chunked_dim: init.sizes[chunked_dim] // nworker}  # roughly one chunk per worker
pm_chunked = pm.chunk(chunks)
# if memory allows
# pm_chunked = pm_chunked.persist()
pm_chunked.get_initialized()['var'].data
[15]:
           Array                      Chunk
Bytes      1.08 GB                    540.67 MB
Shape      (20, 12, 10, 256, 220)     (20, 12, 10, 128, 220)
Count      2 Tasks                    2 Chunks
Type       float64                    numpy.ndarray
[16]:
%%time
s_chunked = pm_chunked.verify(**kw)
assert dask.is_dask_collection(s_chunked)
s_chunked = s_chunked.compute()
CPU times: user 35.8 s, sys: 18.5 s, total: 54.3 s
Wall time: 19.5 s
  • 2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s

  • 24-core mistral node: CPU times: user 26.2 s, sys: 1min 37s, total: 2min 3s Wall time: 5.38 s

[17]:
try:
    xr.testing.assert_allclose(s, s_chunked, atol=1e-6)
except AssertionError:
    for v in s.data_vars:
        (s - s_chunked)[v].plot(robust=True, col='lead')
  • The results s and s_chunked are identical within the requested tolerance.

  • Chunking reduces the wall time from about 20 s to about 5 s on the 24-core supercomputer node.


bootstrap()

This speedup carries over to bootstrap(), where bootstrapped resamplings of initialized, uninitialized and persistence skill are computed and then translated into p values and confidence intervals.
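
A sketch of what a single call returns (the call mirrors the timed cells below; the coordinate names follow the climpred documentation and may differ between versions):

[ ]:
# sketch only; the timed calls follow below
# s_p = pm.bootstrap(metric='rmse', comparison='m2e', dim=['init', 'member'], iterations=4)
# s_p then holds the skill estimates together with p values and confidence bounds,
# e.g. s_p.sel(results='p') or s_p.sel(skill='initialized')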

[18]:
kwp = kw.copy()
kwp['iterations'] = 4

without dask

[ ]:
%time s_p = pm.bootstrap(**kwp)
  • 2-core MacBook Pro 2018: CPU times: user 2min 3s, sys: 1min 22s, total: 3min 26s Wall time: 3min 43s

  • 24-core mistral node: CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s Wall time: 3min 25s

with dask

When the input data is chunked (as in pm_chunked), bootstrap() performs all skill calculations on the resampled inputs in parallel.
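
A minimal sketch (reusing pm_chunked and kwp from above, analogous to the verify() case) to confirm that the result stays lazy until compute() is called:

[ ]:
# bootstrap on chunked data returns a lazy, dask-backed result
lazy = pm_chunked.bootstrap(**kwp)
assert dask.is_dask_collection(lazy)
# lazy = lazy.compute()  # triggers the actual computation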

[ ]:
%time s_p_chunked = pm_chunked.bootstrap(**kwp).compute()
  • 2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s

  • 24-core mistral node: CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s Wall time: 1min 53s