# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""
This module defines all the "plugins" specific to Standardised Anomaly Model Output
Statistics (SAMOS).
"""
import datetime
import warnings
from typing import Dict, List, Optional, Sequence, Tuple
import iris
import iris.pandas
import numpy as np
import pandas as pd
from improver.utilities.statistical import (
DistributionalParameters,
)
try:
import pygam
except ModuleNotFoundError:
# Define dummy class to avoid type hint errors.
from iris.cube import Cube, CubeList
from improver import BasePlugin, PostProcessingPlugin
from improver.calibration import add_static_feature_from_cube_to_df
from improver.calibration.emos_calibration import (
ApplyEMOS,
EstimateCoefficientsForEnsembleCalibration,
convert_to_realizations,
generate_forecast_from_distribution,
get_attribute_from_coefficients,
get_forecast_type,
)
from improver.ensemble_copula_coupling.utilities import get_bounds_of_distribution
from improver.metadata.probabilistic import find_threshold_coordinate
from improver.utilities.cube_manipulation import (
MergeCubes,
collapse_realizations,
collapse_time,
)
from improver.utilities.generalized_additive_models import GAMFit, GAMPredict
from improver.utilities.mathematical_operations import CalculateClimateAnomalies
# Setting to allow cubes with more than 2 dimensions to be converted to/from dataframes.
iris.FUTURE.pandas_ndim = True
# Dictionary mapping PyGAM distribution names to those used internally in Scipy.
PYGAM_DISTRIBUTION_MAPPING = {
"normal": "norm",
"binomial": "binom",
"poisson": "poisson",
"gamma": "gamma",
"inv_gauss": "invgauss",
}
[docs]
def prepare_data_for_gam(
input_cube: Cube,
additional_fields: Optional[CubeList] = None,
unique_site_id_key: Optional[str] = None,
) -> pd.DataFrame:
"""
Convert input cubes in to a single, combined dataframe.
Each of the input cubes is converted to a pandas dataframe. The dataframe derived
from input_cube then forms the left in a series of left dataframe joins with those
derived from each cube in additional_fields. The x and y coordinates are used to
perform this join. This means that the resulting combined dataframe will contain all
of the sites/grid points in input_cube, but not any other sites/grid points in the
additional_fields cubes.
Args:
input_cube:
A cube of forecast or observation data.
additional_fields:
Additional cubes with points which can be matched with points
in input_cube by matching spatial coordinate values.
unique_site_id_key:
If working with spot data and available, the name of the coordinate
in the input cubes that contains unique site IDs, e.g. "wmo_id" if
all sites have a valid wmo_id.
Returns:
A pandas dataframe with rows equal to the number of sites/grid points in
input_cube and containing the following columns:
1. A column with the same name as input_cube containing the original cube data
2. A series of columns derived from the input_cube dimension coordinates
3. A series of columns associated with any auxiliary coordinates
(scalar or otherwise) of input_cube
4. One column associated with each of the cubes in additional cubes, with
column names matching the associated cube
"""
df = iris.pandas.as_data_frame(
input_cube,
add_aux_coords=True,
add_cell_measures=True,
add_ancillary_variables=True,
)
df.reset_index(inplace=True)
if additional_fields:
# Check if we are dealing with spot data.
site_data = "spot_index" in [c.name() for c in input_cube.coords()]
# For site data we should use unique IDs wherever possible. As a
# fallback we can match on latitude, longitude and altitude. We
# need altitude to accommodate e.g. sites with a lower and upper
# forecast altitude. If we are working with gridded data we use the
# spatial coordinates of the input cube.
if site_data and unique_site_id_key is not None:
match_coords = [unique_site_id_key]
elif site_data:
match_coords = ["latitude", "longitude", "altitude"]
else:
match_coords = [
input_cube.coord(axis="X").name(),
input_cube.coord(axis="Y").name(),
]
for cube in additional_fields:
df = add_static_feature_from_cube_to_df(
df,
cube,
feature_name=cube.name(),
possible_merge_columns=match_coords,
float_decimals=4,
)
return df
[docs]
def convert_dataframe_to_cube(
df: pd.DataFrame,
template_cube: Cube,
) -> Cube:
"""Function to convert a Pandas dataframe to Iris cube format by using a template
cube. The input template_cube provides all metadata for the output.
Args:
df: A Pandas dataframe which must contain at least the following columns:
1. A column matching the name of template_cube
2. A series of columns with names which match the dimension coordinates on
template_cube. The data in these columns should match the points on the
corresponding dimension of template_cube.
template_cube: A cube which will provide all metadata for the output cube
Returns:
A copy of template_cube containing data from df.
"""
dim_coords = [c.name() for c in template_cube.coords(dim_coords=True)]
diagnostic = template_cube.name()
indexed_df = df.set_index(dim_coords, inplace=False)
indexed_df.sort_index(inplace=True)
# The as_cubes() function returns a cubelist. In this case, the cubelist contains
# only one element.
converted_cube = iris.pandas.as_cubes(indexed_df[[diagnostic]])[0]
result = template_cube.copy(data=converted_cube.data)
return result
[docs]
def get_climatological_stats(
input_cube: Cube,
gams: List,
gam_features: List[str],
additional_cubes: Optional[CubeList],
sd_clip: float = 0.25,
unique_site_id_key: Optional[str] = None,
constant_extrapolation: bool = False,
) -> Tuple[Cube, Cube]:
"""Function to predict climatological means and standard deviations given fitted
GAMs for each statistic and cubes which can be used to construct a dataframe
containing all required features for those GAMs.
Args:
input_cube
gams: A list containing two fitted GAMs, the first for predicting the
climatological mean of the locations in input_cube and the second
predicting the climatological standard deviation.
gam_features:
The list of features. These must be either coordinates on input_cube or
share a name with a cube in additional_cubes. The index of each
feature should match the indices used in model_specification.
additional_cubes:
Additional fields to use as supplementary predictors.
sd_clip:
The minimum standard deviation value to allow when predicting from the GAM.
Any predictions below this value will be set to this value.
unique_site_id_key:
If working with spot data and available, the name of the coordinate
in the input cubes that contains unique site IDs, e.g. "wmo_id" if
all sites have a valid wmo_id.
constant_extrapolation:
If True, predictor values outside the range of those used to fit the GAM
will be predicted using constant extrapolation (i.e. the nearest
boundary value). If False, extrapolation extends the trend of each
GAM term beyond the range of the training data. Default is False.
Returns:
A pair of cubes containing climatological mean and climatological standard
deviation predictions respectively.
"""
diagnostic = input_cube.name()
df = prepare_data_for_gam(
input_cube, additional_cubes, unique_site_id_key=unique_site_id_key
)
# Calculate climatological means and standard deviations using previously
# fitted GAMs.
mean_pred = GAMPredict(constant_extrapolation=constant_extrapolation).process(
gams[0], np.array(df[gam_features])
)
sd_pred = GAMPredict(constant_extrapolation=constant_extrapolation).process(
gams[1], np.array(df[gam_features])
)
# Convert means and standard deviations into cubes
df[diagnostic] = mean_pred
mean_cube = convert_dataframe_to_cube(df, input_cube)
df[diagnostic] = sd_pred
sd_cube = convert_dataframe_to_cube(df, input_cube)
sd_cube.data = np.clip(sd_cube.data, a_min=sd_clip, a_max=None)
return mean_cube, sd_cube
[docs]
class TrainGAMsForSAMOS(BasePlugin):
"""
Class for fitting Generalised Additive Models (GAMs) to training data for use in
a Standardised Anomaly Model Output Statistics (SAMOS) calibration scheme.
Two GAMs are trained: one modelling the mean of the training data and one
modelling the standard deviation. These can then be used to convert forecasts or
observations to climatological anomalies. This plugin should be run separately
for forecast and observation data.
"""
[docs]
def __init__(
self,
model_specification: list[list[str], list[int], dict],
max_iter: int = 100,
tol: float = 0.0001,
distribution: str = "normal",
link: str = "identity",
fit_intercept: bool = True,
window_length: int = 10,
required_rolling_window_points: int = 5,
trailing_window: bool = False,
unique_site_id_key: Optional[str] = None,
):
"""
Initialize the class.
Args:
model_specification:
A list of lists which each contain three items (in order):
1. a string containing a single pyGAM term; one of 'linear',
'spline', 'tensor', or 'factor'.
2. a list of integers which correspond to the features to be
included in that term.
3. a dictionary of kwargs to be included when defining the term.
max_iter:
A pyGAM argument which determines the maximum iterations allowed when
fitting the GAM.
tol:
A pyGAM argument determining the tolerance used to define the stopping
criteria.
distribution:
A pyGAM argument determining the distribution to be used in the model.
link:
A pyGAM argument determining the link function to be used in the model.
fit_intercept:
A pyGAM argument determining whether to include an intercept term in
the model.
window_length:
The length of the rolling window, in days, used to calculate the mean
and standard deviation of the input cube when the input cube does not
have a realization dimension coordinate. If using a centred rolling
window, this must be an even integer greater than 1 in order to allow
equal numbers of days on either side of the central time point. If
using a trailing rolling window, this must be an integer greater than 1.
required_rolling_window_points:
The minimum number of valid data points required within a rolling
window. If fewer valid points are present, the mean and standard
deviation will be set to NaN for this window.
trailing_window:
If False, a centred window is used, which assigns the calculated
statistic to the central time point in the window. If True, a trailing
window is used, which assigns the calculated statistic to the final time
point.
unique_site_id_key:
An optional key to use for uniquely identifying each site in the
training data. If not provided, the default behavior is to use the
spatial coordinates (latitude, longitude) of each site.
"""
self.model_specification = model_specification
self.max_iter = max_iter
self.tol = tol
self.distribution = distribution
self.link = link
self.fit_intercept = fit_intercept
self.unique_site_id_key = unique_site_id_key
# Check if the window_length is an integer greater than one.
raise_window_warning = False
if window_length < 2 or window_length % 1 != 0:
raise_window_warning = True
if trailing_window & raise_window_warning:
raise ValueError(
"The window_length input must be an integer greater than 1 when "
f"using a trailing rolling window. Received: {window_length}."
)
elif (raise_window_warning or window_length % 2 != 0) and not trailing_window:
# Additionally, check if window_length is even when using a centred window.
raise ValueError(
"The window_length input must be an even integer greater than 1 "
f"when using a centred rolling window. Received: {window_length}."
)
else:
self.window_length = window_length
self.trailing_window = trailing_window
if (
not isinstance(required_rolling_window_points, int)
or required_rolling_window_points < 2
):
raise ValueError(
"The required_rolling_window_points input must be an integer greater "
f"than 1. Received: {required_rolling_window_points}."
)
else:
self.required_rolling_window_points = required_rolling_window_points
[docs]
def calculate_statistic_by_rolling_window(self, input_cube: Cube):
"""Function to calculate mean and standard deviation of input_cube using a
rolling window calculation over the time coordinate.
"""
input_mean = CubeList()
input_sd = CubeList()
# This variable is used to calculate the bounds of each rolling window.
window_td = datetime.timedelta(days=self.window_length)
for time_index, tp in enumerate(input_cube.coord("time").cells()):
time_point = tp.point._to_real_datetime()
# Create time constraint for rolling window and extract data within window.
if self.trailing_window:
time_bounds = [time_point - window_td, time_point]
else:
time_bounds = [time_point - window_td / 2, time_point + window_td / 2]
time_constraint = iris.Constraint(
time=lambda cell: time_bounds[0] <= cell.point <= time_bounds[1]
)
window_cube = input_cube.extract(time_constraint)
if len(window_cube.coord("time").points) == 1:
# If there is only one time point in the window, the mean and sd for
# this window are set to NaN.
window_mean = window_cube.copy(
data=np.full_like(window_cube.data, np.nan, dtype=np.float32)
)
window_sd = window_cube.copy(
data=np.full_like(window_cube.data, np.nan, dtype=np.float32)
)
else:
window_mean = collapse_time(window_cube, "time", iris.analysis.MEAN)
window_sd = collapse_time(window_cube, "time", iris.analysis.STD_DEV)
# Check there are enough valid points in the window. Where there are
# insufficient data points, replace calculated statistics with NaNs.
window_valid_count = collapse_time(
window_cube,
"time",
iris.analysis.COUNT,
function=lambda values: ~np.isnan(values),
)
window_mean.data = np.where(
window_valid_count.data < self.required_rolling_window_points,
np.nan,
window_mean.data,
)
window_sd.data = np.where(
window_valid_count.data < self.required_rolling_window_points,
np.nan,
window_sd.data,
)
# Set time-related coordinates to match those for this time point on
# the input cube.
for coord_name in [
"time",
"forecast_reference_time",
"blend_time",
"forecast_period",
]:
if coord_name in [c.name() for c in window_mean.coords()]:
# The time-related coordinates are either scalar or have one
# point associated with each time point.
if len(input_cube.coord(coord_name).points) == 1:
point = input_cube.coord(coord_name).points[0]
else:
point = input_cube.coord(coord_name).points[time_index]
window_mean.coord(coord_name).points = np.array([point])
window_mean.coord(coord_name).bounds = None
window_sd.coord(coord_name).points = np.array([point])
window_sd.coord(coord_name).bounds = None
input_mean.append(window_mean.copy())
input_sd.append(window_sd.copy())
input_mean = MergeCubes().process(input_mean)
input_sd = MergeCubes().process(input_sd)
return input_mean, input_sd
[docs]
def calculate_cube_statistics(self, input_cube: Cube) -> CubeList:
"""Function to calculate mean and standard deviation of the input cube. If the
cube has a realization dimension then statistics will be calculated by
collapsing over this dimension. Otherwise, a rolling window calculation over
the time dimension will be used.
The rolling window method calculates a statistic over data in a fixed time
window. For example, for data points [0.0, 1.0, 2.0, 1.0, 0.0] each valid in
consecutive hours T+0, T+1, T+2, T+3, T+4, the mean calculated by a rolling
window of length 5 would be 0.8.
Args:
input_cube:
A cube with at least one of the following coordinates:
1. A realization dimension coordinate
2. A time coordinate with more than one point.
Returns:
CubeList containing a mean cube and standard deviation cube.
"""
if input_cube.coords("realization"):
input_mean = collapse_realizations(input_cube, method="mean")
input_sd = collapse_realizations(input_cube, method="std_dev")
else:
input_mean, input_sd = self.calculate_statistic_by_rolling_window(
input_cube
)
return CubeList([input_mean, input_sd])
[docs]
def process(
self,
input_cube: Cube,
features: List[str],
additional_fields: Optional[CubeList] = None,
) -> List:
"""
Function to fit GAMs to model the mean and standard deviation of the input_cube
for use in SAMOS.
Args:
input_cube:
Historic forecasts or observations from the training dataset. Must
contain at least one of:
- a realization coordinate
- a time coordinate with more than one point
features:
The list of features. These must be either coordinates on input_cube or
share a name with a cube in additional_fields. The index of each
feature should match the indices used in model_specification.
additional_fields:
Additional fields to use as supplementary predictors.
Returns:
A list containing fitted GAMs which model the input_cube mean and
standard deviation.
Raises:
ValueError: If input_cube does not contain at least one of a realization or
time coordinate.
ValueError: If the input cube does not have a realization coordinate and the
time coordinate that it does have contains only one point.
"""
if not input_cube.coords("realization", dim_coords=True):
if not input_cube.coords("time"):
msg = (
"The input cube must contain at least one of a realization or time "
"coordinate in order to allow the calculation of means and "
"standard deviations. The following coordinates were found: "
f"{input_cube.coords()}."
)
raise ValueError(msg)
elif len(input_cube.coord("time").points) == 1:
msg = (
"The input cube does not contain a realization coordinate. In "
"order to calculate means and standard deviations the time "
"coordinate must contain more than one point. The following time "
f"coordinate was found: {input_cube.coord('time')}."
)
warnings.warn(msg)
return None
# Calculate mean and standard deviation from input cube.
stat_cubes = self.calculate_cube_statistics(input_cube)
# Create list to put fitted GAM models in.
output = []
# Initialize plugin used to fit GAMs.
plugin = GAMFit(
model_specification=self.model_specification,
max_iter=self.max_iter,
tol=self.tol,
distribution=self.distribution,
link=self.link,
fit_intercept=self.fit_intercept,
)
for stat_cube in stat_cubes:
df = prepare_data_for_gam(
stat_cube, additional_fields, unique_site_id_key=self.unique_site_id_key
)
feature_values = df[features].values
targets = df[input_cube.name()].values
output.append(plugin.process(feature_values, targets))
if None in output:
return None
return output
[docs]
class TrainEMOSForSAMOS(BasePlugin):
"""Class to calculate Ensemble Model Output Statistics (EMOS) coefficients to
calibrate climate anomaly forecasts given training data including forecasts and
verifying observations and four Generalized Additive Models (GAMs) which model:
- forecast mean,
- forecast standard deviation,
- observation mean,
- observation standard deviation.
This class first calculates climatological means and standard deviations by
predicting them from the input GAMs. Following this, the input forecasts and
observations are converted to climatological anomalies using the predicted means
and standard deviations. Finally, EMOS coefficients are calculated from the
climatological anomaly training data.
"""
[docs]
def __init__(
self,
distribution: str,
emos_kwargs: Optional[Dict] = None,
unique_site_id_key: Optional[str] = None,
constant_extrapolation: bool = False,
) -> None:
"""Initialize the class.
Args:
distribution:
Name of distribution. Assume that a calibrated version of the
climate anomaly forecast could be represented using this distribution.
emos_kwargs: Keyword arguments accepted by the
EstimateCoefficientsForEnsembleCalibration plugin. Should not contain
a distribution argument.
unique_site_id_key:
If working with spot data and available, the name of the coordinate
in the input cubes that contains unique site IDs, e.g. "wmo_id" if
all sites have a valid wmo_id.
constant_extrapolation:
If True, when predicting mean and standard deviation from the GAMs,
when the predictor values are outside the range of those used to fit
the GAM, constant extrapolation (i.e. the nearest boundary value) will
be used. If False, extrapolation extends the trend of each
GAM term beyond the range of the training data. Default is False.
"""
self.distribution = distribution
self.emos_kwargs = emos_kwargs if emos_kwargs else {}
self.unique_site_id_key = unique_site_id_key
self.constant_extrapolation = constant_extrapolation
[docs]
def climate_anomaly_emos(
self,
forecast_cubes: List[Cube],
truth_cubes: List[Cube],
additional_fields: Optional[CubeList] = None,
landsea_mask: Optional[Cube] = None,
) -> CubeList:
"""Function to convert forecasts and truths to climate anomalies then calculate
EMOS coefficients for the climate anomalies.
Args:
forecast_cubes:
A list of three cubes: a cube containing historic forecasts, a cube
containing climatological mean predictions of the forecasts and a cube
containing climatological standard deviation predictions of the
forecasts.
truth_cubes:
A list of three cubes: a cube containing historic truths, a cube
containing climatological mean predictions of the truths and a cube
containing climatological standard deviation predictions of the truths.
additional_fields:
Additional fields to use as supplementary predictors.
landsea_mask:
The optional cube containing a land-sea mask. If provided, only
land points are used to calculate the coefficients. Within the
land-sea mask cube land points should be specified as ones,
and sea points as zeros.
Returns:
CubeList constructed using the coefficients provided and using
metadata from the historic_forecasts cube. Each cube within the
cubelist is for a separate EMOS coefficient e.g. alpha, beta,
gamma, delta.
"""
# Convert forecasts and truths to climatological anomalies.
forecast_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process(
*forecast_cubes
)
truth_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process(
*truth_cubes
)
plugin = EstimateCoefficientsForEnsembleCalibration(
distribution=self.distribution,
**self.emos_kwargs,
)
return plugin.process(
historic_forecasts=forecast_ca,
truths=truth_ca,
additional_fields=additional_fields,
landsea_mask=landsea_mask,
)
[docs]
def process(
self,
historic_forecasts: Cube,
truths: Cube,
forecast_gams: List,
truth_gams: List,
gam_features: List[str],
gam_additional_fields: Optional[CubeList] = None,
emos_additional_fields: Optional[CubeList] = None,
landsea_mask: Optional[Cube] = None,
) -> CubeList:
"""Function to convert historic forecasts and truths to climatological
anomalies, then fit EMOS coefficients to these anomalies.
Args:
historic_forecasts:
Historic forecasts from the training dataset.
truths:
Truths from the training dataset.
forecast_gams:
A list containing two fitted GAMs, the first for predicting the
climatological mean of the locations in historic_forecasts and the
second predicting the climatological standard deviation. Appropriate
GAMs are produced by the TrainGAMsForSAMOS plugin.
truth_gams:
A list containing two fitted GAMs, the first for predicting the
climatological mean of the locations in truths and the second
predicting the climatological standard deviation. Appropriate
GAMs are produced by the TrainGAMsForSAMOS plugin.
gam_features:
The list of features. These must be either coordinates on input_cube or
share a name with a cube in gam_additional_fields. The index of each
feature must match the indices used in model_specification.
gam_additional_fields:
Additional fields to use as supplementary predictors in the GAMs.
emos_additional_fields:
Additional fields to use as supplementary predictors in EMOS.
landsea_mask:
The optional cube containing a land-sea mask. If provided, only
land points are used to calculate the EMOS coefficients. Within the
land-sea mask cube land points should be specified as ones,
and sea points as zeros.
Returns:
CubeList constructed using the coefficients provided and using
metadata from the historic_forecasts cube. Each cube within the
cubelist is for a separate EMOS coefficient e.g. alpha, beta,
gamma, delta.
"""
forecast_mean, forecast_sd = get_climatological_stats(
historic_forecasts,
forecast_gams,
gam_features,
gam_additional_fields,
unique_site_id_key=self.unique_site_id_key,
constant_extrapolation=self.constant_extrapolation,
)
truth_mean, truth_sd = get_climatological_stats(
truths,
truth_gams,
gam_features,
gam_additional_fields,
unique_site_id_key=self.unique_site_id_key,
constant_extrapolation=self.constant_extrapolation,
)
emos_coefficients = self.climate_anomaly_emos(
forecast_cubes=[historic_forecasts, forecast_mean, forecast_sd],
truth_cubes=[truths, truth_mean, truth_sd],
additional_fields=emos_additional_fields,
landsea_mask=landsea_mask,
)
return emos_coefficients
[docs]
class ApplySAMOS(PostProcessingPlugin):
"""
Class to calibrate an input forecast using SAMOS given the following inputs:
- Two GAMs which model, respectively, the climatological mean and standard
deviation of the forecast. This allows the forecast to be converted to
climatological anomalies.
- Two GAMs which model, respectively, the climatological mean and standard
deviation of the truths. This allows the climatological anomalies to be
transformed back to the original forecast units.
- A set of EMOS coefficients which can be applied to correct the climatological
anomalies.
"""
[docs]
def __init__(
self,
percentiles: Optional[Sequence] = None,
unique_site_id_key: Optional[str] = None,
constant_extrapolation: bool = False,
):
"""Initialize class.
Args:
percentiles:
The set of percentiles used to create the calibrated forecast.
unique_site_id_key:
If working with spot data and available, the name of the coordinate
in the input cubes that contains unique site IDs, e.g. "wmo_id" if
all sites have a valid wmo_id.
constant_extrapolation:
If True, when predicting mean and standard deviation from the GAMs,
when the predictor values are outside the range of those used to fit
the GAM, constant extrapolation (i.e. the nearest boundary value) will
be used. If False, extrapolation extends the trend of each
GAM term beyond the range of the training data. Default is False.
"""
self.percentiles = [np.float32(p) for p in percentiles] if percentiles else None
self.unique_site_id_key = unique_site_id_key
self.constant_extrapolation = constant_extrapolation
[docs]
def process(
self,
forecast: Cube,
forecast_gams: List,
truth_gams: List,
gam_features: List[str],
emos_coefficients: CubeList,
gam_additional_fields: Optional[CubeList] = None,
emos_additional_fields: Optional[CubeList] = None,
prob_template: Optional[Cube] = None,
realizations_count: Optional[int] = None,
ignore_ecc_bounds: bool = True,
tolerate_time_mismatch: bool = False,
predictor: str = "mean",
randomise: bool = False,
random_seed: Optional[int] = None,
):
"""Calibrate input forecast using GAMs to convert the forecast to climatological
anomalies and pre-calculated EMOS coefficients to apply to those anomalies.
Args:
forecast:
Uncalibrated forecast as probabilities, percentiles or
realizations.
forecast_gams:
A list containing two fitted GAMs, the first for predicting the
climatological mean of the historic forecasts at each location and the
second predicting the climatological standard deviation.
truth_gams:
A list containing two fitted GAMs, the first for predicting the
climatological mean of the truths at each location and the
second predicting the climatological standard deviation.
gam_features:
The list of features. These must be either coordinates on input_cube or
share a name with a cube in additional_cubes. The index of each
feature should match the indices used in model_specification.
emos_coefficients:
EMOS coefficients.
gam_additional_fields:
Additional fields to use as supplementary predictors in the GAMs.
emos_additional_fields:
Additional fields to use as supplementary predictors in EMOS.
prob_template:
A cube containing a probability forecast that will be used as
a template when generating probability output when the input
format of the forecast cube is not probabilities i.e. realizations
or percentiles.
realizations_count:
Number of realizations to use when generating the intermediate
calibrated forecast from probability or percentile inputs
ignore_ecc_bounds:
If True, allow percentiles from probabilities to exceed the ECC
bounds range. If input is not probabilities, this is ignored.
tolerate_time_mismatch:
If True, tolerate a mismatch in validity time and forecast
period for coefficients vs forecasts. Use with caution!
predictor:
Predictor to be used to calculate the location parameter of the
calibrated distribution. Value is "mean" or "realizations".
randomise:
Used in generating calibrated realizations. If input forecast
is probabilities or percentiles, this is ignored.
random_seed:
Used in generating calibrated realizations. If input forecast
is probabilities or percentiles, this is ignored.
Returns:
Calibrated forecast in the form of the input (ie probabilities
percentiles or realizations).
"""
input_forecast_type = get_forecast_type(forecast)
forecast_as_realizations = forecast.copy()
if input_forecast_type != "realizations":
forecast_as_realizations = convert_to_realizations(
forecast.copy(), realizations_count, ignore_ecc_bounds
)
forecast_mean, forecast_sd = get_climatological_stats(
forecast_as_realizations,
forecast_gams,
gam_features,
gam_additional_fields,
unique_site_id_key=self.unique_site_id_key,
constant_extrapolation=self.constant_extrapolation,
)
forecast_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process(
diagnostic_cube=forecast_as_realizations,
mean_cube=forecast_mean,
std_cube=forecast_sd,
)
# Returns parameters which describe a climate anomaly distribution.
location_parameter, scale_parameter = ApplyEMOS(
percentiles=self.percentiles
).process(
forecast=forecast_ca,
coefficients=emos_coefficients,
additional_fields=emos_additional_fields,
prob_template=prob_template,
realizations_count=realizations_count,
ignore_ecc_bounds=ignore_ecc_bounds,
tolerate_time_mismatch=tolerate_time_mismatch,
predictor=predictor,
randomise=randomise,
random_seed=random_seed,
return_parameters=True,
)
truth_mean, truth_sd = get_climatological_stats(
forecast_as_realizations,
truth_gams,
gam_features,
gam_additional_fields,
unique_site_id_key=self.unique_site_id_key,
constant_extrapolation=self.constant_extrapolation,
)
# Transform location and scale parameters to be in the same units as the
# original forecast.
self.transform_anomalies_to_original_units(
truth_mean=truth_mean,
truth_sd=truth_sd,
location_parameter=location_parameter,
scale_parameter=scale_parameter,
forecast=forecast,
input_forecast_type=input_forecast_type,
)
# Use the GAM distribution for the truths to determine the distribution to use
# for the calibrated forecast.
distribution = truth_gams[0].distribution.get_params(deep=True)["_name"]
distribution = PYGAM_DISTRIBUTION_MAPPING[distribution]
shape, location, scale = DistributionalParameters(
distribution=distribution,
truncation_points=(
get_attribute_from_coefficients(
emos_coefficients, "shape_parameters", optional=True
)
),
).process(
mean_cube=location_parameter,
sd_cube=scale_parameter,
)
# Generate output in desired format from distribution.
self.distribution = {
"name": distribution,
"location": location,
"scale": scale,
"shape": shape,
}
template = prob_template if prob_template else forecast
result = generate_forecast_from_distribution(
self.distribution, template, self.percentiles, randomise, random_seed
)
if input_forecast_type != "probabilities" and not prob_template:
# Enforce that the result is within sensible bounds.
bounds_pairing = get_bounds_of_distribution(
bounds_pairing_key=result.name(), desired_units=result.units
)
result.data = np.clip(
result.data, a_min=bounds_pairing[0], a_max=bounds_pairing[1]
)
# Enforce correct dtype.
result.data = result.data.astype(dtype=np.float32)
return result