Using dask with climpred¶
This demo shows climpred's capabilities with dask (https://docs.dask.org/en/latest/array.html), which enables out-of-memory and parallel computation for large datasets with climpred.
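Before the demo itself, here is a minimal, self-contained sketch (illustrative, not part of the demo cells below) of the basic mechanism: chunking an xarray object with .chunk() yields a lazy, dask-backed array, and computation is deferred until .compute() is called.
# Minimal sketch: chunking an xarray DataArray yields a lazy, dask-backed array.
import numpy as np
import xarray as xr
import dask

da = xr.DataArray(np.random.random((10, 256, 220)), dims=("time", "y", "x"))
da_chunked = da.chunk({"y": 128})           # two chunks along 'y'
assert dask.is_dask_collection(da_chunked)  # backed by dask, nothing computed yet
print(da_chunked.chunks)                    # chunk layout; call .compute() to evaluate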
[2]:
import warnings
%matplotlib inline
import numpy as np
import xarray as xr
import dask
import climpred
warnings.filterwarnings("ignore")
Load a Client to use dask.distributed (see https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers for best practices in setting the number of dask workers). Optionally, use the dask dashboard to visualize performance: https://github.com/dask/dask-labextension
[1]:
from dask.distributed import Client
import multiprocessing
ncpu = multiprocessing.cpu_count()
processes = False
nworker = 2
threads = ncpu // nworker
print(
f"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}, processes: {processes}",
)
client = Client(
processes=processes,
threads_per_worker=threads,
n_workers=nworker,
memory_limit="64GB",
)
client
Number of CPUs: 4, number of threads: 2, number of workers: 2, processes: False
[1]:
(dask Client and Cluster summary widget)
Load data¶
[4]:
# generic synthetic data
ny, nx = 256, 220
nl, ni, nm = 20, 12, 10  # leads, inits, members
# initialized ensemble: lead x init x member x y x x
init = xr.DataArray(
    np.random.random((nl, ni, nm, ny, nx)),
    dims=('lead', 'init', 'member', 'y', 'x'),
)
init.name = 'var'
init['init'] = np.arange(3000, 3300, 300 // ni)
init['lead'] = np.arange(1, 1 + init.lead.size)
# control simulation: time x y x x
control = xr.DataArray(np.random.random((300, ny, nx)), dims=('time', 'y', 'x'))
control.name = 'var'
control['time'] = np.arange(3000, 3300)
pm = climpred.PerfectModelEnsemble(init).add_control(control)
verify()¶
[5]:
# shared keywords for verify() and bootstrap()
kw = {'comparison': 'm2e', 'metric': 'rmse', 'dim': ['init', 'member']}
without dask¶
[6]:
%time s = pm.verify(**kw)
CPU times: user 12 s, sys: 7.08 s, total: 19.1 s
Wall time: 19.1 s
2-core MacBook Pro 2018: CPU times: user 11.5 s, sys: 6.88 s, total: 18.4 s; Wall time: 19.6 s
24-core mistral node: CPU times: user 9.22 s, sys: 10.3 s, total: 19.6 s; Wall time: 19.5 s
with dask¶
In order to use dask efficiently, we need to chunk the data appropriately. Processing chunks of data lazily with dask creates a small overhead per chunk, therefore chunking mostly makes sense for large datasets. It is important that the data is chunked along a different dimension than the dim passed to verify()!
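As a short sketch of this rule of thumb (using the pm object defined above; the chunk sizes are illustrative, not a recommendation): chunk along a dimension that is not reduced by verify(), so each chunk can be processed independently and in parallel.
# Rule-of-thumb sketch: chunk a dimension that verify() does not reduce over.
good = pm.chunk({'y': 128})        # 'y' is not in dim=['init', 'member']
# bad = pm.chunk({'member': 5})    # chunking a reduced dimension adds communication overhead

print(good.get_initialized()['var'].chunks)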
[15]:
chunked_dim = 'y'
chunks = {chunked_dim:init[chunked_dim].size // nworker}
pm_chunked = pm.chunk(chunks)
# if memory allows
# pm_chunked = pm_chunked.persist()
pm_chunked.get_initialized()['var'].data
[15]:
(dask array chunk layout summary)
[16]:
%%time
s_chunked = pm_chunked.verify(**kw)
assert dask.is_dask_collection(s_chunked)
s_chunked = s_chunked.compute()
CPU times: user 35.8 s, sys: 18.5 s, total: 54.3 s
Wall time: 19.5 s
2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s; Wall time: 2min 10s
24-core mistral node: CPU times: user 26.2 s, sys: 1min 37s, total: 2min 3s; Wall time: 5.38 s
[17]:
try:
    xr.testing.assert_allclose(s, s_chunked, atol=1e-6)
except AssertionError:
    for v in s.data_vars:
        (s - s_chunked)[v].plot(robust=True, col='lead')
The results s and s_chunked are identical within the requested tolerance. Chunking reduces wall time from ~20 s to ~5 s on the supercomputer node.
bootstrap()¶
This speedup translates into bootstrap(), where bootstrapped resamplings of initialized, uninitialized and persistence skill are computed and then translated into p values and confidence intervals.
[18]:
kwp = kw.copy()
kwp['iterations'] = 4
without dask¶
[ ]:
%time s_p = pm.bootstrap(**kwp)
2-core MacBook Pro 2018: CPU times: user 2min 3s, sys: 1min 22s, total: 3min 26s; Wall time: 3min 43s
24-core mistral node: CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s; Wall time: 3min 25s
with dask¶
When ds
is chunked, bootstrap()
performs all skill calculations on resampled inputs in parallel.
[ ]:
%time s_p_chunked = pm_chunked.bootstrap(**kwp).compute()
2-core MacBook Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s; Wall time: 2min 10s
24-core mistral node: CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s; Wall time: 1min 53s
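To close, a hedged sketch of reading the bootstrapped output. The coordinate names used below ('skill', 'results', 'initialized', 'p') are assumptions that vary between climpred versions, so the selection is left commented out; inspect the coordinates of s_p_chunked first.
# Inspect the bootstrapped result before selecting from it.
print(s_p_chunked.coords)  # check the actual coordinate names and values

# e.g. p values of the initialized skill, if the coordinates match the assumption above:
# p_init = s_p_chunked['var'].sel(skill='initialized', results='p')
# p_init.plot(col='lead', robust=True)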