{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Using `dask` with `climpred`\n", "\n", "This demo demonstrates `climpred`'s capabilities with [`dask`](https://docs.dask.org/en/latest/array.html). This enables enables out-of-memory and parallel computation for large datasets with `climpred`." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import warnings\n", "\n", "%matplotlib inline\n", "import numpy as np\n", "import xarray as xr\n", "import dask\n", "import climpred\n", "\n", "warnings.filterwarnings(\"ignore\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Load a `Client` to use `dask.distributed`: [stackoverflow](https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers)\n", "- (Optionally) [Use the `dask` dashboard to visualize performance](https://github.com/dask/dask-labextension)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of CPUs: 4, number of threads: 2, number of workers: 2, processes: False\n" ] }, { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 2
  • \n", "
  • Cores: 4
  • \n", "
  • Memory: 34.36 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "import multiprocessing\n", "ncpu = multiprocessing.cpu_count()\n", "processes = False\n", "nworker = 2\n", "threads = ncpu // nworker\n", "print(\n", " f\"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}, processes: {processes}\",\n", ")\n", "client = Client(\n", " processes=processes,\n", " threads_per_worker=threads,\n", " n_workers=nworker,\n", " memory_limit=\"64GB\",\n", ")\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Synthetic data" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# generic\n", "ny, nx = 256, 220\n", "nl, ni, nm = 20, 12, 10\n", "init = xr.DataArray(np.random.random((nl, ni, nm, ny, nx)), dims=('lead', 'init', 'member', 'y', 'x'))\n", "init.name='var'\n", "init['init'] = np.arange(3000, 3300, 300 // ni)\n", "init['lead'] = np.arange(1,1+init.lead.size)\n", "control = xr.DataArray(np.random.random((300, ny, nx)),dims=('time', 'y', 'x'))\n", "control.name='var'\n", "control['time'] = np.arange(3000, 3300)\n", "\n", "pm = climpred.PerfectModelEnsemble(init).add_control(control)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `verify()`\n", "\n", "{py:meth}`.PerfectModelEnsemble.verify`" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "kw = {'comparison':'m2e', 'metric':'rmse', 'dim':['init', 'member']}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### without `dask`" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 12 s, sys: 7.08 s, total: 19.1 s\n", "Wall time: 19.1 s\n" ] } ], "source": [ "%time s = pm.verify(**kw)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- 2 core Mac Book Pro 
2018: CPU times: user 11.5 s, sys: 6.88 s, total: 18.4 s Wall time: 19.6 s\n", "- 24 core mistral node: CPU times: user 9.22 s, sys: 10.3 s, total: 19.6 s Wall time: 19.5 s" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### with `dask`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To use `dask` efficiently, we need to chunk the data appropriately. Processing chunks of data lazily with `dask` incurs a small overhead per task, so chunking mostly pays off for large datasets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> It is important that the data is chunked along a different dimension than the `dim` passed to `verify()`!" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 1.08 GB 540.67 MB
Shape (20, 12, 10, 256, 220) (20, 12, 10, 128, 220)
Count 2 Tasks 2 Chunks
Type float64 numpy.ndarray
\n", "
\n", "\n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 12\n", " 20\n", "\n", "\n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 220\n", " 256\n", " 10\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "chunked_dim = 'y'\n", "chunks = {chunked_dim:init[chunked_dim].size // nworker}\n", "pm_chunked = pm.chunk(chunks)\n", "# if memory allows\n", "# pm_chunked = pm_chunked.persist()\n", "pm_chunked.get_initialized()['var'].data" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 35.8 s, sys: 18.5 s, total: 54.3 s\n", "Wall time: 19.5 s\n" ] } ], "source": [ "%%time\n", "s_chunked = pm_chunked.verify(**kw)\n", "assert dask.is_dask_collection(s_chunked)\n", "s_chunked = s_chunked.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- 2 core Mac Book Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s\n", "- 24 core mistral node: CPU times: user 26.2 s, sys: 1min 37s, total: 2min 3s Wall time: 5.38 s" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "try:\n", " xr.testing.assert_allclose(s,s_chunked,atol=1e-6)\n", "except AssertionError:\n", " for v in s.data_vars:\n", " (s-s_chunked)[v].plot(robust=True, col='lead')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- The results `s` and `s_chunked` are identical as requested.\n", "- Chunking reduces Wall time from 20s to 5s on supercomputer." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `bootstrap()`\n", "\n", "This speedup translates into {py:meth}`.PerfectModelEnsemble.bootstrap`, where bootstrapped resamplings of intializialized, uninitialized and persistence skill are computed and then translated into p values and confidence intervals." 
] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "kwp = kw.copy()\n", "kwp['iterations'] = 4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### without `dask`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time s_p = pm.bootstrap(**kwp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- 2 core Mac Book Pro 2018: CPU times: user 2min 3s, sys: 1min 22s, total: 3min 26s Wall time: 3min 43s\n", "- 24 core mistral node: CPU times: user 1min 51s, sys: 1min 54s, total: 3min 45s Wall time: 3min 25s" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### with `dask`\n", "\n", "When `ds` is chunked, {py:meth}`.PerfectModelEnsemble.bootstrap` performs all skill calculations on resampled inputs in parallel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time s_p_chunked = pm_chunked.bootstrap(**kwp).compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- 2 core Mac Book Pro 2018: CPU times: user 2min 35s, sys: 1min 4s, total: 3min 40s Wall time: 2min 10s\n", "- 24 core mistral node: CPU times: user 2min 55s, sys: 8min 8s, total: 11min 3s Wall time: 1min 53s" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "latex_envs": { "LaTeX_envs_menu_present": false, "autoclose": true, "autocomplete": true, "bibliofile": "large.bib", "cite_by": "apalike", "current_citInitial": 1, "eqLabelWithNumbers": false, "eqNumInitial": 1, "hotkeys": { "equation": "Ctrl-E", "itemize": "Ctrl-I" }, "labels_anchors": false, "latex_user_defs": false, "report_style_numbering": false, "user_envs_cfg": false }, "toc": { 
"base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 4 }