Source code for improver.ensemble_copula_coupling.utilities

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""
This module defines the utilities required for Ensemble Copula Coupling
plugins.

"""

import warnings
from typing import List, Optional, Union

import cf_units as unit
import iris
import numpy as np
from cf_units import Unit
from iris.cube import Cube
from numpy import ndarray

from improver import BasePlugin
from improver.ensemble_copula_coupling.constants import BOUNDS_FOR_ECDF
from improver.metadata.probabilistic import find_threshold_coordinate


[docs] def concatenate_2d_array_with_2d_array_endpoints( array_2d: ndarray, low_endpoint: float, high_endpoint: float ) -> ndarray: """ For a 2d array, add a 2d array as the lower and upper endpoints. The concatenation to add the lower and upper endpoints to the 2d array are performed along the second (index 1) dimension. Args: array_2d: 2d array of values low_endpoint: Number used to create a 2d array of a constant value as the lower endpoint. high_endpoint: Number of used to create a 2d array of a constant value as the upper endpoint. Returns: 2d array of values after padding with the low_endpoint and high_endpoint. """ if array_2d.ndim != 2: raise ValueError("Expected 2D input, got {}D input".format(array_2d.ndim)) lower_array = np.full((array_2d.shape[0], 1), low_endpoint, dtype=array_2d.dtype) upper_array = np.full((array_2d.shape[0], 1), high_endpoint, dtype=array_2d.dtype) array_2d = np.concatenate((lower_array, array_2d, upper_array), axis=1) return array_2d
[docs] class CalculatePercentilesFromIntensityDistribution(BasePlugin): """ Plugin to calculate percentiles at which provided intensity values would fall, according to a fitted probability distribution (currently only gamma). """
[docs] def __init__( self, distribution: str = "gamma", nan_mask_value: Optional[float] = 0.0, scale_percentiles_to_probability_lower_bound: bool = False, ): """ Initialise the plugin. Args: distribution: Type of distribution to fit (currently only 'gamma' is supported). nan_mask_value: Value to mask as NaN before calculating mean and std. This option might be most useful for a diagnostic, such as precipitation rate, where there is a high frequency of zero values. If None, no masking is performed. Default is 0.0. scale_percentiles_to_probability_lower_bound: If True, the minimum value of the calculated percentiles will be set to the minimum CDF probability implied by the input probabilities, rather than zero. This has the effect of restricting the percentiles to the non-zero part of the distribution, which is useful when there is a high probability of zero values (e.g., for precipitation). When False, percentiles are calculated over the full [0, 1] range, regardless of the input probabilities. Default is False. """ if distribution != "gamma": raise ValueError( f"Unrecognised distribution option '{distribution}'. " "The supported options are: 'gamma'." ) self.distribution = distribution self.nan_mask_value = nan_mask_value self.scale_percentiles_to_probability_lower_bound = ( scale_percentiles_to_probability_lower_bound )
[docs] @staticmethod def _scale_percentiles_to_probability_lower_bound( percentiles: np.ndarray, probabilities: np.ndarray, nan_mask_value: Optional[float] = 0.0, ): """Rescale percentiles based on provided probabilities, with optional NaN masking. The rescaling uses the min probability after converting probabilities to be "less than" a given threshold, rather than "greater than". This rescaling has the effect that when the percentiles are later used for sampling the probabilities, the percentiles will not sample below the min probability. This is helpful for distributions with a high frequency of zero values, where the probability of e.g. the precipitation being less than a particular value can be large. This therefore focuses the percentiles on the non-zero part of the distribution. Args: percentiles: Array of percentiles to be rescaled. probabilities: Array of probabilities for scaling. nan_mask_value: Value to mask as NaN before rescaling. If None, no masking is performed. Returns: Rescaled percentiles array. """ percentiles_nan = percentiles.copy() scaled_percentiles = percentiles.copy() cdf_probabilities = 1 - probabilities lower_limit = cdf_probabilities[0] upper_limit = np.ones(lower_limit.shape) if nan_mask_value is not None: percentiles_nan[percentiles_nan == nan_mask_value] = np.nan percentiles_nan[:, np.all(probabilities == 0, axis=0)] = np.nan scaled_percentiles_nan = ( (upper_limit - lower_limit) * percentiles_nan ) + lower_limit scaled_percentiles[~np.isnan(scaled_percentiles_nan)] = scaled_percentiles_nan[ ~np.isnan(scaled_percentiles_nan) ] return scaled_percentiles
[docs] def _calculate_percentiles_from_intensity_distribution( self, probability_cube: Cube, intensity_cube: Cube, ): """For each spatial location, calculate the percentiles at which the provided intensity values would fall, according to a fitted probability distribution. Args: probability_cube: Cube containing probability data at a range of thresholds. intensity_cube: Cube containing the intensity data to be mapped to percentiles. Returns: An array of percentiles (CDF values) for each intensity at each location. References: Scheuerer, M., and T. M. Hamill, 2015: Statistical Postprocessing of Ensemble Precipitation Forecasts by Fitting Censored, Shifted Gamma Distributions. Mon. Wea. Rev., 143, 4578–4596, https://doi.org/10.1175/MWR-D-15-0061.1. Wilks, D. S., 2019: Statistical Methods in the Atmospheric Sciences, Academic Press. """ from scipy.stats import gamma if intensity_cube.ndim < 2: raise ValueError( "Expected at least 2D input, got {}D input".format(intensity_cube.ndim) ) # Optionally mask a value as NaN before calculating mean and std cube_nan = intensity_cube.copy() if self.nan_mask_value is not None: cube_nan.data[cube_nan.data == self.nan_mask_value] = np.nan mean = np.nanmean(cube_nan.data, axis=0) std = np.nanstd(cube_nan.data, axis=0) # Avoid zero mean or std causing issues. absolute_thresholds = np.abs(find_threshold_coordinate(probability_cube).points) min_non_zero_threshold = np.min(absolute_thresholds[absolute_thresholds > 0]) tolerance = min_non_zero_threshold / 10.0 mean = np.where(mean > tolerance, mean, tolerance) std = np.where(std > tolerance, std, tolerance) # Calculate gamma distribution parameters from mean and std. Taken from # Wilks (2019) Section 4.4.5 and Scheuerer and Hamill (2015). These are # most accurate estimates when the shape parameter is large. shape_parameter = mean**2 / std**2 scale_parameter = std**2 / mean percentiles = gamma.cdf( intensity_cube.data, shape_parameter[None, ...], scale=scale_parameter[None, ...], ) if self.scale_percentiles_to_probability_lower_bound: percentiles = self._scale_percentiles_to_probability_lower_bound( percentiles, probability_cube.data, self.nan_mask_value ) percentiles = np.sort(percentiles, axis=0) return percentiles.astype(np.float32)
[docs] def process( self, probability_cube: Cube, intensity_cube: Cube, ) -> np.ndarray: """ Public interface to calculate percentiles from intensity distribution. Args: probability_cube: Cube containing probability data at a range of thresholds. intensity_cube: Cube containing the intensity data to be mapped to percentiles. Returns: A 3D array of percentiles (CDF values) for each intensity at each location. """ return self._calculate_percentiles_from_intensity_distribution( probability_cube, intensity_cube )
[docs] def choose_set_of_percentiles( no_of_percentiles: int, sampling: str = "quantile", probability_cube: Optional[Cube] = None, intensity_cube: Optional[Cube] = None, distribution: Optional[str] = "gamma", nan_mask_value: Optional[float] = 0.0, scale_percentiles_to_probability_lower_bound: bool = True, ) -> np.ndarray: """ Function to create percentiles. Args: no_of_percentiles: Number of percentiles. sampling: Type of sampling of the distribution to produce a set of percentiles e.g. quantile, random or transformation. Accepted options for sampling are: * Quantile: A regular set of equally-spaced percentiles aimed at dividing a Cumulative Distribution Function into blocks of equal probability. This is the default option. * Random: A random set of ordered percentiles. * Transformation: A set of percentiles generated by applying a transformation to the distribution. probability_cube: Cube containing probability data at a range of thresholds. intensity_cube: Cube containing the intensity data to be mapped to percentiles. distribution: Type of distribution to fit (currently only 'gamma' is supported). nan_mask_value: Value to mask as NaN before calculating mean and std. If None, no masking is performed. Default is 0.0. scale_percentiles_to_probability_lower_bound: If True, the minimum value of the calculated percentiles will be set to the minimum CDF probability implied by the input probabilities, rather than zero. This has the effect of restricting the percentiles to the non-zero part of the distribution, which is useful when there is a high probability of zero values (e.g., for precipitation). When False, percentiles are calculated over the full [0, 1] range, regardless of the input probabilities. Default is True. Returns: Percentiles calculated using the sampling technique specified as a numpy array. Raises: ValueError: if the sampling option is not one of the accepted options. References: For further details, Flowerdew, J., 2014. Calibrating ensemble reliability whilst preserving spatial structure. Tellus, Series A: Dynamic Meteorology and Oceanography, 66(1), pp.1-20. Schefzik, R., Thorarinsdottir, T.L. & Gneiting, T., 2013. Uncertainty Quantification in Complex Simulation Models Using Ensemble Copula Coupling. Statistical Science, 28(4), pp.616-640. """ if sampling in ["quantile"]: percentiles = np.linspace( 1 / float(1 + no_of_percentiles), no_of_percentiles / float(1 + no_of_percentiles), no_of_percentiles, ) elif sampling in ["random"]: percentiles = np.random.uniform( 1 / float(1 + no_of_percentiles), no_of_percentiles / float(1 + no_of_percentiles), no_of_percentiles, ) percentiles = np.sort(percentiles) elif sampling in ["transformation"]: percentiles = CalculatePercentilesFromIntensityDistribution( distribution=distribution, nan_mask_value=nan_mask_value, scale_percentiles_to_probability_lower_bound=scale_percentiles_to_probability_lower_bound, ).process(probability_cube, intensity_cube) else: msg = "Unrecognised sampling option '{}'".format(sampling) raise ValueError(msg) return percentiles * 100
[docs] def create_cube_with_percentiles( percentiles: Union[List[float], ndarray], template_cube: Cube, cube_data: ndarray, cube_unit: Optional[Union[Unit, str]] = None, ) -> Cube: """ Create a cube with a percentile coordinate based on a template cube. The resulting cube will have an extra percentile coordinate compared with the template cube. The shape of the cube_data should be the shape of the desired output cube. Args: percentiles: Ensemble percentiles. There should be the same number of percentiles as the first dimension of cube_data. template_cube: Cube to copy metadata from. cube_data: Data to insert into the template cube. The shape of the cube_data, excluding the dimension associated with the percentile coordinate, should be the same as the shape of template_cube. For example, template_cube shape is (3, 3, 3), whilst the cube_data is (10, 3, 3, 3), where there are 10 percentiles. cube_unit: The units of the data within the cube, if different from those of the template_cube. Returns: Cube containing a percentile coordinate as the leading dimension (or scalar percentile coordinate if single-valued) """ # create cube with new percentile dimension cubes = iris.cube.CubeList([]) for point in percentiles: cube = template_cube.copy() cube.add_aux_coord( iris.coords.AuxCoord( np.float32(point), long_name="percentile", units=unit.Unit("%") ) ) cubes.append(cube) result = cubes.merge_cube() # replace data and units result.data = cube_data if cube_unit is not None: result.units = cube_unit return result
[docs] def create_cube_with_percentile_index( indices: Union[List[int], ndarray], template_cube: Cube, cube_data: ndarray, cube_unit: Optional[Union[Unit, str]] = None, ) -> Cube: """ Create a cube with a percentile_index coordinate based on a template cube. The resulting cube will have an extra percentile_index coordinate compared with the template cube. The shape of the cube_data should be the shape of the desired output cube. Args: indices: Indices to use for the percentile_index coordinate. There should be the same number of indices as the first dimension of cube_data. template_cube: Cube to copy metadata from. cube_data: Data to insert into the template cube. The shape of the cube_data, excluding the dimension associated with the percentile_index coordinate, should be the same as the shape of template_cube. cube_unit: The units of the data within the cube, if different from those of the template_cube. Returns: Cube containing a percentile_index coordinate as the leading dimension (or scalar percentile_index coordinate if single-valued) """ cubes = iris.cube.CubeList([]) for idx in indices: cube = template_cube.copy() cube.add_aux_coord( iris.coords.AuxCoord( int(idx), long_name="percentile_index", units=unit.Unit("1") ) ) cubes.append(cube) result = cubes.merge_cube() result.data = cube_data if cube_unit is not None: result.units = cube_unit return result
[docs] def get_bounds_of_distribution(bounds_pairing_key: str, desired_units: Unit) -> ndarray: """ Gets the bounds of the distribution and converts the units of the bounds_pairing to the desired_units. This method gets the bounds values and units from the imported dictionaries: BOUNDS_FOR_ECDF and units_of_BOUNDS_FOR_ECDF. The units of the bounds are converted to be the desired units. Args: bounds_pairing_key: Name of key to be used for the BOUNDS_FOR_ECDF dictionary, in order to get the desired bounds_pairing. desired_units: Units to which the bounds_pairing will be converted. Returns: Lower and upper bound to be used as the ends of the empirical cumulative distribution function, converted to have the desired units. Raises: KeyError: If the bounds_pairing_key is not within the BOUNDS_FOR_ECDF dictionary. """ # Extract bounds from dictionary of constants. try: bounds_pairing = BOUNDS_FOR_ECDF[bounds_pairing_key].value bounds_pairing_units = BOUNDS_FOR_ECDF[bounds_pairing_key].units except KeyError as err: msg = ( "The bounds_pairing_key: {} is not recognised " "within BOUNDS_FOR_ECDF {}. \n" "Error: {}".format(bounds_pairing_key, BOUNDS_FOR_ECDF, err) ) raise KeyError(msg) bounds_pairing_units = unit.Unit(bounds_pairing_units) bounds_pairing = bounds_pairing_units.convert( np.array(bounds_pairing), desired_units ) return bounds_pairing
[docs] def insert_lower_and_upper_endpoint_to_1d_array( array_1d: ndarray, low_endpoint: float, high_endpoint: float ) -> ndarray: """ For a 1d array, add a lower and upper endpoint. Args: array_1d: 1d array of values low_endpoint: Number of use as the lower endpoint. high_endpoint: Number of use as the upper endpoint. Returns: 1d array of values padded with the low_endpoint and high_endpoint. """ if array_1d.ndim != 1: raise ValueError("Expected 1D input, got {}D input".format(array_1d.ndim)) lower_array = np.array([low_endpoint]) upper_array = np.array([high_endpoint]) array_1d = np.concatenate((lower_array, array_1d, upper_array)) if array_1d.dtype == np.float64: array_1d = array_1d.astype(np.float32) return array_1d
[docs] def restore_non_percentile_dimensions( array_to_reshape: ndarray, original_cube: Cube, n_percentiles: int ) -> ndarray: """ Reshape a 2d array, so that it has the dimensions of the original cube, whilst ensuring that the probabilistic dimension is the first dimension. Args: array_to_reshape: The array that requires reshaping. This has dimensions "percentiles" by "points", where "points" is a flattened array of all the other original dimensions that needs reshaping. original_cube: Cube slice containing the desired shape to be reshaped to, apart from the probabilistic dimension. This would typically be expected to be either [time, y, x] or [y, x]. n_percentiles: Length of the required probabilistic dimension ("percentiles"). Returns: The array after reshaping. Raises: ValueError: If the probabilistic dimension is not the first on the original_cube. CoordinateNotFoundError: If the input_probabilistic_dimension_name is not a coordinate on the original_cube. """ shape_to_reshape_to = list(original_cube.shape) if n_percentiles > 1: shape_to_reshape_to = [n_percentiles] + shape_to_reshape_to return array_to_reshape.reshape(shape_to_reshape_to)
[docs] def slow_interp_same_x(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray: """For each row i of fp, calculate np.interp(x, xp, fp[i, :]). Note that a fast_interp_same_x version of this function exists that uses numba. This slow version is retained as a fallback for when numba is not available. Args: x: 1-D array xp: 1-D array, sorted in non-decreasing order fp: 2-D array with len(xp) columns Returns: 2-D array with shape (len(fp), len(x)), with each row i equal to np.interp(x, xp, fp[i, :]) """ result = np.empty((fp.shape[0], len(x)), np.float32) for i in range(fp.shape[0]): result[i, :] = np.interp(x, xp, fp[i, :]) return result
[docs] def interpolate_multiple_rows_same_x(*args): """For each row i of fp, do the equivalent of np.interp(x, xp, fp[i, :]). Calls a fast numba implementation where numba is available (see `improver.ensemble_copula_coupling.numba_utilities.fast_interp_same_y`) and calls a the native python implementation otherwise (see :func:`slow_interp_same_y`). Args: x: 1-D array xp: 1-D array, sorted in non-decreasing order fp: 2-D array with len(xp) columns Returns: 2-D array with shape (len(fp), len(x)), with each row i equal to np.interp(x, xp, fp[i, :]) """ try: import numba # noqa: F401 from improver.ensemble_copula_coupling.numba_utilities import fast_interp_same_x return fast_interp_same_x(*args) except ImportError: warnings.warn("Module numba unavailable. ResamplePercentiles will be slower.") return slow_interp_same_x(*args)
[docs] def slow_interp_same_y(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray: """For each row i of xp, do the equivalent of np.interp(x, xp[i], fp). Note that a fast_interp_same_y version of this function exists that uses numba. This slow version is retained as a fallback for when numba is not available. Args: x: 1-d array xp: n * m array, each row must be in non-decreasing order fp: 1-d array with length m Returns: n * len(x) array where each row i is equal to np.interp(x, xp[i], fp) """ result = np.empty((xp.shape[0], len(x)), dtype=np.float32) for i in range(xp.shape[0]): result[i] = np.interp(x, xp[i, :], fp) return result
[docs] def slow_interp_same_y_2d(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray: """For each row i of xp, do the equivalent of np.interp(x[i], xp[i], fp). Note that a fast_interp_same_y_2d version of this function exists that uses numba. This slow version is retained as a fallback for when numba is not available. Args: x: 2-d array, shape (n, k) xp: n * m array, each row must be in non-decreasing order fp: 1-d array with length m Returns: n * k array where each row i is equal to np.interp(x[i], xp[i], fp) """ result = np.empty(x.shape, dtype=np.float32) for i in range(x.shape[0]): result[i] = np.interp(x[i], xp[i], fp) return result
[docs] def interpolate_multiple_rows_same_y(*args): """For each row i of xp, do the equivalent of np.interp(x, xp[i], fp). Calls a fast numba implementation where numba is available (see `improver.ensemble_copula_coupling.numba_utilities.fast_interp_same_y`) and calls a the native python implementation otherwise (see :func:`slow_interp_same_y`). Args: x: 1-d array xp: n * m array, each row must be in non-decreasing order fp: 1-d array with length m Returns: n * len(x) array where each row i is equal to np.interp(x, xp[i], fp) """ try: import numba # noqa: F401 from improver.ensemble_copula_coupling.numba_utilities import ( fast_interp_same_y_nd, ) return fast_interp_same_y_nd(*args) except ImportError: warnings.warn( "Module numba unavailable. ConvertProbabilitiesToPercentiles will be slower." ) return slow_interp_same_y_nd(*args)
[docs] def slow_interp_same_y_nd(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray: """Dispatch to functions that handle 1D or 2D x inputs. Note that a fast_interp_same_y_nd version of this function exists that uses numba. This slow version is retained as a fallback for when numba is not available. Args: x: 1-D or 2-D array xp: n * m array, each row must be in non-decreasing order fp: 1-D array with length m Returns: If x is 1-D, returns n * len(x) array where each row i is equal to np.interp(x, xp[i], fp). If x is 2-D, returns n * k array where each row i is equal to np.interp(x[i], xp[i], fp). Raises: ValueError: If x is not 1-D or 2-D. """ if x.ndim == 1: return slow_interp_same_y(x, xp, fp) if x.ndim == 2: return slow_interp_same_y_2d(x, xp, fp) raise ValueError("x must be 1D or 2D.")