"""Objects dealing with timeseries and ensemble statistics."""
import warnings

import numpy as np
import numpy.polynomial.polynomial as poly
import scipy.stats as ss
import xarray as xr
from xrft import power_spectrum
from xskillscore import pearson_r, pearson_r_p_value

from .checks import has_dims, is_xarray
from .utils import copy_coords_from_to


# ----------------------------------#
# TIME SERIES
# Functions related to time series.
# ----------------------------------#
@is_xarray([0, 1])
def corr(x, y, dim='time', lag=0, return_p=False):
    """Computes the Pearson product-moment coefficient of linear correlation.

    Note:
        This version calculates the effective degrees of freedom, accounting
        for autocorrelation within each time series that could inflate the
        significance of the correlation.

    Args:
        x (xarray object): Independent variable time series or grid of time
            series.
        y (xarray object): Dependent variable time series or grid of time
            series.
        dim (optional str): Dimension to compute the correlation over.
        lag (optional int): Lag to apply to the correlation, with x
            predicting y.
        return_p (optional bool): If True, return p values in addition to
            the correlation coefficients.

    Returns:
        Pearson correlation coefficients.

        If return_p is True, also returns the associated p values.

    References:
        * Wilks, Daniel S. Statistical Methods in the Atmospheric Sciences.
          Vol. 100. Academic Press, 2011.
        * Lovenduski, Nicole S., and Nicolas Gruber. "Impact of the Southern
          Annular Mode on Southern Ocean circulation and biology."
          Geophysical Research Letters 32.11 (2005).
    """
    if lag != 0:
        N = x[dim].size
        normal = x.isel({dim: slice(0, N - lag)})
        shifted = y.isel({dim: slice(0 + lag, N)})
        # if there is no coordinate for dim, create a dummy coordinate of
        # matching length so pearson_r can align the two series
        if dim not in list(x.coords):
            normal[dim] = np.arange(N - lag)
        shifted[dim] = normal[dim]
        r = pearson_r(normal, shifted, dim)
    else:
        r = pearson_r(x, y, dim)
    if return_p:
        p = _eff_p_value(x, y, r, dim)
        # return with proper dimension labeling. would be easier with
        # apply_ufunc, but having trouble getting it to work here. issue
        # probably has to do with core dims.
        dimlist = list(r.dims)
        for i in range(len(dimlist)):
            p = p.rename({'dim_' + str(i): dimlist[i]})
        return r, p
    else:
        return r

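
# Example usage (a minimal sketch with synthetic data; `x` and `y` are
# hypothetical, not part of the module):
#
#     >>> x = xr.DataArray(np.random.rand(100), dims='time')
#     >>> y = x.shift(time=2).fillna(0.0)
#     >>> r, p = corr(x, y, dim='time', lag=2, return_p=True)
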
def _eff_p_value(x, y, r, dim):
    """Computes p values accounting for autocorrelation in time series.

    Args:
        x (xarray object): Independent time series.
        y (xarray object): Dependent time series.
        r (xarray object): Pearson correlations between x and y.
        dim (str): Dimension to compute p values over.

    Returns:
        p values accounting for autocorrelation in the input time series.

    References:
        * Wilks, Daniel S. Statistical Methods in the Atmospheric Sciences.
          Vol. 100. Academic Press, 2011.
    """

    def _compute_autocorr(v, dim, n):
        """Return the lag-1 autocorrelation, giving the normal and shifted
        time series equal dimensions so as not to throw an error."""
        shifted = v.isel({dim: slice(1, n)})
        normal = v.isel({dim: slice(0, n - 1)})
        # see explanation in autocorr for this
        if dim not in list(v.coords):
            normal[dim] = np.arange(1, n)
        shifted[dim] = normal[dim]
        return pearson_r(shifted, normal, dim)

    n = x[dim].size
    # find autocorrelation
    xa, ya = x - x.mean(dim), y - y.mean(dim)
    xauto = _compute_autocorr(xa, dim, n)
    yauto = _compute_autocorr(ya, dim, n)
    # compute effective sample size
    n_eff = n * (1 - xauto * yauto) / (1 + xauto * yauto)
    n_eff = np.floor(n_eff)
    # constrain n_eff to be at most the total number of samples
    n_eff = n_eff.where(n_eff <= n, n)
    # compute t-statistic and the corresponding two-tailed p value
    t = r * np.sqrt((n_eff - 2) / (1 - r ** 2))
    p = xr.DataArray(ss.t.sf(np.abs(t), n_eff - 2) * 2)
    return p

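
# Worked example of the effective-sample-size correction above: with
# n = 100 samples and lag-1 autocorrelations xauto = yauto = 0.5,
# n_eff = 100 * (1 - 0.25) / (1 + 0.25) = 60, so the t-test uses
# 58 degrees of freedom rather than 98.
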
@is_xarray(0)
def rm_poly(ds, order, dim='time'):
    """Returns xarray object with nth-order fit removed.

    .. note::
        This automatically performs a linear interpolation across any NaNs
        in the time series.

    Args:
        ds (xarray object): Time series to be detrended.
        order (int): Order of polynomial fit to be removed.
        dim (optional str): Dimension over which to remove the polynomial
            fit.

    Returns:
        xarray object with polynomial fit removed.
    """
    # TODO: adapt for dask
    has_dims(ds, dim, 'dataset')
    # handle both datasets and dataarrays
    if isinstance(ds, xr.Dataset):
        da = ds.to_array()
        return_ds = True
    else:
        da = ds.copy()
        return_ds = False

    da_dims_orig = list(da.dims)  # orig -> original
    if len(da_dims_orig) > 1:
        # want independent axis to be the leading dimension
        da_dims_swap = da_dims_orig.copy()  # copy to prevent contamination
        # https://stackoverflow.com/questions/1014523/
        # simple-syntax-for-bringing-a-list-element-to-the-front-in-python
        da_dims_swap.insert(0, da_dims_swap.pop(da_dims_swap.index(dim)))
        da = da.transpose(*da_dims_swap)
        # hide other dims into a single dim
        da = da.stack({'other_dims': da_dims_swap[1:]})
        dims_swapped = True
    else:
        dims_swapped = False

    # NaNs will make the polyfit fail--interpolate any NaNs in
    # the provided dim to prevent a poor fit, while other dims' NaNs
    # will be filled with 0s; however, all NaNs will be replaced
    # in the final output
    nan_locs = np.isnan(da.values)

    # any(nan_locs.sum(axis=0)) fails if not 2D
    if nan_locs.ndim == 1:
        nan_locs = nan_locs.reshape(len(nan_locs), 1)
        nan_reshaped = True
    else:
        nan_reshaped = False

    # check if there are any NaNs in the provided dim because
    # interpolate_na is computationally expensive to run regardless of NaNs
    if any(nan_locs.sum(axis=0)) > 0:
        # Could check whether any NaNs aren't bookended:
        # [0, np.nan, 2] can be interpolated directly.
        da = da.interpolate_na(dim)
        if any(nan_locs[0, :]):
            # [np.nan, 1, 2], no first value to interpolate from; back fill
            da = da.bfill(dim)
        if any(nan_locs[-1, :]):
            # [0, 1, np.nan], no last value to interpolate from; forward fill
            da = da.ffill(dim)

    # this handles the other axes; doesn't matter since it won't affect
    # the fit
    da = da.fillna(0)

    # the actual operation of detrending
    y = da.values
    x = np.arange(0, len(y), 1)
    coefs = poly.polyfit(x, y, order)
    fit = poly.polyval(x, coefs)
    y_dt = y - fit.transpose()  # dt -> detrended
    da.data[:] = y_dt

    # replace back the filled NaNs (keep values where not NaN)
    if nan_reshaped:
        nan_locs = nan_locs[:, 0]
    da = da.where(~nan_locs)

    if dims_swapped:
        # revert the other dimensions to their original form and ordering
        da = da.unstack('other_dims').transpose(*da_dims_orig)

    if return_ds:
        # revert back into a dataset
        return xr.merge(
            da.sel(variable=var).rename(var).drop('variable')
            for var in da['variable'].values
        )
    else:
        return da

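
# Example usage (a minimal sketch; `ts` is a hypothetical DataArray with an
# interior NaN, not part of the module):
#
#     >>> ts = xr.DataArray(np.arange(10.0) ** 2, dims='time')
#     >>> ts[4] = np.nan  # interpolated across before the fit
#     >>> detrended = rm_poly(ts, 2, dim='time')
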
def rm_trend(da, dim='time'):
    """Remove linear trend from time series.

    Args:
        da (xarray object): Time series to be detrended.
        dim (optional str): Dimension over which to remove the linear trend.

    Returns:
        xarray object with linear trend removed.
    """
    return rm_poly(da, 1, dim=dim)

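
# Example usage (hypothetical `ts` DataArray): rm_trend(ts, dim='time') is
# shorthand for rm_poly(ts, 1, dim='time'), removing the least-squares
# linear fit.
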
@is_xarray(0)
def varweighted_mean_period(da, dim='time', **kwargs):
    """Calculate the variance weighted mean period of a time series based on
    xrft.power_spectrum.

    .. math::
        P_{x} = \\frac{\\sum_k V(f_k,x)}{\\sum_k f_k \\cdot V(f_k,x)}

    Args:
        da (xarray object): input data including dim.
        dim (optional str): Name of time dimension.
        **kwargs: Additional keyword arguments passed on to
            xrft.power_spectrum.

    Reference:
        * Branstator, Grant, and Haiyan Teng. "Two Limits of Initial-Value
          Decadal Predictability in a CGCM." Journal of Climate 23, no. 23
          (August 27, 2010): 6292-6311. https://doi.org/10/bwq92h.

    See also:
        https://xrft.readthedocs.io/en/latest/api.html#xrft.xrft.power_spectrum
    """
    if isinstance(da, xr.Dataset):
        raise ValueError('varweighted_mean_period requires an xr.DataArray.')
    # set NaNs to 0 so the power spectrum can be computed
    da = da.fillna(0.0)
    # dim should be a list
    if isinstance(dim, str):
        dim = [dim]
    assert isinstance(dim, list)
    ps = power_spectrum(da, dim=dim, **kwargs)
    # keep only positive frequencies
    for d in dim:
        ps = ps.where(ps[f'freq_{d}'] > 0)
    # variance-weighted average
    vwmp = ps
    for d in dim:
        vwmp = vwmp.sum(f'freq_{d}') / ((vwmp * vwmp[f'freq_{d}']).sum(f'freq_{d}'))
    for d in dim:
        del vwmp[f'freq_{d}_spacing']
    # try to copy coords
    try:
        vwmp = copy_coords_from_to(da.drop(dim), vwmp)
    except ValueError:
        warnings.warn("Couldn't keep coords.")
    return vwmp

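
# Example usage (a minimal sketch with a synthetic 20-step sinusoid; `ts`
# is hypothetical, not part of the module):
#
#     >>> t = np.arange(1000)
#     >>> ts = xr.DataArray(
#     ...     np.sin(2 * np.pi * t / 20.0), dims='time', coords={'time': t}
#     ... )
#     >>> vwmp = varweighted_mean_period(ts, dim='time')  # close to 20
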
@is_xarray(0)
def autocorr(ds, lag=1, dim='time', return_p=False):
    """Calculate the lagged correlation of time series.

    Args:
        ds (xarray object): Time series or grid of time series.
        lag (optional int): Number of time steps to lag correlate to.
        dim (optional str): Name of dimension to autocorrelate over.
        return_p (optional bool): If True, return correlation coefficients
            and p values.

    Returns:
        Pearson correlation coefficients.

        If return_p, also returns their associated p values.
    """
    N = ds[dim].size
    normal = ds.isel({dim: slice(0, N - lag)})
    shifted = ds.isel({dim: slice(0 + lag, N)})
    # xskillscore's pearson_r expects the dimensions to match, but the shift
    # misaligns them. Aligning on the coordinate does not work if the user
    # provides a dataset without a coordinate for the main dimension, so
    # create a dummy coordinate in that case.
    if dim not in list(ds.coords):
        normal[dim] = np.arange(N - lag)
    shifted[dim] = normal[dim]
    r = pearson_r(normal, shifted, dim)
    if return_p:
        # NOTE: This assumes a two-tailed test. Need to update eff_pearsonr
        # to utilize xskillscore's metrics but then compute our own effective
        # p value with an option for one-tailed tests.
        p = pearson_r_p_value(normal, shifted, dim)
        return r, p
    else:
        return r

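
# Example usage (a minimal sketch; `ts` is a hypothetical DataArray, not
# part of the module):
#
#     >>> ts = xr.DataArray(np.random.rand(500), dims='time')
#     >>> r1 = autocorr(ts, lag=1, dim='time')
#     >>> r2, p2 = autocorr(ts, lag=2, dim='time', return_p=True)
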
@is_xarray(0)
def decorrelation_time(da, r=20, dim='time'):
    """Calculate the decorrelation time of a time series.

    .. math::
        \\tau_{d} = 1 + 2 * \\sum_{k=1}^{r}(\\alpha_{k})^{k}

    Args:
        da (xarray object): Time series.
        r (optional int): Number of iterations to run the above formula.
        dim (optional str): Time dimension for xarray object.

    Returns:
        Decorrelation time of time series.

    Reference:
        * Storch, H. v., and Francis W. Zwiers. Statistical Analysis in
          Climate Research. Cambridge; New York: Cambridge University Press,
          1999, p. 373.
    """
    one = xr.ones_like(da.isel({dim: 0}))
    one = one.where(da.isel({dim: 0}).notnull())
    return one + 2 * xr.concat(
        [autocorr(da, dim=dim, lag=i) ** i for i in range(1, r)], 'it'
    ).sum('it')

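
# Example usage (a minimal sketch; `ts` is a hypothetical DataArray, not
# part of the module). The lagged autocorrelations are estimated from the
# data, so tau_d is noisy for short series:
#
#     >>> ts = xr.DataArray(np.random.rand(300), dims='time')
#     >>> tau_d = decorrelation_time(ts, r=20, dim='time')
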

# --------------------------------------------#
# Diagnostic Potential Predictability (DPP)
# Functions related to DPP from Boer et al.
# --------------------------------------------#

def dpp(ds, dim='time', m=10, chunk=True):
    """Calculates the Diagnostic Potential Predictability (DPP).

    .. math::
        DPP_{\\mathrm{unbiased}}(m) =
        \\frac{\\sigma^{2}_{m} - \\frac{1}{m}\\cdot\\sigma^{2}}{\\sigma^{2}}

    Note:
        Resplandy et al. 2015 and Seferian et al. 2018 calculate unbiased
        DPP in a slightly different way: chunk=False.

    Args:
        ds (xr.DataArray): control simulation with time dimension as years.
        dim (str): dimension to apply DPP on. Default: 'time'.
        m (optional int): separation time scale in years between the
            predictable low-frequency component and high-frequency noise.
        chunk (optional boolean): Whether chunking is applied. Default: True.
            If False, uses the Resplandy 2015 / Seferian 2018 method.

    Returns:
        dpp (xr.DataArray): ds without the time dimension.

    References:
        * Boer, G. J. "Long Time-Scale Potential Predictability in an
          Ensemble of Coupled Climate Models." Climate Dynamics 23, no. 1
          (August 1, 2004): 29–44. https://doi.org/10/csjjbh.
        * Resplandy, L., R. Séférian, and L. Bopp. "Natural Variability of
          CO2 and O2 Fluxes: What Can We Learn from Centuries-Long Climate
          Models Simulations?" Journal of Geophysical Research: Oceans 120,
          no. 1 (January 2015): 384–404. https://doi.org/10/f63c3h.
        * Séférian, Roland, Sarah Berthet, and Matthieu Chevallier.
          "Assessing the Decadal Predictability of Land and Ocean Carbon
          Uptake." Geophysical Research Letters, March 15, 2018.
          https://doi.org/10/gdb424.
    """

    def _chunking(ds, dim='time', number_chunks=False, chunk_length=False):
        """Separate data into chunks and reshape the chunks into a new 'c'
        dimension. Specify either the number of chunks or the length of
        chunks. Needed for dpp.

        Args:
            ds (xr.DataArray): control simulation with time dimension as
                years.
            dim (str): dimension to apply chunking to. Default: 'time'.
            chunk_length (int): see dpp(m).
            number_chunks (int): number of chunks in the return data.

        Returns:
            c (xr.DataArray): chunked ds, with an additional dimension c.
        """
        if number_chunks and not chunk_length:
            chunk_length = np.floor(ds[dim].size / number_chunks)
            cmin = int(ds[dim].min())
        elif not number_chunks and chunk_length:
            cmin = int(ds[dim].min())
            number_chunks = int(np.floor(ds[dim].size / chunk_length))
        else:
            raise KeyError('set either number_chunks or chunk_length')
        c = ds.sel({dim: slice(cmin, cmin + chunk_length - 1)})
        c = c.expand_dims('c')
        c['c'] = [0]
        for i in range(1, number_chunks):
            c2 = ds.sel(
                {dim: slice(cmin + chunk_length * i,
                            cmin + (i + 1) * chunk_length - 1)}
            )
            c2 = c2.expand_dims('c')
            c2['c'] = [i]
            c2[dim] = c[dim]
            c = xr.concat([c, c2], 'c')
        return c

    if not chunk:  # Resplandy 2015, Seferian 2018
        s2v = ds.rolling({dim: m}).mean().var(dim)
        s2 = ds.var(dim)

    if chunk:  # Boer 2004 ppvf
        # first chunk
        chunked_means = _chunking(ds, dim=dim, chunk_length=m).mean(dim)
        # sub means in chunks
        chunked_deviations = _chunking(ds, dim=dim, chunk_length=m) - chunked_means
        s2v = chunked_means.var('c')
        s2e = chunked_deviations.var([dim, 'c'])
        s2 = s2v + s2e

    dpp = (s2v - s2 / m) / s2
    return dpp

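
# Example usage (a minimal sketch; `control` is a hypothetical annual-mean
# control run, not part of the module):
#
#     >>> control = xr.DataArray(
#     ...     np.random.rand(300), dims='time',
#     ...     coords={'time': np.arange(3000, 3300)}
#     ... )
#     >>> dpp_boer = dpp(control, dim='time', m=10, chunk=True)
#     >>> dpp_resplandy = dpp(control, dim='time', m=10, chunk=False)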