Using dask with climpred

This demo demonstrates climpred’s capabilities with dask https://docs.dask.org/en/latest/array.html. This enables enables out-of-memory and parallel computation for large datasets with climpred.

[2]:
import warnings

%matplotlib inline
import numpy as np
import xarray as xr
import dask
import climpred

warnings.filterwarnings("ignore")
[1]:
from dask.distributed import Client
import multiprocessing
ncpu = multiprocessing.cpu_count()
processes = False
nworker = 2
threads = ncpu // nworker
print(
    f"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}, processes: {processes}",
)
client = Client(
    processes=processes,
    threads_per_worker=threads,
    n_workers=nworker,
    memory_limit="64GB",
)
client
Number of CPUs: 4, number of threads: 2, number of workers: 2, processes: False
[1]:

Client

Cluster

  • Workers: 2
  • Cores: 4
  • Memory: 34.36 GB

Load data

[4]:
# generic
ny, nx = 256, 220
nl, ni, nm = 20, 12, 10
init = xr.DataArray(np.random.random((nl, ni, nm, ny, nx)), dims=('lead', 'init', 'member', 'y', 'x'))
init.name='var'
init['init'] = np.arange(3000, 3300, 300 // ni)
init['lead'] = np.arange(1,1+init.lead.size)
control = xr.DataArray(np.random.random((300, ny, nx)),dims=('time', 'y', 'x'))
control.name='var'
control['time'] = np.arange(3000, 3300)

pm = climpred.PerfectModelEnsemble(init).add_control(control)

compute skill with compute_perfect_model

[5]:
kw = {'comparison':'m2e', 'metric':'rmse', 'dim':['init', 'member']}

compute skill without dask

[6]:
%time s = pm.verify(**kw)
CPU times: user 12 s, sys: 7.08 s, total: 19.1 s
Wall time: 19.1 s
  • 2 core Mac Book Pro 2018: CPU times: user 11.5 s, sys: 6.88 s, total: 18.4 s Wall time: 19.6 s

  • 24 core mistral node: CPU times: user 9.22 s, sys: 10.3 s, total: 19.6 s Wall time: 19.5 s

verify() skill with dask

In order to use dask efficient, we need to chunk the data appropriately. Processing chunks of data lazily with dask creates a tiny overhead per dask, therefore chunking mostly makes sense when applying it to large data.

[15]:
chunked_dim = 'y'
chunks = {chunked_dim:init[chunked_dim].size // nworker}
pm_chunked = pm.chunk(chunks)
# if memory allows
# pm_chunked = pm_chunked.persist()
pm_chunked.get_initialized()['var'].data
[15]:
Array Chunk
Bytes 1.08 GB 540.67 MB
Shape (20, 12, 10, 256, 220) (20, 12, 10, 128, 220)
Count 2 Tasks 2 Chunks
Type float64 numpy.ndarray
12 20 220 256 10
[16]:
%%time
s_chunked = pm_chunked.verify(**kw)
assert dask.is_dask_collection(s_chunked)
s_chunked = s_chunked.compute()
CPU times: user 35.8 s, sys: 18.5 s, total: 54.3 s
Wall time: 19.5 s
  • 2 core Mac Book Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s

  • 24 core mistral node: CPU times: user 26.2 s, sys: 1min 37s, total: 2min 3s Wall time: 5.38 s

[17]:
try:
    xr.testing.assert_allclose(s,s_chunked,atol=1e-6)
except AssertionError:
    for v in s.data_vars:
        (s-s_chunked)[v].plot(robust=True, col='lead')
  • The results s and s_chunked are identical as requested.

  • Chunking reduces Wall time from 20s to 5s on supercomputer.


bootstrap() skill

This speedup translates into bootstrap_perfect_model, where bootstrapped resamplings of intializialized, uninitialized and persistence skill are computed and then translated into p values and confidence intervals.

[18]:
kwp = kw.copy()
kwp['iterations'] = 4

without dask

[ ]:
%time s_p = pm.bootstrap(**kwp)
  • 2 core Mac Book Pro 2018: CPU times: user 2min 3s, sys: 1min 22s, total: 3min 26s Wall time: 3min 43s

  • 24 core mistral node: CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s Wall time: 3min 25s

bootstrap skill with dask

When ds is chunked, bootstrap_perfect_model performs all skill calculations on resampled inputs in parallel.

[ ]:
%time s_p_chunked = pm_chunked.bootstrap(**kwp).compute()
  • 2 core Mac Book Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s

  • 24 core mistral node: CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s Wall time: 1min 53s