Using dask with climpred
This notebook demonstrates climpred's capabilities with dask (https://docs.dask.org/en/latest/array.html). Dask enables out-of-memory and parallel computation for large datasets with climpred.
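To illustrate what "lazy" computation means for the cells below, here is a minimal sketch using dask.array directly, independent of climpred. The array size and chunk size are arbitrary choices for illustration: operations only build a task graph, and nothing runs until .compute() is called.

```python
import dask
import dask.array as da

# 1,000 values split into 4 chunks of 250; each chunk becomes separate tasks
x = da.arange(1_000, chunks=250)

# Building the expression is cheap: only a task graph is created here
lazy_mean = (x ** 2).mean()

# .compute() executes the graph, processing the chunks in parallel where possible
result = lazy_mean.compute()
```

The same pattern applies to chunked xarray objects: climpred functions return dask-backed results that are only evaluated on .compute().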
[1]:
import warnings
%matplotlib inline
import numpy as np
import xarray as xr
import dask
import climpred
warnings.filterwarnings("ignore")
- Load a Client to use dask.distributed; for choosing the number of workers, see https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers
- (Optionally) Use the dask dashboard to visualize performance: https://github.com/dask/dask-labextension
[11]:
import multiprocessing
from dask.distributed import Client
ncpu = multiprocessing.cpu_count()
processes = False  # run all workers as threads in this process
nworker = 8
# at least one thread per worker, also on machines with fewer CPUs than workers
threads = max(1, ncpu // nworker)
print(
    f"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}, processes: {processes}",
)
client = Client(
    processes=processes,
    threads_per_worker=threads,
    n_workers=nworker,
    memory_limit="64GB",  # limit per worker, not in total
)
client
Number of CPUs: 48, number of threads: 6, number of workers: 8, processes: False
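The settings above fix 8 workers and divide the CPUs among them. The following sketch generalizes that reasoning into a hypothetical helper, plan_cluster, which is not part of dask or climpred. It assumes you know the total memory you want dask to use (total_memory_gb is an assumption, not a value from this notebook) and that memory should be split evenly, because the memory_limit argument of Client applies to each worker.

```python
def plan_cluster(ncpu, nworker, total_memory_gb):
    """Hypothetical helper: split CPUs and memory evenly across dask workers."""
    nworker = max(1, min(nworker, ncpu))  # no more workers than CPUs
    threads = max(1, ncpu // nworker)  # at least one thread per worker
    # memory_limit in Client/LocalCluster is per worker, so divide the total
    memory_limit = f"{total_memory_gb / nworker:g}GB"
    return nworker, threads, memory_limit


print(plan_cluster(48, 8, 256))  # (8, 6, '32GB')
print(plan_cluster(2, 8, 16))  # (2, 1, '8GB')
```

The returned values map onto n_workers, threads_per_worker and memory_limit of Client. Note that the cell above requests 64GB per worker, i.e. up to 512 GB across 8 workers.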
Load data
[12]:
# generic synthetic data with the dimensions of a perfect-model experiment
ny, nx = 256, 220
nl, ni, nm = 20, 12, 10
ds = xr.DataArray(np.random.random((nl, ni, nm, ny, nx)), dims=('lead', 'init', 'member', 'y', 'x'))
ds['init'] = np.arange(3000, 3300, 300 // ni)  # 12 initializations, 25 time steps apart
ds['lead'] = np.arange(1, 1 + ds.lead.size)
# control simulation covering the same 300 time steps
control = xr.DataArray(np.random.random((300, ny, nx)), dims=('time', 'y', 'x'))
control['time'] = np.arange(3000, 3300)
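To see why chunking matters, it helps to work out how large these arrays are. The sketch below only does the arithmetic (it allocates nothing) and assumes float64 values, which is what np.random.random returns.

```python
import numpy as np

ny, nx = 256, 220
nl, ni, nm = 20, 12, 10
itemsize = np.dtype("float64").itemsize  # 8 bytes per value

ds_bytes = nl * ni * nm * ny * nx * itemsize
control_bytes = 300 * ny * nx * itemsize
inits = np.arange(3000, 3300, 300 // ni)  # same init coordinate as above

print(f"ds: {ds_bytes / 1e9:.2f} GB, control: {control_bytes / 1e9:.2f} GB")
print(f"{inits.size} initializations: {inits[0]} ... {inits[-1]}")
```

The initialized ensemble alone holds about 1.08 GB, while the control run adds about 0.14 GB.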
Compute skill with compute_perfect_model
[13]:
kw = {'comparison':'m2e', 'metric':'rmse'}
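The keyword arguments select the m2e comparison (each member is compared against the ensemble mean of the remaining members) and the RMSE metric. The sketch below is a simplified NumPy illustration for a single lead and grid cell, not climpred's implementation; in particular it assumes the squared errors are averaged over both init and member, and climpred's exact reduction dimensions may differ.

```python
import numpy as np


def rmse_m2e(fcst):
    """Simplified m2e RMSE for one lead and grid cell.

    fcst has shape (init, member). Each member in turn is the reference and is
    compared with the mean of the remaining members (leave-one-out).
    """
    n_member = fcst.shape[1]
    errors = np.empty_like(fcst, dtype=float)
    for m in range(n_member):
        reference = fcst[:, m]
        others = np.delete(fcst, m, axis=1)  # leave the reference member out
        errors[:, m] = others.mean(axis=1) - reference
    # mean of the squared errors over init and member, then the square root
    return float(np.sqrt(np.mean(errors ** 2)))


example = np.array([[0.0, 2.0],
                    [1.0, 1.0]])
print(rmse_m2e(example))  # sqrt(2), about 1.414
```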
Compute skill without dask
[14]:
%time s = climpred.prediction.compute_perfect_model(ds, control, **kw)
CPU times: user 10.9 s, sys: 10.9 s, total: 21.8 s
Wall time: 19.9 s
- 2-core MacBook Pro 2018: CPU times: user 11.5 s, sys: 6.88 s, total: 18.4 s Wall time: 19.6 s
- 24-core Mistral node: CPU times: user 9.22 s, sys: 10.3 s, total: 19.6 s Wall time: 19.5 s
Compute skill with dask
To use dask efficiently, we need to chunk the data appropriately. Processing chunks of data lazily with dask adds a small overhead per task, so chunking mostly pays off for large data.
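The cell below chunks ds along y with a chunk size of ds.y.size // nworker. The hypothetical helper block_sizes reproduces how dask splits a dimension when given an integer chunk size: regular blocks plus a smaller final block for any remainder. It shows that y (256) divides evenly into 8 blocks, whereas the same rule applied to x (220) would give 9 blocks, one more than there are workers.

```python
def block_sizes(size, chunk):
    """Block lengths for a dimension of `size` chunked with integer `chunk`."""
    n_full, rest = divmod(size, chunk)
    return [chunk] * n_full + ([rest] if rest else [])


nworker = 8
ny, nx = 256, 220
nl, ni, nm = 20, 12, 10

y_blocks = block_sizes(ny, ny // nworker)  # 8 blocks of 32
x_blocks = block_sizes(nx, nx // nworker)  # 8 blocks of 27 plus 1 block of 4

# bytes held by one y-chunk of ds, assuming float64 (8 bytes per value)
chunk_bytes = nl * ni * nm * y_blocks[0] * nx * 8
print(len(y_blocks), len(x_blocks), f"{chunk_bytes / 1e6:.0f} MB per y-chunk")
```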
[26]:
chunked_dim = 'y'
chunks = {chunked_dim:ds[chunked_dim].size // nworker}
ds = ds.chunk(chunks)
# if memory allows
ds = ds.persist()
ds.data
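The "if memory allows" comment refers to the difference between persist() and compute(). As a small sketch on a synthetic array (sizes chosen arbitrarily): persist() evaluates all chunks and keeps them in memory (on the workers when a distributed Client is active) while the object stays dask-backed, whereas compute() returns a NumPy-backed result.

```python
import dask
import numpy as np
import xarray as xr

# small synthetic DataArray, chunked along y like ds above
arr = xr.DataArray(np.random.random((4, 64, 64)), dims=("init", "y", "x")).chunk({"y": 16})

# persist(): evaluate every chunk now and hold it in memory, but stay dask-backed
persisted = arr.persist()

# compute(): evaluate every chunk and return a NumPy-backed DataArray
computed = arr.compute()
```

Because persisted chunks live in memory, later climpred calls on ds do not have to recreate them, at the cost of holding the full array in RAM.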
[27]:
%%time
s_chunked = climpred.prediction.compute_perfect_model(ds, control, **kw)
assert dask.is_dask_collection(s_chunked)
s_chunked = s_chunked.compute()
CPU times: user 25.5 s, sys: 1min 21s, total: 1min 46s
Wall time: 5.42 s
- 2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s
- 24-core Mistral node: CPU times: user 26.2 s, sys: 1min 37s, total: 2min 3s Wall time: 5.38 s
[28]:
try:
    xr.testing.assert_allclose(s, s_chunked, atol=1e-6)
except AssertionError:
    diff = s - s_chunked
    if isinstance(diff, xr.Dataset):
        for v in diff.data_vars:
            diff[v].plot(robust=True, col='lead')
    else:
        diff.plot(robust=True, col='lead')
- The results s and s_chunked agree within the requested tolerance (atol=1e-6).
- On the 24-core Mistral node, chunking reduces the wall time of compute_perfect_model from about 20 s to about 5 s.
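"Agree within tolerance" has a precise meaning: xarray's assert_allclose applies an elementwise check of the form |a - b| <= atol + rtol * |b|, like NumPy's allclose, and its default rtol is 1e-05, so the check above also allows small relative differences. The NumPy sketch below, with arbitrary example values, shows how atol and rtol interact.

```python
import numpy as np

a = np.array([0.5, 1.0])

# absolute tolerance only: differences of 5e-7 pass, 5e-6 do not
close = np.allclose(a, a + 5e-7, rtol=0, atol=1e-6)
not_close = np.allclose(a, a + 5e-6, rtol=0, atol=1e-6)

# adding a relative tolerance widens the band to atol + rtol * |b|
with_rtol = np.allclose(a, a + 5e-6, rtol=1e-5, atol=1e-6)
```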
Bootstrap skill with bootstrap_perfect_model
This speedup carries over to bootstrap_perfect_model, which computes bootstrapped resamplings of initialized, uninitialized and persistence skill and then converts them into p values and confidence intervals.
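The idea behind turning resampled skill into confidence intervals and p values can be sketched in plain NumPy. This is an illustration under simplifying assumptions, not climpred's implementation: the helpers bootstrap_skill, confidence_interval and p_value_rmse are hypothetical, pairs of forecast and observation are resampled with replacement along one axis, and the p value is taken as the fraction of iterations in which a reference forecast scores at least as well (RMSE not higher) as the initialized one. climpred itself resamples members and initializations and draws the uninitialized ensemble from the control run.

```python
import numpy as np


def rmse(f, o):
    return float(np.sqrt(np.mean((f - o) ** 2)))


def bootstrap_skill(skill_fn, fcst, obs, iterations=500, seed=0):
    """Hypothetical helper: skill of (fcst, obs) pairs resampled with replacement."""
    rng = np.random.default_rng(seed)
    n = fcst.shape[0]
    samples = np.empty(iterations)
    for i in range(iterations):
        idx = rng.integers(0, n, size=n)
        samples[i] = skill_fn(fcst[idx], obs[idx])
    return samples


def confidence_interval(samples, ci=90):
    tail = (100 - ci) / 2
    low, high = np.percentile(samples, [tail, 100 - tail])
    return float(low), float(high)


def p_value_rmse(init_samples, ref_samples):
    # fraction of iterations in which the reference is at least as good (lower RMSE is better)
    return float(np.mean(ref_samples <= init_samples))


rng = np.random.default_rng(1)
obs = rng.normal(size=200)
init_fcst = obs + 0.3 * rng.normal(size=200)  # synthetic skillful forecast
uninit_fcst = rng.normal(size=200)  # synthetic forecast without skill

init_samples = bootstrap_skill(rmse, init_fcst, obs)
uninit_samples = bootstrap_skill(rmse, uninit_fcst, obs)
low, high = confidence_interval(init_samples)
p = p_value_rmse(init_samples, uninit_samples)
```

With only a few iterations (such as the 4 used below to keep the demo short), percentile-based intervals are very coarse.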
[29]:
kwp = kw.copy()
kwp['iterations'] = 4
Bootstrap skill without dask
[30]:
# load the chunked data back into memory (NumPy-backed) to time the run without dask
ds = ds.compute()
control = control.compute()
[31]:
%time s_p = climpred.bootstrap.bootstrap_perfect_model(ds, control, **kwp)
CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s
Wall time: 3min 25s
- 2-core MacBook Pro 2018: CPU times: user 2min 3s, sys: 1min 22s, total: 3min 26s Wall time: 3min 43s
- 24-core Mistral node: CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s Wall time: 3min 25s
Bootstrap skill with dask
When ds is chunked, bootstrap_perfect_model performs all skill calculations on resampled inputs in parallel.
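Bootstrap iterations are independent of each other, which is why they parallelize well. The sketch below shows that idea with dask.delayed on synthetic 1-D data; it is not how climpred implements bootstrapping (climpred works on the chunked dask arrays), and resampled_rmse is a hypothetical helper. dask.compute uses the distributed Client created above when one is active, and the threaded scheduler otherwise.

```python
import dask
import numpy as np


def resampled_rmse(seed, fcst, obs):
    """One hypothetical bootstrap iteration: resample (fcst, obs) pairs, return RMSE."""
    rng = np.random.default_rng(seed)
    idx = rng.integers(0, fcst.shape[0], size=fcst.shape[0])
    return float(np.sqrt(np.mean((fcst[idx] - obs[idx]) ** 2)))


rng = np.random.default_rng(42)
fcst = rng.random(100)
obs = rng.random(100)
iterations = 4

# building the tasks is lazy: no iteration has run yet
tasks = [dask.delayed(resampled_rmse)(i, fcst, obs) for i in range(iterations)]

# execute all independent iterations together
skills = dask.compute(*tasks)
```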
[32]:
chunked_dim = 'y'
chunks = {chunked_dim:ds[chunked_dim].size // nworker}
ds = ds.chunk(chunks)
# if memory allows
ds = ds.persist()
ds.data
[33]:
%time s_p_chunked = climpred.bootstrap.bootstrap_perfect_model(ds, control, **kwp)
CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s
Wall time: 1min 53s
- 2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s
- 24-core Mistral node: CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s Wall time: 1min 53s
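To summarize the Mistral timings listed above, the sketch below converts the reported %time strings to seconds and computes the wall-time speedup from chunking. The parser handles only the "Xmin Ys" and "Z s" forms that appear in this notebook.

```python
import re


def to_seconds(text):
    """Convert %time strings such as '3min 25s' or '5.38 s' to seconds."""
    minutes = re.search(r"(\d+)\s*min", text)
    seconds = re.search(r"([\d.]+)\s*s\b", text)
    total = 0.0
    if minutes:
        total += 60 * int(minutes.group(1))
    if seconds:
        total += float(seconds.group(1))
    return total


# wall times reported above for the 24-core Mistral node: (without dask, with dask)
wall = {
    "compute_perfect_model": ("19.5 s", "5.38 s"),
    "bootstrap_perfect_model": ("3min 25s", "1min 53s"),
}
speedups = {}
for name, (serial, chunked) in wall.items():
    speedups[name] = to_seconds(serial) / to_seconds(chunked)
    print(f"{name}: {speedups[name]:.2f}x lower wall time with dask")
```

Total CPU time grows at the same time (for bootstrap_perfect_model from 3min 45s to 11min 3s), so the parallel version spends more CPU time to finish sooner.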