Source code for improver.calibration.samos_calibration

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""
This module defines all the "plugins" specific to Standardised Anomaly Model Output
Statistics (SAMOS).
"""

import datetime
import warnings
from typing import Dict, List, Optional, Sequence, Tuple

import iris
import iris.pandas
import numpy as np
import pandas as pd

from improver.utilities.statistical import (
    DistributionalParameters,
)

try:
    import pygam
except ModuleNotFoundError:
    # Define dummy class to avoid type hint errors.
[docs] class pygam:
[docs] def GAM(self): pass
from iris.cube import Cube, CubeList from improver import BasePlugin, PostProcessingPlugin from improver.calibration import add_static_feature_from_cube_to_df from improver.calibration.emos_calibration import ( ApplyEMOS, EstimateCoefficientsForEnsembleCalibration, convert_to_realizations, generate_forecast_from_distribution, get_attribute_from_coefficients, get_forecast_type, ) from improver.ensemble_copula_coupling.utilities import get_bounds_of_distribution from improver.metadata.probabilistic import find_threshold_coordinate from improver.utilities.cube_manipulation import ( MergeCubes, collapse_realizations, collapse_time, ) from improver.utilities.generalized_additive_models import GAMFit, GAMPredict from improver.utilities.mathematical_operations import CalculateClimateAnomalies # Setting to allow cubes with more than 2 dimensions to be converted to/from dataframes. iris.FUTURE.pandas_ndim = True # Dictionary mapping PyGAM distribution names to those used internally in Scipy. PYGAM_DISTRIBUTION_MAPPING = { "normal": "norm", "binomial": "binom", "poisson": "poisson", "gamma": "gamma", "inv_gauss": "invgauss", }
[docs] def prepare_data_for_gam( input_cube: Cube, additional_fields: Optional[CubeList] = None, unique_site_id_key: Optional[str] = None, ) -> pd.DataFrame: """ Convert input cubes in to a single, combined dataframe. Each of the input cubes is converted to a pandas dataframe. The dataframe derived from input_cube then forms the left in a series of left dataframe joins with those derived from each cube in additional_fields. The x and y coordinates are used to perform this join. This means that the resulting combined dataframe will contain all of the sites/grid points in input_cube, but not any other sites/grid points in the additional_fields cubes. Args: input_cube: A cube of forecast or observation data. additional_fields: Additional cubes with points which can be matched with points in input_cube by matching spatial coordinate values. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. Returns: A pandas dataframe with rows equal to the number of sites/grid points in input_cube and containing the following columns: 1. A column with the same name as input_cube containing the original cube data 2. A series of columns derived from the input_cube dimension coordinates 3. A series of columns associated with any auxiliary coordinates (scalar or otherwise) of input_cube 4. One column associated with each of the cubes in additional cubes, with column names matching the associated cube """ df = iris.pandas.as_data_frame( input_cube, add_aux_coords=True, add_cell_measures=True, add_ancillary_variables=True, ) df.reset_index(inplace=True) if additional_fields: # Check if we are dealing with spot data. site_data = "spot_index" in [c.name() for c in input_cube.coords()] # For site data we should use unique IDs wherever possible. As a # fallback we can match on latitude, longitude and altitude. We # need altitude to accommodate e.g. sites with a lower and upper # forecast altitude. If we are working with gridded data we use the # spatial coordinates of the input cube. if site_data and unique_site_id_key is not None: match_coords = [unique_site_id_key] elif site_data: match_coords = ["latitude", "longitude", "altitude"] else: match_coords = [ input_cube.coord(axis="X").name(), input_cube.coord(axis="Y").name(), ] for cube in additional_fields: df = add_static_feature_from_cube_to_df( df, cube, feature_name=cube.name(), possible_merge_columns=match_coords, float_decimals=4, ) return df
[docs] def convert_dataframe_to_cube( df: pd.DataFrame, template_cube: Cube, ) -> Cube: """Function to convert a Pandas dataframe to Iris cube format by using a template cube. The input template_cube provides all metadata for the output. Args: df: A Pandas dataframe which must contain at least the following columns: 1. A column matching the name of template_cube 2. A series of columns with names which match the dimension coordinates on template_cube. The data in these columns should match the points on the corresponding dimension of template_cube. template_cube: A cube which will provide all metadata for the output cube Returns: A copy of template_cube containing data from df. """ dim_coords = [c.name() for c in template_cube.coords(dim_coords=True)] diagnostic = template_cube.name() indexed_df = df.set_index(dim_coords, inplace=False) indexed_df.sort_index(inplace=True) # The as_cubes() function returns a cubelist. In this case, the cubelist contains # only one element. converted_cube = iris.pandas.as_cubes(indexed_df[[diagnostic]])[0] result = template_cube.copy(data=converted_cube.data) return result
[docs] def get_climatological_stats( input_cube: Cube, gams: List, gam_features: List[str], additional_cubes: Optional[CubeList], sd_clip: float = 0.25, unique_site_id_key: Optional[str] = None, constant_extrapolation: bool = False, ) -> Tuple[Cube, Cube]: """Function to predict climatological means and standard deviations given fitted GAMs for each statistic and cubes which can be used to construct a dataframe containing all required features for those GAMs. Args: input_cube gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the locations in input_cube and the second predicting the climatological standard deviation. gam_features: The list of features. These must be either coordinates on input_cube or share a name with a cube in additional_cubes. The index of each feature should match the indices used in model_specification. additional_cubes: Additional fields to use as supplementary predictors. sd_clip: The minimum standard deviation value to allow when predicting from the GAM. Any predictions below this value will be set to this value. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. constant_extrapolation: If True, predictor values outside the range of those used to fit the GAM will be predicted using constant extrapolation (i.e. the nearest boundary value). If False, extrapolation extends the trend of each GAM term beyond the range of the training data. Default is False. Returns: A pair of cubes containing climatological mean and climatological standard deviation predictions respectively. """ diagnostic = input_cube.name() df = prepare_data_for_gam( input_cube, additional_cubes, unique_site_id_key=unique_site_id_key ) # Calculate climatological means and standard deviations using previously # fitted GAMs. mean_pred = GAMPredict(constant_extrapolation=constant_extrapolation).process( gams[0], np.array(df[gam_features]) ) sd_pred = GAMPredict(constant_extrapolation=constant_extrapolation).process( gams[1], np.array(df[gam_features]) ) # Convert means and standard deviations into cubes df[diagnostic] = mean_pred mean_cube = convert_dataframe_to_cube(df, input_cube) df[diagnostic] = sd_pred sd_cube = convert_dataframe_to_cube(df, input_cube) sd_cube.data = np.clip(sd_cube.data, a_min=sd_clip, a_max=None) return mean_cube, sd_cube
[docs] class TrainGAMsForSAMOS(BasePlugin): """ Class for fitting Generalised Additive Models (GAMs) to training data for use in a Standardised Anomaly Model Output Statistics (SAMOS) calibration scheme. Two GAMs are trained: one modelling the mean of the training data and one modelling the standard deviation. These can then be used to convert forecasts or observations to climatological anomalies. This plugin should be run separately for forecast and observation data. """
[docs] def __init__( self, model_specification: list[list[str], list[int], dict], max_iter: int = 100, tol: float = 0.0001, distribution: str = "normal", link: str = "identity", fit_intercept: bool = True, window_length: int = 10, required_rolling_window_points: int = 5, trailing_window: bool = False, unique_site_id_key: Optional[str] = None, ): """ Initialize the class. Args: model_specification: A list of lists which each contain three items (in order): 1. a string containing a single pyGAM term; one of 'linear', 'spline', 'tensor', or 'factor'. 2. a list of integers which correspond to the features to be included in that term. 3. a dictionary of kwargs to be included when defining the term. max_iter: A pyGAM argument which determines the maximum iterations allowed when fitting the GAM. tol: A pyGAM argument determining the tolerance used to define the stopping criteria. distribution: A pyGAM argument determining the distribution to be used in the model. link: A pyGAM argument determining the link function to be used in the model. fit_intercept: A pyGAM argument determining whether to include an intercept term in the model. window_length: The length of the rolling window, in days, used to calculate the mean and standard deviation of the input cube when the input cube does not have a realization dimension coordinate. If using a centred rolling window, this must be an even integer greater than 1 in order to allow equal numbers of days on either side of the central time point. If using a trailing rolling window, this must be an integer greater than 1. required_rolling_window_points: The minimum number of valid data points required within a rolling window. If fewer valid points are present, the mean and standard deviation will be set to NaN for this window. trailing_window: If False, a centred window is used, which assigns the calculated statistic to the central time point in the window. If True, a trailing window is used, which assigns the calculated statistic to the final time point. unique_site_id_key: An optional key to use for uniquely identifying each site in the training data. If not provided, the default behavior is to use the spatial coordinates (latitude, longitude) of each site. """ self.model_specification = model_specification self.max_iter = max_iter self.tol = tol self.distribution = distribution self.link = link self.fit_intercept = fit_intercept self.unique_site_id_key = unique_site_id_key # Check if the window_length is an integer greater than one. raise_window_warning = False if window_length < 2 or window_length % 1 != 0: raise_window_warning = True if trailing_window & raise_window_warning: raise ValueError( "The window_length input must be an integer greater than 1 when " f"using a trailing rolling window. Received: {window_length}." ) elif (raise_window_warning or window_length % 2 != 0) and not trailing_window: # Additionally, check if window_length is even when using a centred window. raise ValueError( "The window_length input must be an even integer greater than 1 " f"when using a centred rolling window. Received: {window_length}." ) else: self.window_length = window_length self.trailing_window = trailing_window if ( not isinstance(required_rolling_window_points, int) or required_rolling_window_points < 2 ): raise ValueError( "The required_rolling_window_points input must be an integer greater " f"than 1. Received: {required_rolling_window_points}." ) else: self.required_rolling_window_points = required_rolling_window_points
[docs] def calculate_statistic_by_rolling_window(self, input_cube: Cube): """Function to calculate mean and standard deviation of input_cube using a rolling window calculation over the time coordinate. """ input_mean = CubeList() input_sd = CubeList() # This variable is used to calculate the bounds of each rolling window. window_td = datetime.timedelta(days=self.window_length) for time_index, tp in enumerate(input_cube.coord("time").cells()): time_point = tp.point._to_real_datetime() # Create time constraint for rolling window and extract data within window. if self.trailing_window: time_bounds = [time_point - window_td, time_point] else: time_bounds = [time_point - window_td / 2, time_point + window_td / 2] time_constraint = iris.Constraint( time=lambda cell: time_bounds[0] <= cell.point <= time_bounds[1] ) window_cube = input_cube.extract(time_constraint) if len(window_cube.coord("time").points) == 1: # If there is only one time point in the window, the mean and sd for # this window are set to NaN. window_mean = window_cube.copy( data=np.full_like(window_cube.data, np.nan, dtype=np.float32) ) window_sd = window_cube.copy( data=np.full_like(window_cube.data, np.nan, dtype=np.float32) ) else: window_mean = collapse_time(window_cube, "time", iris.analysis.MEAN) window_sd = collapse_time(window_cube, "time", iris.analysis.STD_DEV) # Check there are enough valid points in the window. Where there are # insufficient data points, replace calculated statistics with NaNs. window_valid_count = collapse_time( window_cube, "time", iris.analysis.COUNT, function=lambda values: ~np.isnan(values), ) window_mean.data = np.where( window_valid_count.data < self.required_rolling_window_points, np.nan, window_mean.data, ) window_sd.data = np.where( window_valid_count.data < self.required_rolling_window_points, np.nan, window_sd.data, ) # Set time-related coordinates to match those for this time point on # the input cube. for coord_name in [ "time", "forecast_reference_time", "blend_time", "forecast_period", ]: if coord_name in [c.name() for c in window_mean.coords()]: # The time-related coordinates are either scalar or have one # point associated with each time point. if len(input_cube.coord(coord_name).points) == 1: point = input_cube.coord(coord_name).points[0] else: point = input_cube.coord(coord_name).points[time_index] window_mean.coord(coord_name).points = np.array([point]) window_mean.coord(coord_name).bounds = None window_sd.coord(coord_name).points = np.array([point]) window_sd.coord(coord_name).bounds = None input_mean.append(window_mean.copy()) input_sd.append(window_sd.copy()) input_mean = MergeCubes().process(input_mean) input_sd = MergeCubes().process(input_sd) return input_mean, input_sd
[docs] def calculate_cube_statistics(self, input_cube: Cube) -> CubeList: """Function to calculate mean and standard deviation of the input cube. If the cube has a realization dimension then statistics will be calculated by collapsing over this dimension. Otherwise, a rolling window calculation over the time dimension will be used. The rolling window method calculates a statistic over data in a fixed time window. For example, for data points [0.0, 1.0, 2.0, 1.0, 0.0] each valid in consecutive hours T+0, T+1, T+2, T+3, T+4, the mean calculated by a rolling window of length 5 would be 0.8. Args: input_cube: A cube with at least one of the following coordinates: 1. A realization dimension coordinate 2. A time coordinate with more than one point. Returns: CubeList containing a mean cube and standard deviation cube. """ if input_cube.coords("realization"): input_mean = collapse_realizations(input_cube, method="mean") input_sd = collapse_realizations(input_cube, method="std_dev") else: input_mean, input_sd = self.calculate_statistic_by_rolling_window( input_cube ) return CubeList([input_mean, input_sd])
[docs] def process( self, input_cube: Cube, features: List[str], additional_fields: Optional[CubeList] = None, ) -> List: """ Function to fit GAMs to model the mean and standard deviation of the input_cube for use in SAMOS. Args: input_cube: Historic forecasts or observations from the training dataset. Must contain at least one of: - a realization coordinate - a time coordinate with more than one point features: The list of features. These must be either coordinates on input_cube or share a name with a cube in additional_fields. The index of each feature should match the indices used in model_specification. additional_fields: Additional fields to use as supplementary predictors. Returns: A list containing fitted GAMs which model the input_cube mean and standard deviation. Raises: ValueError: If input_cube does not contain at least one of a realization or time coordinate. ValueError: If the input cube does not have a realization coordinate and the time coordinate that it does have contains only one point. """ if not input_cube.coords("realization", dim_coords=True): if not input_cube.coords("time"): msg = ( "The input cube must contain at least one of a realization or time " "coordinate in order to allow the calculation of means and " "standard deviations. The following coordinates were found: " f"{input_cube.coords()}." ) raise ValueError(msg) elif len(input_cube.coord("time").points) == 1: msg = ( "The input cube does not contain a realization coordinate. In " "order to calculate means and standard deviations the time " "coordinate must contain more than one point. The following time " f"coordinate was found: {input_cube.coord('time')}." ) warnings.warn(msg) return None # Calculate mean and standard deviation from input cube. stat_cubes = self.calculate_cube_statistics(input_cube) # Create list to put fitted GAM models in. output = [] # Initialize plugin used to fit GAMs. plugin = GAMFit( model_specification=self.model_specification, max_iter=self.max_iter, tol=self.tol, distribution=self.distribution, link=self.link, fit_intercept=self.fit_intercept, ) for stat_cube in stat_cubes: df = prepare_data_for_gam( stat_cube, additional_fields, unique_site_id_key=self.unique_site_id_key ) feature_values = df[features].values targets = df[input_cube.name()].values output.append(plugin.process(feature_values, targets)) if None in output: return None return output
[docs] class TrainEMOSForSAMOS(BasePlugin): """Class to calculate Ensemble Model Output Statistics (EMOS) coefficients to calibrate climate anomaly forecasts given training data including forecasts and verifying observations and four Generalized Additive Models (GAMs) which model: - forecast mean, - forecast standard deviation, - observation mean, - observation standard deviation. This class first calculates climatological means and standard deviations by predicting them from the input GAMs. Following this, the input forecasts and observations are converted to climatological anomalies using the predicted means and standard deviations. Finally, EMOS coefficients are calculated from the climatological anomaly training data. """
[docs] def __init__( self, distribution: str, emos_kwargs: Optional[Dict] = None, unique_site_id_key: Optional[str] = None, constant_extrapolation: bool = False, ) -> None: """Initialize the class. Args: distribution: Name of distribution. Assume that a calibrated version of the climate anomaly forecast could be represented using this distribution. emos_kwargs: Keyword arguments accepted by the EstimateCoefficientsForEnsembleCalibration plugin. Should not contain a distribution argument. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. constant_extrapolation: If True, when predicting mean and standard deviation from the GAMs, when the predictor values are outside the range of those used to fit the GAM, constant extrapolation (i.e. the nearest boundary value) will be used. If False, extrapolation extends the trend of each GAM term beyond the range of the training data. Default is False. """ self.distribution = distribution self.emos_kwargs = emos_kwargs if emos_kwargs else {} self.unique_site_id_key = unique_site_id_key self.constant_extrapolation = constant_extrapolation
[docs] def climate_anomaly_emos( self, forecast_cubes: List[Cube], truth_cubes: List[Cube], additional_fields: Optional[CubeList] = None, landsea_mask: Optional[Cube] = None, ) -> CubeList: """Function to convert forecasts and truths to climate anomalies then calculate EMOS coefficients for the climate anomalies. Args: forecast_cubes: A list of three cubes: a cube containing historic forecasts, a cube containing climatological mean predictions of the forecasts and a cube containing climatological standard deviation predictions of the forecasts. truth_cubes: A list of three cubes: a cube containing historic truths, a cube containing climatological mean predictions of the truths and a cube containing climatological standard deviation predictions of the truths. additional_fields: Additional fields to use as supplementary predictors. landsea_mask: The optional cube containing a land-sea mask. If provided, only land points are used to calculate the coefficients. Within the land-sea mask cube land points should be specified as ones, and sea points as zeros. Returns: CubeList constructed using the coefficients provided and using metadata from the historic_forecasts cube. Each cube within the cubelist is for a separate EMOS coefficient e.g. alpha, beta, gamma, delta. """ # Convert forecasts and truths to climatological anomalies. forecast_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process( *forecast_cubes ) truth_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process( *truth_cubes ) plugin = EstimateCoefficientsForEnsembleCalibration( distribution=self.distribution, **self.emos_kwargs, ) return plugin.process( historic_forecasts=forecast_ca, truths=truth_ca, additional_fields=additional_fields, landsea_mask=landsea_mask, )
[docs] def process( self, historic_forecasts: Cube, truths: Cube, forecast_gams: List, truth_gams: List, gam_features: List[str], gam_additional_fields: Optional[CubeList] = None, emos_additional_fields: Optional[CubeList] = None, landsea_mask: Optional[Cube] = None, ) -> CubeList: """Function to convert historic forecasts and truths to climatological anomalies, then fit EMOS coefficients to these anomalies. Args: historic_forecasts: Historic forecasts from the training dataset. truths: Truths from the training dataset. forecast_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the locations in historic_forecasts and the second predicting the climatological standard deviation. Appropriate GAMs are produced by the TrainGAMsForSAMOS plugin. truth_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the locations in truths and the second predicting the climatological standard deviation. Appropriate GAMs are produced by the TrainGAMsForSAMOS plugin. gam_features: The list of features. These must be either coordinates on input_cube or share a name with a cube in gam_additional_fields. The index of each feature must match the indices used in model_specification. gam_additional_fields: Additional fields to use as supplementary predictors in the GAMs. emos_additional_fields: Additional fields to use as supplementary predictors in EMOS. landsea_mask: The optional cube containing a land-sea mask. If provided, only land points are used to calculate the EMOS coefficients. Within the land-sea mask cube land points should be specified as ones, and sea points as zeros. Returns: CubeList constructed using the coefficients provided and using metadata from the historic_forecasts cube. Each cube within the cubelist is for a separate EMOS coefficient e.g. alpha, beta, gamma, delta. """ forecast_mean, forecast_sd = get_climatological_stats( historic_forecasts, forecast_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, constant_extrapolation=self.constant_extrapolation, ) truth_mean, truth_sd = get_climatological_stats( truths, truth_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, constant_extrapolation=self.constant_extrapolation, ) emos_coefficients = self.climate_anomaly_emos( forecast_cubes=[historic_forecasts, forecast_mean, forecast_sd], truth_cubes=[truths, truth_mean, truth_sd], additional_fields=emos_additional_fields, landsea_mask=landsea_mask, ) return emos_coefficients
[docs] class ApplySAMOS(PostProcessingPlugin): """ Class to calibrate an input forecast using SAMOS given the following inputs: - Two GAMs which model, respectively, the climatological mean and standard deviation of the forecast. This allows the forecast to be converted to climatological anomalies. - Two GAMs which model, respectively, the climatological mean and standard deviation of the truths. This allows the climatological anomalies to be transformed back to the original forecast units. - A set of EMOS coefficients which can be applied to correct the climatological anomalies. """
[docs] def __init__( self, percentiles: Optional[Sequence] = None, unique_site_id_key: Optional[str] = None, constant_extrapolation: bool = False, ): """Initialize class. Args: percentiles: The set of percentiles used to create the calibrated forecast. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. constant_extrapolation: If True, when predicting mean and standard deviation from the GAMs, when the predictor values are outside the range of those used to fit the GAM, constant extrapolation (i.e. the nearest boundary value) will be used. If False, extrapolation extends the trend of each GAM term beyond the range of the training data. Default is False. """ self.percentiles = [np.float32(p) for p in percentiles] if percentiles else None self.unique_site_id_key = unique_site_id_key self.constant_extrapolation = constant_extrapolation
[docs] @staticmethod def transform_anomalies_to_original_units( location_parameter: Cube, scale_parameter: Cube, truth_mean: Cube, truth_sd: Cube, forecast: Cube, input_forecast_type: str, ) -> None: """Function to transform location and scale parameters which describe a climatological anomaly distribution to location and scale parameters which describe a distribution in the units of the original forecast. Both parameter cubes are modified in place. Predictions of mean and standard deviation from the 'truth' GAMs are used for this transformation. This ensures that the calibrated forecast follows the 'true' distribution, rather than the distribution of the original forecast, following the suggested method in: Dabernig, M., Mayr, G.J., Messner, J.W. and Zeileis, A. (2017). Spatial ensemble post-processing with standardized anomalies. Q.J.R. Meteorol. Soc, 143: 909-916. https://doi.org/10.1002/qj.2975 Args: location_parameter: Cube containing the location parameter of the climatological anomaly distribution. This is modified in place. scale_parameter: Cube containing the scale parameter of the climatological anomaly distribution. This is modified in place. truth_mean: Cube containing climatological mean predictions of the truths. truth_sd: Cube containing climatological standard deviation predictions of the truths. forecast: The original, uncalibrated forecast. input_forecast_type: The type of the original, uncalibrated forecast. One of 'realizations', 'percentiles' or 'probabilities'. """ # The data in these cubes are identical along the realization dimensions. truth_mean = next(truth_mean.slices_over("realization")) truth_sd = next(truth_sd.slices_over("realization")) # Transform location and scale parameters so that they represent a distribution # in the units of the original forecast, rather than climatological anomalies. forecast_units = ( forecast.units if input_forecast_type != "probabilities" else find_threshold_coordinate(forecast).units ) location_parameter.data = ( location_parameter.data * truth_sd.data ) + truth_mean.data location_parameter.units = forecast_units # The scale parameter returned by ApplyEMOS is the standard deviation for a # normal distribution. To get the desired standard deviation in # realization/percentile space we must multiply by the estimated forecast # standard deviation. scale_parameter.data = scale_parameter.data * truth_sd.data scale_parameter.units = forecast_units
[docs] def process( self, forecast: Cube, forecast_gams: List, truth_gams: List, gam_features: List[str], emos_coefficients: CubeList, gam_additional_fields: Optional[CubeList] = None, emos_additional_fields: Optional[CubeList] = None, prob_template: Optional[Cube] = None, realizations_count: Optional[int] = None, ignore_ecc_bounds: bool = True, tolerate_time_mismatch: bool = False, predictor: str = "mean", randomise: bool = False, random_seed: Optional[int] = None, ): """Calibrate input forecast using GAMs to convert the forecast to climatological anomalies and pre-calculated EMOS coefficients to apply to those anomalies. Args: forecast: Uncalibrated forecast as probabilities, percentiles or realizations. forecast_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the historic forecasts at each location and the second predicting the climatological standard deviation. truth_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the truths at each location and the second predicting the climatological standard deviation. gam_features: The list of features. These must be either coordinates on input_cube or share a name with a cube in additional_cubes. The index of each feature should match the indices used in model_specification. emos_coefficients: EMOS coefficients. gam_additional_fields: Additional fields to use as supplementary predictors in the GAMs. emos_additional_fields: Additional fields to use as supplementary predictors in EMOS. prob_template: A cube containing a probability forecast that will be used as a template when generating probability output when the input format of the forecast cube is not probabilities i.e. realizations or percentiles. realizations_count: Number of realizations to use when generating the intermediate calibrated forecast from probability or percentile inputs ignore_ecc_bounds: If True, allow percentiles from probabilities to exceed the ECC bounds range. If input is not probabilities, this is ignored. tolerate_time_mismatch: If True, tolerate a mismatch in validity time and forecast period for coefficients vs forecasts. Use with caution! predictor: Predictor to be used to calculate the location parameter of the calibrated distribution. Value is "mean" or "realizations". randomise: Used in generating calibrated realizations. If input forecast is probabilities or percentiles, this is ignored. random_seed: Used in generating calibrated realizations. If input forecast is probabilities or percentiles, this is ignored. Returns: Calibrated forecast in the form of the input (ie probabilities percentiles or realizations). """ input_forecast_type = get_forecast_type(forecast) forecast_as_realizations = forecast.copy() if input_forecast_type != "realizations": forecast_as_realizations = convert_to_realizations( forecast.copy(), realizations_count, ignore_ecc_bounds ) forecast_mean, forecast_sd = get_climatological_stats( forecast_as_realizations, forecast_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, constant_extrapolation=self.constant_extrapolation, ) forecast_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process( diagnostic_cube=forecast_as_realizations, mean_cube=forecast_mean, std_cube=forecast_sd, ) # Returns parameters which describe a climate anomaly distribution. location_parameter, scale_parameter = ApplyEMOS( percentiles=self.percentiles ).process( forecast=forecast_ca, coefficients=emos_coefficients, additional_fields=emos_additional_fields, prob_template=prob_template, realizations_count=realizations_count, ignore_ecc_bounds=ignore_ecc_bounds, tolerate_time_mismatch=tolerate_time_mismatch, predictor=predictor, randomise=randomise, random_seed=random_seed, return_parameters=True, ) truth_mean, truth_sd = get_climatological_stats( forecast_as_realizations, truth_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, constant_extrapolation=self.constant_extrapolation, ) # Transform location and scale parameters to be in the same units as the # original forecast. self.transform_anomalies_to_original_units( truth_mean=truth_mean, truth_sd=truth_sd, location_parameter=location_parameter, scale_parameter=scale_parameter, forecast=forecast, input_forecast_type=input_forecast_type, ) # Use the GAM distribution for the truths to determine the distribution to use # for the calibrated forecast. distribution = truth_gams[0].distribution.get_params(deep=True)["_name"] distribution = PYGAM_DISTRIBUTION_MAPPING[distribution] shape, location, scale = DistributionalParameters( distribution=distribution, truncation_points=( get_attribute_from_coefficients( emos_coefficients, "shape_parameters", optional=True ) ), ).process( mean_cube=location_parameter, sd_cube=scale_parameter, ) # Generate output in desired format from distribution. self.distribution = { "name": distribution, "location": location, "scale": scale, "shape": shape, } template = prob_template if prob_template else forecast result = generate_forecast_from_distribution( self.distribution, template, self.percentiles, randomise, random_seed ) if input_forecast_type != "probabilities" and not prob_template: # Enforce that the result is within sensible bounds. bounds_pairing = get_bounds_of_distribution( bounds_pairing_key=result.name(), desired_units=result.units ) result.data = np.clip( result.data, a_min=bounds_pairing[0], a_max=bounds_pairing[1] ) # Enforce correct dtype. result.data = result.data.astype(dtype=np.float32) return result