# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""
This module defines the utilities required for Ensemble Copula Coupling
plugins.
"""
import warnings
from typing import List, Optional, Union
import cf_units as unit
import iris
import numpy as np
from cf_units import Unit
from iris.cube import Cube
from numpy import ndarray
from improver import BasePlugin
from improver.ensemble_copula_coupling.constants import BOUNDS_FOR_ECDF
from improver.metadata.probabilistic import find_threshold_coordinate
[docs]
def concatenate_2d_array_with_2d_array_endpoints(
array_2d: ndarray, low_endpoint: float, high_endpoint: float
) -> ndarray:
"""
For a 2d array, add a 2d array as the lower and upper endpoints.
The concatenation to add the lower and upper endpoints to the 2d array
are performed along the second (index 1) dimension.
Args:
array_2d:
2d array of values
low_endpoint:
Number used to create a 2d array of a constant value
as the lower endpoint.
high_endpoint:
Number of used to create a 2d array of a constant value
as the upper endpoint.
Returns:
2d array of values after padding with the low_endpoint and
high_endpoint.
"""
if array_2d.ndim != 2:
raise ValueError("Expected 2D input, got {}D input".format(array_2d.ndim))
lower_array = np.full((array_2d.shape[0], 1), low_endpoint, dtype=array_2d.dtype)
upper_array = np.full((array_2d.shape[0], 1), high_endpoint, dtype=array_2d.dtype)
array_2d = np.concatenate((lower_array, array_2d, upper_array), axis=1)
return array_2d
[docs]
class CalculatePercentilesFromIntensityDistribution(BasePlugin):
"""
Plugin to calculate percentiles at which provided intensity values would fall,
according to a fitted probability distribution (currently only gamma).
"""
[docs]
def __init__(
self,
distribution: str = "gamma",
nan_mask_value: Optional[float] = 0.0,
scale_percentiles_to_probability_lower_bound: bool = False,
):
"""
Initialise the plugin.
Args:
distribution: Type of distribution to fit
(currently only 'gamma' is supported).
nan_mask_value: Value to mask as NaN before calculating mean and std.
This option might be most useful for a diagnostic, such as
precipitation rate, where there is a high frequency of zero values.
If None, no masking is performed. Default is 0.0.
scale_percentiles_to_probability_lower_bound: If True, the minimum value
of the calculated percentiles will be set to the minimum CDF
probability implied by the input probabilities, rather than zero.
This has the effect of restricting the percentiles to the non-zero
part of the distribution, which is useful when there is a high
probability of zero values (e.g., for precipitation). When False,
percentiles are calculated over the full [0, 1] range, regardless
of the input probabilities. Default is False.
"""
if distribution != "gamma":
raise ValueError(
f"Unrecognised distribution option '{distribution}'. "
"The supported options are: 'gamma'."
)
self.distribution = distribution
self.nan_mask_value = nan_mask_value
self.scale_percentiles_to_probability_lower_bound = (
scale_percentiles_to_probability_lower_bound
)
[docs]
@staticmethod
def _scale_percentiles_to_probability_lower_bound(
percentiles: np.ndarray,
probabilities: np.ndarray,
nan_mask_value: Optional[float] = 0.0,
):
"""Rescale percentiles based on provided probabilities, with optional NaN
masking. The rescaling uses the min probability after converting probabilities
to be "less than" a given threshold, rather than "greater than".
This rescaling has the effect that when the percentiles are later used for
sampling the probabilities, the percentiles will not sample below the min
probability. This is helpful for distributions with a high frequency of zero
values, where the probability of e.g. the precipitation being less than a
particular value can be large. This therefore focuses the percentiles on the
non-zero part of the distribution.
Args:
percentiles: Array of percentiles to be rescaled.
probabilities: Array of probabilities for scaling.
nan_mask_value: Value to mask as NaN before rescaling. If None,
no masking is performed.
Returns:
Rescaled percentiles array.
"""
percentiles_nan = percentiles.copy()
scaled_percentiles = percentiles.copy()
cdf_probabilities = 1 - probabilities
lower_limit = cdf_probabilities[0]
upper_limit = np.ones(lower_limit.shape)
if nan_mask_value is not None:
percentiles_nan[percentiles_nan == nan_mask_value] = np.nan
percentiles_nan[:, np.all(probabilities == 0, axis=0)] = np.nan
scaled_percentiles_nan = (
(upper_limit - lower_limit) * percentiles_nan
) + lower_limit
scaled_percentiles[~np.isnan(scaled_percentiles_nan)] = scaled_percentiles_nan[
~np.isnan(scaled_percentiles_nan)
]
return scaled_percentiles
[docs]
def _calculate_percentiles_from_intensity_distribution(
self,
probability_cube: Cube,
intensity_cube: Cube,
):
"""For each spatial location, calculate the percentiles at which the provided
intensity values would fall, according to a fitted probability distribution.
Args:
probability_cube: Cube containing probability data at a range of thresholds.
intensity_cube: Cube containing the intensity data to be mapped to percentiles.
Returns:
An array of percentiles (CDF values) for each intensity at each location.
References:
Scheuerer, M., and T. M. Hamill, 2015: Statistical Postprocessing of
Ensemble Precipitation Forecasts by Fitting Censored, Shifted Gamma
Distributions. Mon. Wea. Rev., 143, 4578–4596,
https://doi.org/10.1175/MWR-D-15-0061.1.
Wilks, D. S., 2019: Statistical Methods in the Atmospheric Sciences,
Academic Press.
"""
from scipy.stats import gamma
if intensity_cube.ndim < 2:
raise ValueError(
"Expected at least 2D input, got {}D input".format(intensity_cube.ndim)
)
# Optionally mask a value as NaN before calculating mean and std
cube_nan = intensity_cube.copy()
if self.nan_mask_value is not None:
cube_nan.data[cube_nan.data == self.nan_mask_value] = np.nan
mean = np.nanmean(cube_nan.data, axis=0)
std = np.nanstd(cube_nan.data, axis=0)
# Avoid zero mean or std causing issues.
absolute_thresholds = np.abs(find_threshold_coordinate(probability_cube).points)
min_non_zero_threshold = np.min(absolute_thresholds[absolute_thresholds > 0])
tolerance = min_non_zero_threshold / 10.0
mean = np.where(mean > tolerance, mean, tolerance)
std = np.where(std > tolerance, std, tolerance)
# Calculate gamma distribution parameters from mean and std. Taken from
# Wilks (2019) Section 4.4.5 and Scheuerer and Hamill (2015). These are
# most accurate estimates when the shape parameter is large.
shape_parameter = mean**2 / std**2
scale_parameter = std**2 / mean
percentiles = gamma.cdf(
intensity_cube.data,
shape_parameter[None, ...],
scale=scale_parameter[None, ...],
)
if self.scale_percentiles_to_probability_lower_bound:
percentiles = self._scale_percentiles_to_probability_lower_bound(
percentiles, probability_cube.data, self.nan_mask_value
)
percentiles = np.sort(percentiles, axis=0)
return percentiles.astype(np.float32)
[docs]
def process(
self,
probability_cube: Cube,
intensity_cube: Cube,
) -> np.ndarray:
"""
Public interface to calculate percentiles from intensity distribution.
Args:
probability_cube: Cube containing probability data at a range of thresholds.
intensity_cube: Cube containing the intensity data to be mapped to percentiles.
Returns:
A 3D array of percentiles (CDF values) for each intensity at each location.
"""
return self._calculate_percentiles_from_intensity_distribution(
probability_cube, intensity_cube
)
[docs]
def choose_set_of_percentiles(
no_of_percentiles: int,
sampling: str = "quantile",
probability_cube: Optional[Cube] = None,
intensity_cube: Optional[Cube] = None,
distribution: Optional[str] = "gamma",
nan_mask_value: Optional[float] = 0.0,
scale_percentiles_to_probability_lower_bound: bool = True,
) -> np.ndarray:
"""
Function to create percentiles.
Args:
no_of_percentiles:
Number of percentiles.
sampling:
Type of sampling of the distribution to produce a set of
percentiles e.g. quantile, random or transformation.
Accepted options for sampling are:
* Quantile: A regular set of equally-spaced percentiles aimed
at dividing a Cumulative Distribution Function into
blocks of equal probability. This is the default option.
* Random: A random set of ordered percentiles.
* Transformation: A set of percentiles generated by applying a
transformation to the distribution.
probability_cube: Cube containing probability data at a range of thresholds.
intensity_cube: Cube containing the intensity data to be mapped to percentiles.
distribution: Type of distribution to fit (currently only 'gamma' is supported).
nan_mask_value: Value to mask as NaN before calculating mean and std.
If None, no masking is performed. Default is 0.0.
scale_percentiles_to_probability_lower_bound: If True, the minimum value
of the calculated percentiles will be set to the minimum CDF
probability implied by the input probabilities, rather than zero.
This has the effect of restricting the percentiles to the non-zero
part of the distribution, which is useful when there is a high
probability of zero values (e.g., for precipitation). When False,
percentiles are calculated over the full [0, 1] range, regardless
of the input probabilities. Default is True.
Returns:
Percentiles calculated using the sampling technique specified as a numpy array.
Raises:
ValueError: if the sampling option is not one of the accepted options.
References:
For further details, Flowerdew, J., 2014.
Calibrating ensemble reliability whilst preserving spatial structure.
Tellus, Series A: Dynamic Meteorology and Oceanography, 66(1), pp.1-20.
Schefzik, R., Thorarinsdottir, T.L. & Gneiting, T., 2013.
Uncertainty Quantification in Complex Simulation Models Using Ensemble
Copula Coupling. Statistical Science, 28(4), pp.616-640.
"""
if sampling in ["quantile"]:
percentiles = np.linspace(
1 / float(1 + no_of_percentiles),
no_of_percentiles / float(1 + no_of_percentiles),
no_of_percentiles,
)
elif sampling in ["random"]:
percentiles = np.random.uniform(
1 / float(1 + no_of_percentiles),
no_of_percentiles / float(1 + no_of_percentiles),
no_of_percentiles,
)
percentiles = np.sort(percentiles)
elif sampling in ["transformation"]:
percentiles = CalculatePercentilesFromIntensityDistribution(
distribution=distribution,
nan_mask_value=nan_mask_value,
scale_percentiles_to_probability_lower_bound=scale_percentiles_to_probability_lower_bound,
).process(probability_cube, intensity_cube)
else:
msg = "Unrecognised sampling option '{}'".format(sampling)
raise ValueError(msg)
return percentiles * 100
[docs]
def create_cube_with_percentiles(
percentiles: Union[List[float], ndarray],
template_cube: Cube,
cube_data: ndarray,
cube_unit: Optional[Union[Unit, str]] = None,
) -> Cube:
"""
Create a cube with a percentile coordinate based on a template cube.
The resulting cube will have an extra percentile coordinate compared with
the template cube. The shape of the cube_data should be the shape of the
desired output cube.
Args:
percentiles:
Ensemble percentiles. There should be the same number of
percentiles as the first dimension of cube_data.
template_cube:
Cube to copy metadata from.
cube_data:
Data to insert into the template cube.
The shape of the cube_data, excluding the dimension associated with
the percentile coordinate, should be the same as the shape of
template_cube.
For example, template_cube shape is (3, 3, 3), whilst the cube_data
is (10, 3, 3, 3), where there are 10 percentiles.
cube_unit:
The units of the data within the cube, if different from those of
the template_cube.
Returns:
Cube containing a percentile coordinate as the leading dimension (or
scalar percentile coordinate if single-valued)
"""
# create cube with new percentile dimension
cubes = iris.cube.CubeList([])
for point in percentiles:
cube = template_cube.copy()
cube.add_aux_coord(
iris.coords.AuxCoord(
np.float32(point), long_name="percentile", units=unit.Unit("%")
)
)
cubes.append(cube)
result = cubes.merge_cube()
# replace data and units
result.data = cube_data
if cube_unit is not None:
result.units = cube_unit
return result
[docs]
def create_cube_with_percentile_index(
indices: Union[List[int], ndarray],
template_cube: Cube,
cube_data: ndarray,
cube_unit: Optional[Union[Unit, str]] = None,
) -> Cube:
"""
Create a cube with a percentile_index coordinate based on a template cube.
The resulting cube will have an extra percentile_index coordinate compared with
the template cube. The shape of the cube_data should be the shape of the
desired output cube.
Args:
indices:
Indices to use for the percentile_index coordinate. There should be the same
number of indices as the first dimension of cube_data.
template_cube:
Cube to copy metadata from.
cube_data:
Data to insert into the template cube.
The shape of the cube_data, excluding the dimension associated with
the percentile_index coordinate, should be the same as the shape of
template_cube.
cube_unit:
The units of the data within the cube, if different from those of
the template_cube.
Returns:
Cube containing a percentile_index coordinate as the leading dimension (or
scalar percentile_index coordinate if single-valued)
"""
cubes = iris.cube.CubeList([])
for idx in indices:
cube = template_cube.copy()
cube.add_aux_coord(
iris.coords.AuxCoord(
int(idx), long_name="percentile_index", units=unit.Unit("1")
)
)
cubes.append(cube)
result = cubes.merge_cube()
result.data = cube_data
if cube_unit is not None:
result.units = cube_unit
return result
[docs]
def get_bounds_of_distribution(bounds_pairing_key: str, desired_units: Unit) -> ndarray:
"""
Gets the bounds of the distribution and converts the units of the
bounds_pairing to the desired_units.
This method gets the bounds values and units from the imported
dictionaries: BOUNDS_FOR_ECDF and units_of_BOUNDS_FOR_ECDF.
The units of the bounds are converted to be the desired units.
Args:
bounds_pairing_key:
Name of key to be used for the BOUNDS_FOR_ECDF dictionary, in order
to get the desired bounds_pairing.
desired_units:
Units to which the bounds_pairing will be converted.
Returns:
Lower and upper bound to be used as the ends of the
empirical cumulative distribution function, converted to have
the desired units.
Raises:
KeyError: If the bounds_pairing_key is not within the BOUNDS_FOR_ECDF
dictionary.
"""
# Extract bounds from dictionary of constants.
try:
bounds_pairing = BOUNDS_FOR_ECDF[bounds_pairing_key].value
bounds_pairing_units = BOUNDS_FOR_ECDF[bounds_pairing_key].units
except KeyError as err:
msg = (
"The bounds_pairing_key: {} is not recognised "
"within BOUNDS_FOR_ECDF {}. \n"
"Error: {}".format(bounds_pairing_key, BOUNDS_FOR_ECDF, err)
)
raise KeyError(msg)
bounds_pairing_units = unit.Unit(bounds_pairing_units)
bounds_pairing = bounds_pairing_units.convert(
np.array(bounds_pairing), desired_units
)
return bounds_pairing
[docs]
def insert_lower_and_upper_endpoint_to_1d_array(
array_1d: ndarray, low_endpoint: float, high_endpoint: float
) -> ndarray:
"""
For a 1d array, add a lower and upper endpoint.
Args:
array_1d:
1d array of values
low_endpoint:
Number of use as the lower endpoint.
high_endpoint:
Number of use as the upper endpoint.
Returns:
1d array of values padded with the low_endpoint and high_endpoint.
"""
if array_1d.ndim != 1:
raise ValueError("Expected 1D input, got {}D input".format(array_1d.ndim))
lower_array = np.array([low_endpoint])
upper_array = np.array([high_endpoint])
array_1d = np.concatenate((lower_array, array_1d, upper_array))
if array_1d.dtype == np.float64:
array_1d = array_1d.astype(np.float32)
return array_1d
[docs]
def restore_non_percentile_dimensions(
array_to_reshape: ndarray, original_cube: Cube, n_percentiles: int
) -> ndarray:
"""
Reshape a 2d array, so that it has the dimensions of the original cube,
whilst ensuring that the probabilistic dimension is the first dimension.
Args:
array_to_reshape:
The array that requires reshaping. This has dimensions "percentiles"
by "points", where "points" is a flattened array of all the other
original dimensions that needs reshaping.
original_cube:
Cube slice containing the desired shape to be reshaped to, apart from
the probabilistic dimension. This would typically be expected to be
either [time, y, x] or [y, x].
n_percentiles:
Length of the required probabilistic dimension ("percentiles").
Returns:
The array after reshaping.
Raises:
ValueError: If the probabilistic dimension is not the first on the
original_cube.
CoordinateNotFoundError: If the input_probabilistic_dimension_name is
not a coordinate on the original_cube.
"""
shape_to_reshape_to = list(original_cube.shape)
if n_percentiles > 1:
shape_to_reshape_to = [n_percentiles] + shape_to_reshape_to
return array_to_reshape.reshape(shape_to_reshape_to)
[docs]
def slow_interp_same_x(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray:
"""For each row i of fp, calculate np.interp(x, xp, fp[i, :]). Note that a
fast_interp_same_x version of this function exists that uses numba. This
slow version is retained as a fallback for when numba is not available.
Args:
x: 1-D array
xp: 1-D array, sorted in non-decreasing order
fp: 2-D array with len(xp) columns
Returns:
2-D array with shape (len(fp), len(x)), with each row i equal to
np.interp(x, xp, fp[i, :])
"""
result = np.empty((fp.shape[0], len(x)), np.float32)
for i in range(fp.shape[0]):
result[i, :] = np.interp(x, xp, fp[i, :])
return result
[docs]
def interpolate_multiple_rows_same_x(*args):
"""For each row i of fp, do the equivalent of np.interp(x, xp, fp[i, :]).
Calls a fast numba implementation where numba is available (see
`improver.ensemble_copula_coupling.numba_utilities.fast_interp_same_y`) and calls a
the native python implementation otherwise (see :func:`slow_interp_same_y`).
Args:
x: 1-D array
xp: 1-D array, sorted in non-decreasing order
fp: 2-D array with len(xp) columns
Returns:
2-D array with shape (len(fp), len(x)), with each row i equal to
np.interp(x, xp, fp[i, :])
"""
try:
import numba # noqa: F401
from improver.ensemble_copula_coupling.numba_utilities import fast_interp_same_x
return fast_interp_same_x(*args)
except ImportError:
warnings.warn("Module numba unavailable. ResamplePercentiles will be slower.")
return slow_interp_same_x(*args)
[docs]
def slow_interp_same_y(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray:
"""For each row i of xp, do the equivalent of np.interp(x, xp[i], fp). Note that a
fast_interp_same_y version of this function exists that uses numba. This
slow version is retained as a fallback for when numba is not available.
Args:
x: 1-d array
xp: n * m array, each row must be in non-decreasing order
fp: 1-d array with length m
Returns:
n * len(x) array where each row i is equal to np.interp(x, xp[i], fp)
"""
result = np.empty((xp.shape[0], len(x)), dtype=np.float32)
for i in range(xp.shape[0]):
result[i] = np.interp(x, xp[i, :], fp)
return result
[docs]
def slow_interp_same_y_2d(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray:
"""For each row i of xp, do the equivalent of np.interp(x[i], xp[i], fp).
Note that a fast_interp_same_y_2d version of this function exists that uses numba.
This slow version is retained as a fallback for when numba is not available.
Args:
x: 2-d array, shape (n, k)
xp: n * m array, each row must be in non-decreasing order
fp: 1-d array with length m
Returns:
n * k array where each row i is equal to np.interp(x[i], xp[i], fp)
"""
result = np.empty(x.shape, dtype=np.float32)
for i in range(x.shape[0]):
result[i] = np.interp(x[i], xp[i], fp)
return result
[docs]
def interpolate_multiple_rows_same_y(*args):
"""For each row i of xp, do the equivalent of np.interp(x, xp[i], fp).
Calls a fast numba implementation where numba is available (see
`improver.ensemble_copula_coupling.numba_utilities.fast_interp_same_y`) and calls a
the native python implementation otherwise (see :func:`slow_interp_same_y`).
Args:
x: 1-d array
xp: n * m array, each row must be in non-decreasing order
fp: 1-d array with length m
Returns:
n * len(x) array where each row i is equal to np.interp(x, xp[i], fp)
"""
try:
import numba # noqa: F401
from improver.ensemble_copula_coupling.numba_utilities import (
fast_interp_same_y_nd,
)
return fast_interp_same_y_nd(*args)
except ImportError:
warnings.warn(
"Module numba unavailable. ConvertProbabilitiesToPercentiles will be slower."
)
return slow_interp_same_y_nd(*args)
[docs]
def slow_interp_same_y_nd(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray:
"""Dispatch to functions that handle 1D or 2D x inputs.
Note that a fast_interp_same_y_nd version of this function exists that uses numba.
This slow version is retained as a fallback for when numba is not available.
Args:
x: 1-D or 2-D array
xp: n * m array, each row must be in non-decreasing order
fp: 1-D array with length m
Returns:
If x is 1-D, returns n * len(x) array where each row i is equal to
np.interp(x, xp[i], fp).
If x is 2-D, returns n * k array where each row i is equal to
np.interp(x[i], xp[i], fp).
Raises:
ValueError: If x is not 1-D or 2-D.
"""
if x.ndim == 1:
return slow_interp_same_y(x, xp, fp)
if x.ndim == 2:
return slow_interp_same_y_2d(x, xp, fp)
raise ValueError("x must be 1D or 2D.")