Source code for climpred.stats

import warnings

import numpy as np
import xarray as xr
from xskillscore import pearson_r

try:
    from xrft import power_spectrum
except ImportError:
    power_spectrum = None
from .checks import is_xarray


def rm_poly(ds, dim="time", deg=2, **kwargs):
    """Remove degree polynomial along dimension dim from ds."""
    coefficients = ds.polyfit(dim, deg=deg, **kwargs)
    coord = ds[dim]
    fits = []
    if isinstance(ds, xr.Dataset):
        for v in coefficients:
            name = v.replace("_polyfit_coefficients", "")
            fit = xr.polyval(coord, coefficients[v]).rename(name)
            fits.append(fit)
        fits = xr.merge(fits)
    elif isinstance(ds, xr.DataArray):
        name = ds.name
        v = list(coefficients.data_vars)[0]
        fits = xr.polyval(coord, coefficients[v]).rename(name)
    ds_rm_poly = ds - fits
    return ds_rm_poly


def rm_trend(ds, dim="time", **kwargs):
    """Remove degree polynomial along dimension dim from ds."""
    return rm_poly(ds, dim=dim, deg=1, **kwargs)
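
# A minimal usage sketch (hypothetical data, not part of the module API):
# remove a linear trend with rm_trend or a quadratic fit with rm_poly.
#
#     >>> da = xr.DataArray(
#     ...     0.1 * np.arange(100) + np.random.randn(100),
#     ...     dims="time",
#     ...     coords={"time": np.arange(100)},
#     ... )
#     >>> detrended = rm_trend(da)       # subtracts the degree-1 fit
#     >>> residual = rm_poly(da, deg=2)  # subtracts a quadratic fit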


@is_xarray(0)
def decorrelation_time(da, iterations=20, dim="time"):
    """Calculate the decorrelation time of a time series.

    .. math::
        \\tau_{d} = 1 + 2 * \\sum_{k=1}^{r}(\\alpha_{k})^{k}

    Args:
        da (xarray object): input.
        iterations (optional int): Number of iterations to run the above
            formula.
        dim (optional str): Time dimension for xarray object.

    Returns:
        Decorrelation time of time series.

    Reference:
        * Storch, H. v, and Francis W. Zwiers. Statistical Analysis in
          Climate Research. Cambridge; New York: Cambridge University Press,
          1999, p. 373.
    """

    def _lag_corr(x, y, dim, lead):
        """Helper function to shift the two time series and correlate."""
        N = x[dim].size
        normal = x.isel({dim: slice(0, N - lead)})
        shifted = y.isel({dim: slice(0 + lead, N)})
        # Align dimensions for the xarray operation.
        shifted[dim] = normal[dim]
        return pearson_r(normal, shifted, dim)

    one = xr.ones_like(da.isel({dim: 0}))
    one = one.where(da.isel({dim: 0}).notnull())
    return one + 2 * xr.concat(
        [_lag_corr(da, da, dim=dim, lead=i) ** i for i in range(1, iterations)],
        "it",
    ).sum("it")
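

# Sanity-check sketch (hypothetical input): white noise has near-zero lag
# correlations, so its decorrelation time should come out close to 1, while a
# strongly autocorrelated (red-noise) series yields a larger value.
#
#     >>> noise = xr.DataArray(
#     ...     np.random.randn(1000), dims="time",
#     ...     coords={"time": np.arange(1000)},
#     ... )
#     >>> decorrelation_time(noise)  # expected: approximately 1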


def dpp(ds, dim="time", m=10, chunk=True):
    """Calculate the Diagnostic Potential Predictability (DPP).

    .. math::
        DPP_{\\mathrm{unbiased}}(m) = \\frac{\\sigma^{2}_{m} -
            \\frac{1}{m}\\cdot\\sigma^{2}}{\\sigma^{2}}

    Note:
        Resplandy et al. 2015 and Seferian et al. 2018 calculate unbiased DPP
        in a slightly different way: chunk=False.

    Args:
        ds (xr.DataArray): control simulation with time dimension as years.
        dim (str): dimension to apply DPP on. Default: "time".
        m (optional int): separation time scale in years between the
            predictable low-frequency component and the high-frequency noise.
        chunk (optional boolean): Whether chunking is applied. Default: True.
            If False, the Resplandy 2015 / Seferian 2018 method is used.

    Returns:
        dpp (xr.DataArray): ds without the time dimension.

    References:
        * Boer, G. J. “Long Time-Scale Potential Predictability in an Ensemble
          of Coupled Climate Models.” Climate Dynamics 23, no. 1 (August 1,
          2004): 29–44. https://doi.org/10/csjjbh.
        * Resplandy, L., R. Séférian, and L. Bopp. “Natural Variability of CO2
          and O2 Fluxes: What Can We Learn from Centuries-Long Climate Models
          Simulations?” Journal of Geophysical Research: Oceans 120, no. 1
          (January 2015): 384–404. https://doi.org/10/f63c3h.
        * Séférian, Roland, Sarah Berthet, and Matthieu Chevallier. “Assessing
          the Decadal Predictability of Land and Ocean Carbon Uptake.”
          Geophysical Research Letters, March 15, 2018.
          https://doi.org/10/gdb424.
    """

    def _chunking(ds, dim="time", number_chunks=False, chunk_length=False):
        """Separate data into chunks and reshape chunks into a c dimension.

        Specify either the number of chunks or the length of chunks.
        Needed for dpp.

        Args:
            ds (xr.DataArray): control simulation with time dimension as
                years.
            dim (str): dimension to apply chunking to. Default: "time".
            chunk_length (int): see dpp(m).
            number_chunks (int): number of chunks in the return data.

        Returns:
            c (xr.DataArray): chunked ds with an additional dimension c.
        """
        if number_chunks and not chunk_length:
            chunk_length = np.floor(ds[dim].size / number_chunks)
            cmin = int(ds[dim].min())
        elif not number_chunks and chunk_length:
            cmin = int(ds[dim].min())
            number_chunks = int(np.floor(ds[dim].size / chunk_length))
        else:
            raise KeyError("set either number_chunks or chunk_length")

        c = ds.sel({dim: slice(cmin, cmin + chunk_length - 1)})
        c = c.expand_dims("c")
        c["c"] = [0]
        for i in range(1, number_chunks):
            c2 = ds.sel(
                {
                    dim: slice(
                        cmin + chunk_length * i, cmin + (i + 1) * chunk_length - 1
                    )
                }
            )
            c2 = c2.expand_dims("c")
            c2["c"] = [i]
            c2[dim] = c[dim]
            c = xr.concat([c, c2], "c")
        return c

    if not chunk:  # Resplandy 2015, Seferian 2018
        s2v = ds.rolling({dim: m}).mean().var(dim)
        s2 = ds.var(dim)

    if chunk:  # Boer 2004 ppvf
        # first chunk
        chunked_means = _chunking(ds, dim=dim, chunk_length=m).mean(dim)
        # sub means in chunks
        chunked_deviations = _chunking(ds, dim=dim, chunk_length=m) - chunked_means
        s2v = chunked_means.var("c")
        s2e = chunked_deviations.var([dim, "c"])
        s2 = s2v + s2e

    dpp = (s2v - s2 / m) / s2
    return dpp
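

# Usage sketch (hypothetical ``control`` DataArray with annual "time" steps):
# compare the chunked Boer (2004) estimate with the rolling-mean variant of
# Resplandy (2015) / Seferian (2018).
#
#     >>> dpp_boer = dpp(control, m=10, chunk=True)
#     >>> dpp_rolling = dpp(control, m=10, chunk=False)
#
# Values near 1 suggest most variance sits in the slow, potentially
# predictable component; values near 0 suggest noise-dominated variability.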


@is_xarray(0)
def varweighted_mean_period(da, dim="time", **kwargs):
    """Calculate the variance-weighted mean period of a time series based on
    xrft.power_spectrum.

    .. math::
        P_{x} = \\frac{\\sum_k V(f_k,x)}{\\sum_k f_k \\cdot V(f_k,x)}

    Args:
        da (xarray object): input data including dim.
        dim (optional str): Name of time dimension.
        **kwargs: passed on to xrft.power_spectrum.

    Reference:
        * Branstator, Grant, and Haiyan Teng. “Two Limits of Initial-Value
          Decadal Predictability in a CGCM.” Journal of Climate 23, no. 23
          (August 27, 2010): 6292–6311. https://doi.org/10/bwq92h.

    See also:
        https://xrft.readthedocs.io/en/latest/api.html#xrft.xrft.power_spectrum
    """
    if power_spectrum is None:
        raise ImportError(
            "xrft is not installed; see "
            "https://xrft.readthedocs.io/en/latest/installation.html"
        )
    if isinstance(da, xr.Dataset):
        raise ValueError("requires xr.DataArray; try xr.Dataset.map(func)")
    da = da.fillna(0.0)
    # dim should be a list of dimension names
    if isinstance(dim, str):
        dim = [dim]
    assert isinstance(dim, list)
    ps = power_spectrum(da, dim=dim, **kwargs)
    # keep only positive frequencies
    for d in dim:
        ps = ps.where(ps[f"freq_{d}"] > 0)
    # variance-weighted mean period
    vwmp = ps
    for d in dim:
        vwmp = vwmp.sum(f"freq_{d}") / ((vwmp * vwmp[f"freq_{d}"]).sum(f"freq_{d}"))
    for d in dim:
        if f"freq_{d}_spacing" in vwmp.coords:
            del vwmp.coords[f"freq_{d}_spacing"]
    return vwmp
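

# Sanity-check sketch (hypothetical, requires xrft): a pure sine of period T
# concentrates its spectrum at f = 1/T, so the formula
# P_x = sum(V) / sum(f * V) reduces to 1 / f = T.
#
#     >>> t = np.arange(1000)
#     >>> da = xr.DataArray(
#     ...     np.sin(2 * np.pi * t / 20.0), dims="time", coords={"time": t}
#     ... )
#     >>> varweighted_mean_period(da)  # expected: roughly 20 time steps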