Using dask with climpred

This demo shows how to use climpred with dask (https://docs.dask.org/en/latest/array.html), which enables out-of-memory and parallel computation for large datasets.

[1]:
import warnings

%matplotlib inline
import numpy as np
import xarray as xr
import dask
import climpred

warnings.filterwarnings("ignore")
[11]:
from dask.distributed import Client
import multiprocessing
ncpu = multiprocessing.cpu_count()
processes = False
nworker = 8
threads = ncpu // nworker
print(
    f"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}, processes: {processes}",
)
client = Client(
    processes=processes,
    threads_per_worker=threads,
    n_workers=nworker,
    memory_limit="64GB",
)
client
Number of CPUs: 48, number of threads: 6, number of workers: 8, processes: False
[11]:

Client / Cluster
  • Workers: 8
  • Cores: 48
  • Memory: 512.00 GB
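If a full distributed cluster is not needed, dask's local threaded scheduler can be selected instead; the sketch below (not part of the original run) sets it globally via dask.config:

# alternative to the distributed Client above (a sketch, not executed here):
# use dask's local threaded scheduler for all subsequent computations
dask.config.set(scheduler="threads")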

Load data

[12]:
# generate a synthetic perfect-model ensemble and a control run
ny, nx = 256, 220
nl, ni, nm = 20, 12, 10
ds = xr.DataArray(
    np.random.random((nl, ni, nm, ny, nx)),
    dims=('lead', 'init', 'member', 'y', 'x'),
)
ds['init'] = np.arange(3000, 3300, 300 // ni)
ds['lead'] = np.arange(1, 1 + ds.lead.size)
control = xr.DataArray(np.random.random((300, ny, nx)), dims=('time', 'y', 'x'))
control['time'] = np.arange(3000, 3300)
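As a quick sanity check (not in the original notebook), we can confirm that the synthetic ensemble carries the dimensions climpred's perfect-model functions expect (lead, init and member plus the spatial dimensions), that the control only needs time, and print the data sizes:

# sanity check (sketch): dimension names and total size of the synthetic data
print(ds.dims, f"{ds.nbytes / 1e9:.2f} GB")
print(control.dims, f"{control.nbytes / 1e6:.2f} MB")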

compute skill with compute_perfect_model

[13]:
kw = {'comparison': 'm2e', 'metric': 'rmse'}

compute skill without dask

[14]:
%time s = climpred.prediction.compute_perfect_model(ds, control, **kw)
CPU times: user 10.9 s, sys: 10.9 s, total: 21.8 s
Wall time: 19.9 s
  • 2-core MacBook Pro 2018: CPU times: user 11.5 s, sys: 6.88 s, total: 18.4 s; Wall time: 19.6 s
  • 24-core mistral node: CPU times: user 9.22 s, sys: 10.3 s, total: 19.6 s; Wall time: 19.5 s

compute skill with dask

To use dask efficiently, we need to chunk the data appropriately. Processing chunks lazily adds a small overhead per dask task, so chunking mostly pays off for large datasets.
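As a rough guide (a sketch, not part of the original notebook), splitting the spatial dimension y into one chunk per worker yields chunks on the order of 100 MB, a comfortable size for dask:

# estimate the per-chunk size before actually chunking (sketch)
print(f"{ds.nbytes / nworker / 1e6:.0f} MB per chunk when splitting 'y' into {nworker} pieces")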

[26]:
# chunk along 'y' so that each worker gets one chunk
chunked_dim = 'y'
chunks = {chunked_dim: ds[chunked_dim].size // nworker}
ds = ds.chunk(chunks)
# if memory allows, keep the chunked data in (distributed) memory
ds = ds.persist()
ds.data
[26]:
            Array                     Chunk
Bytes       1.08 GB                   135.17 MB
Shape       (20, 12, 10, 256, 220)    (20, 12, 10, 32, 220)
Count       8 Tasks                   8 Chunks
Type        float64                   numpy.ndarray
[27]:
%%time
s_chunked = climpred.prediction.compute_perfect_model(ds, control, **kw)
assert dask.is_dask_collection(s_chunked)
s_chunked = s_chunked.compute()
CPU times: user 25.5 s, sys: 1min 21s, total: 1min 46s
Wall time: 5.42 s
  • 2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s; Wall time: 2min 10s
  • 24-core mistral node: CPU times: user 26.2 s, sys: 1min 37s, total: 2min 3s; Wall time: 5.38 s
[28]:
try:
    xr.testing.assert_allclose(s, s_chunked, atol=1e-6)
except AssertionError:
    for v in s.data_vars:
        (s - s_chunked)[v].plot(robust=True, col='lead')
  • The results s and s_chunked are identical within the asserted tolerance (atol=1e-6).
  • Chunking reduces the wall time from ~20 s to ~5 s on the 24-core supercomputer node.

bootstrap skill with bootstrap_perfect_model

This speedup carries over to bootstrap_perfect_model, where bootstrapped resamplings of initialized, uninitialized and persistence skill are computed and then translated into p values and confidence intervals.
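Conceptually, the bootstrap is a loop of resample-and-recompute steps. The hand-rolled sketch below covers only the member-resampling part of the initialized skill and is not climpred's actual implementation, which also resamples initializations and adds uninitialized and persistence skill:

# conceptual sketch: resample members with replacement and recompute skill
iterations = 4
resampled = []
for _ in range(iterations):
    idx = np.random.randint(0, ds.member.size, ds.member.size)
    resampled.append(
        climpred.prediction.compute_perfect_model(ds.isel(member=idx), control, **kw)
    )
resampled_skill = xr.concat(resampled, dim='iteration')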

[29]:
# keep the number of bootstrap iterations small for this demo
kwp = kw.copy()
kwp['iterations'] = 4

bootstrap skill without dask

[30]:
# load the data back into memory as plain numpy arrays (undo the chunking)
ds = ds.compute()
control = control.compute()
[31]:
%time s_p = climpred.bootstrap.bootstrap_perfect_model(ds, control, **kwp)
CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s
Wall time: 3min 25s
  • 2-core MacBook Pro 2018: CPU times: user 2min 3s, sys: 1min 22s, total: 3min 26s; Wall time: 3min 43s
  • 24-core mistral node: CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s; Wall time: 3min 25s
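The exact layout of the returned object depends on the climpred version; inspecting its coordinates and dimensions (a sketch) shows where the skill estimates, p values and confidence bounds are stored:

# inspect how skill, p values and confidence intervals are organized (sketch)
print(s_p.coords)
print(s_p.dims)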

bootstrap skill with dask

When ds is chunked, bootstrap_perfect_model performs all skill calculations on resampled inputs in parallel.

[32]:
# chunk along 'y' again so that each worker gets one chunk
chunked_dim = 'y'
chunks = {chunked_dim: ds[chunked_dim].size // nworker}
ds = ds.chunk(chunks)
# if memory allows, keep the chunked data in (distributed) memory
ds = ds.persist()
ds.data
[32]:
            Array                     Chunk
Bytes       1.08 GB                   135.17 MB
Shape       (20, 12, 10, 256, 220)    (20, 12, 10, 32, 220)
Count       8 Tasks                   8 Chunks
Type        float64                   numpy.ndarray
[33]:
%time s_p_chunked = climpred.bootstrap.bootstrap_perfect_model(ds, control, **kwp)
CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s
Wall time: 1min 53s
  • 2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s; Wall time: 2min 10s
  • 24-core mistral node: CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s; Wall time: 1min 53s
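Finally, once the comparison is done, the persisted data and the local cluster can be released (a sketch, not part of the original notebook):

# release references to persisted data and shut down the local cluster (sketch)
del ds
client.close()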