# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module containing quantile mapping bias correction.
Quantile mapping is a statistical calibration technique that adjusts forecast
values to match the distribution of reference data (i.e. observations or a differently
processed reference forecast). It works by:
1. Finding each forecast value's position (quantile) in the forecast distribution
2. Mapping that quantile to the corresponding value in the reference distribution
This corrects systematic biases while preserving spatial patterns.
"""
from typing import Literal, Optional, Tuple
import numpy as np
from iris.cube import Cube
from improver import PostProcessingPlugin
[docs]
class QuantileMapping(PostProcessingPlugin):
"""Apply quantile mapping bias correction to forecast data."""
[docs]
def __init__(
self,
preservation_threshold: Optional[float] = None,
method: Literal["step", "continuous"] = "step",
) -> None:
"""Initialize the quantile mapping plugin.
Args:
preservation_threshold:
Optional threshold value below which (exclusive) the forecast
values are not adjusted to be like the reference. Useful for
variables such as precipitation, where a user may be wary of mapping
0mm/hr precipitation values to non-zero values.
method:
Choose from two methods of converting forecast values into quantiles
before mapping them onto the reference distribution: 'step' and
'continuous'. These methods differ in three ways:
1. How quantiles are assigned to ranked data ('plotting positions').
- 'step' uses rank/number of points (i/n), which corresponds to the
formal ECDF definition and treats the largest value as the 1.0
quantile (100th percentile).
- 'continuous' uses midpoint plotting positions ((i-0.5)/n), which
place values in the centre of their rank intervals and avoids
probabilities of exactly 0 or 1.
2. How probabilities are mapped back to values.
- 'step' uses flooring, so each probability maps to the nearest
lower observed value in the reference distribution, creating the
step-function mapping.
- 'continuous' uses interpolation, creating a smoother mapping where
small changes in probability lead to small changes in value.
3. How repeated values are treated.
- 'step' assigns the same quantile to repeated values, so they all
map to the same value in the reference distribution (creating flat
steps in the mapping).
- 'continuous' assigns different quantiles to repeated values,
spreading them evenly across their range, so they can map to
different values in the reference distribution.
Example
--------
With the following reference and forecast data (totalling 11 points
in each array), the two methods would produce their output as
illustrated below:
forecast = np.array([0, 0, 0, 0, 0, 0, 0, 0, 10, 20, 30])
reference = np.array([0, 0, 0, 0, 0, 0, 0, 10, 20, 40, 50])
num_points = 11
---- Step method ----
1. The forecast data are sorted.
[0, 0, 0, 0, 0, 0, 0, 0, 10, 20, 30]
2. ECDF quantiles are assigned using i/n, where i is the number of
values less than or equal to each value.
ECDF counts:
[8, 8, 8, 8, 8, 8, 8, 8, 9, 10, 11]
Quantiles:
[8/11, 8/11, 8/11, 8/11, 8/11, 8/11, 8/11, 8/11, 9/11, 10/11,
11/11]
≈ [0.727, ..., 0.727, 0.818, 0.909, 1.0]
3. These quantiles are mapped to the reference distribution using a
stepwise empirical quantile mapping. Each probability is mapped to
the reference value at the right edge of the corresponding ECDF step
(i.e. the smallest reference value whose empirical cumulative
probability is less than or equal to the given probability),
yielding:
[10, 10, 10, 10, 10, 10, 10, 10, 20, 40, 50]
---- Continuous method ----
1. The forecast data are ranked using a stable sorting algorithm
(np.argsort with kind='mergesort'), which assigns ranks in order of
appearance for repeated values:
Ranks:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2. Midpoint quantiles are assigned using (rank - 0.5) / n:
Quantiles:
[0.045, 0.136, 0.227, 0.318, 0.409, 0.500,
0.591, 0.682, 0.773, 0.864, 0.955]
3. These quantiles are mapped to the reference distribution using
linear interpolation between reference quantiles, yielding
smoothly varying mapped values. Repeated forecast values may
therefore map to different reference values rather than
collapsing to a single step.
[0, 0, 0, 0, 0, 0, 0, 10, 20, 40, 50]
Note: Due to statistical convention, the 'step' method uses standard
plotting positions (i/n), rather than (i/(n+1)). The consequences of
this choice are that the quantiles assigned to the forecast data
will be asymmetrically distributed: i/n produces quantiles ranging
from 1/n to 1, so probabilities are shifted upwards, especially near
the top end of the distribution. While the discrepancies are large
for small datasets (e.g. for n=5, quantiles are [0.2, 0.4, 0.6, 0.8,
1.0] vs [0.16666667, 0.33333333, 0.50000000, 0.66666667,
0.83333333]),the differences become negligible for larger datasets
(e.g. for n=1000,quantiles are [0.001, 0.002, ..., 0.999, 1.0] vs
[0.0005, 0.0015, ..., 0.9985, 0.9995]).
Raises:
ValueError:
If an unsupported method is specified.
"""
self.preservation_threshold = preservation_threshold
method = method.lower()
if method not in ["step", "continuous"]:
raise ValueError(
f"Unsupported method '{method}'. Choose 'step' or 'continuous'."
)
self.method = method
[docs]
def _plotting_positions(self, num_points: int) -> np.ndarray:
"""
Return plotting positions for a sorted sample of size n.
The plotting positions determine the quantiles assigned to each data point
when building the empirical CDF.
"""
i = np.arange(1, num_points + 1)
if self.method == "step": # Standard plotting positions
return i / num_points
else: # self.method == "continuous"
return (i - 0.5) / num_points # Midpoint plotting positions
[docs]
def _build_empirical_cdf(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Build ECDF components (sorted values and quantiles).
Args:
data:
1D array of input data values.
Returns:
Tuple of (sorted_values, quantiles) representing the empirical CDF.
"""
sorted_values = np.sort(data)
quantiles = self._plotting_positions(sorted_values.size)
return sorted_values, quantiles
[docs]
def _forecast_to_quantiles(self, forecast_data: np.ndarray) -> np.ndarray:
"""Assign a quantile to each value in the forecast data based on its rank in the
forecast distribution.
Args:
forecast_data:
1D array of forecast values.
Returns:
1D array of quantiles corresponding to each forecast value.
"""
num_points = forecast_data.size
if self.method == "step":
# Value -> quantile mapping. Repeated values collapse to the same
# (right-most) quantile, creating a step-function mapping.
sorted_values, quantiles = self._build_empirical_cdf(forecast_data)
return np.interp(forecast_data, sorted_values, quantiles)
else: # self.method == 'continuous'
# Rank-based quantiles: repeated values get their own quantiles spread
# evenly across their range.
order = np.argsort(forecast_data, kind="mergesort")
ranks = np.empty_like(order)
ranks[order] = np.arange(1, num_points + 1)
# Convert ranks to midpoint quantiles
return (ranks - 0.5) / num_points
[docs]
def _inverted_cdf(
self,
reference_data: np.ndarray,
quantiles: np.ndarray,
) -> np.ndarray:
"""Get distribution values at specified quantiles (discrete step method).
Uses floored index lookup, rounding each quantile down to the nearest
available data point. This creates a step-function mapping that's faster
but less smooth than interpolation.
Taken from:
https://github.com/ecmwf-projects/ibicus/blob/main/ibicus/utils/_math_utils.py
Args:
reference_data:
1D array of data values defining the reference distribution.
quantiles:
Quantiles to evaluate (values between 0 and 1).
Returns:
Values from the reference data corresponding to the requested quantiles.
"""
sorted_reference = np.sort(reference_data)
num_points = sorted_reference.size
if self.method == "step":
idx = np.floor((num_points - 1) * quantiles).astype(np.int32)
idx = np.clip(idx, 0, num_points - 1) # Ensure indices are within bounds
return sorted_reference[idx]
else: # self.method == 'continuous'
quantiles_reference = self._plotting_positions(num_points)
return np.interp(quantiles, quantiles_reference, sorted_reference)
[docs]
def _map_quantiles(
self,
reference_data: np.ndarray,
forecast_data: np.ndarray,
) -> np.ndarray:
"""Transform forecast values to match the reference distribution.
Behaviour depends on the self.method (see __init__).
For each forecast value:
1. Find its quantile position in the forecast distribution
2. Map that quantile to the corresponding value in the reference distribution
using the specified method (step or continuous).
Examples:
- Discrete
If reference_data is [10, 20, 30, 40, 50] and forecast_data
is [20, 25, 30, 35, 40], the forecast values are mapped to the corresponding
values in the reference data distribution. This stretches the range of the
forecast data, shifting the extreme values by 10 units in opposing
directions. The median value is left unchanged as the two distributions are
aligned at this point. The inter-quartile values are each shifted by 5 units
in opposing directions, again reflecting the broader distribution found in
the reference data.
- Continuous
Using the same reference and forecast data as above, the continuous method
would produce a smoother mapping. The extreme values would still be shifted
by 10 units, but the intermediate values would be adjusted more gradually.
The median value would still be unchanged, but the inter-quartile values
would be shifted by less than 5 units, reflecting the more continuous nature
of the mapping.
Args:
reference_data:
Target distribution (observations or a differently processed forecast).
forecast_data:
Source distribution (biased forecasts to correct).
Returns:
Bias-corrected forecast values matching the reference distribution.
"""
# Convert forecast values to quantiles in the forecast distribution
forecast_quantiles = self._forecast_to_quantiles(forecast_data)
# Map forecast quantiles to values in the reference distribution
corrected_values = self._inverted_cdf(reference_data, forecast_quantiles)
return corrected_values
[docs]
@staticmethod
def _convert_reference_cube_to_forecast_units(
reference_cube: Cube,
forecast_cube: Cube,
) -> tuple[Cube, Cube]:
"""Ensure reference cube uses the same units as forecast cube.
Args:
reference_cube:
The reference data cube.
forecast_cube:
The forecast data cube.
Returns:
Tuple of (reference_cube, forecast_cube) with matching units.
Raises:
ValueError: If units are incompatible and cannot be converted.
"""
target_units = forecast_cube.units
reference_cube = reference_cube.copy()
# Convert reference_cube to target_units if needed
if reference_cube.units != target_units:
try:
reference_cube.convert_units(target_units)
except ValueError:
raise ValueError(
f"Cannot convert cube with units {reference_cube.units} "
f"to target units {target_units}"
)
return (reference_cube, forecast_cube)
[docs]
def _process_masked_data(
self,
reference_cube: Cube,
forecast_cube: Cube,
) -> np.ndarray:
"""Apply quantile mapping while properly handling masked data.
Masked values in reference_cube are excluded from CDF calculations.
Forecast mask is preserved in output to avoid filling intentionally masked
locations.
Args:
reference_cube:
The reference cube (with units already converted).
forecast_cube:
The forecast cube to calibrate.
Returns:
corrected_data_flat:
1D array with corrected values (forecast mask preserved).
"""
# Flatten data
reference_data_flat = reference_cube.data.flatten()
forecast_data_flat = forecast_cube.data.flatten()
# Extract valid (non-masked) data for quantile mapping
if np.ma.is_masked(reference_data_flat):
reference_valid = reference_data_flat.compressed()
else:
reference_valid = reference_data_flat
if np.ma.is_masked(forecast_data_flat):
forecast_mask = np.ma.getmaskarray(forecast_data_flat)
forecast_valid = forecast_data_flat.compressed()
# Apply quantile mapping to valid data only
corrected_valid = self._map_quantiles(reference_valid, forecast_valid)
# Reconstruct with forecast mask preserved
corrected_values_flat = np.empty_like(forecast_data_flat.data)
corrected_values_flat[~forecast_mask] = corrected_valid
corrected_values_flat = np.ma.masked_array(
corrected_values_flat, mask=forecast_mask
)
else:
# No forecast masking - apply directly
corrected_values_flat = self._map_quantiles(
reference_valid, forecast_data_flat
)
return corrected_values_flat
[docs]
def _apply_preservation_threshold(
self, output_cube: Cube, forecast_cube: Cube
) -> None:
"""Preserve original values below preservation threshold.
Modifies output_cube.data in-place.
Args:
output_cube:
The cube with calibrated data to modify.
forecast_cube:
The original forecast cube with values to preserve.
"""
if self.preservation_threshold is None:
return
mask_below_threshold = np.ma.less(
forecast_cube.data, self.preservation_threshold
)
# np.ma.where works for both masked and non-masked arrays
output_cube.data = np.ma.where(
mask_below_threshold, forecast_cube.data, output_cube.data
)
[docs]
def _finalise_output_cube(
self, corrected_values_flat: np.ndarray, forecast_cube: Cube, output_cube: Cube
) -> None:
"""Make final adjustments to output cube metadata and data type.
Args:
corrected_values_flat:
1D array of corrected values to reshape and insert into output cube.
forecast_cube:
The original forecast cube, used to determine the shape and for
preservation threshold.
output_cube:
The cube to finalize.
output_mask:
The mask to apply to the output cube, or None if no masking is needed.
"""
# Reshape corrected data to match original shape and set data type to float32
if corrected_values_flat.dtype != np.float32:
corrected_values_flat = corrected_values_flat.astype(np.float32)
output_cube.data = np.reshape(corrected_values_flat, forecast_cube.shape)
# Preserve low values if threshold is set, modifying in-place
self._apply_preservation_threshold(output_cube, forecast_cube)
[docs]
def process(
self,
reference_cube: Cube,
forecast_cube: Cube,
) -> Cube:
"""Adjust forecast values to match the statistical distribution of reference
data.
This calibration method corrects biases in forecast data by transforming its
values to follow the same distribution as a reference dataset.
Unlike grid-point methods that match values at each location, this approach uses
all data across the spatial domain to build the statistical distributions.
This is particularly useful when forecasts have been smoothed and you want to
restore realistic variation in the values while preserving the spatial patterns.
Uses the discrete (floor) method for quantile lookup, which maps each quantile
to the nearest available reference value, creating a step-function mapping.
Args:
reference_cube:
The reference data that define what the "correct" distribution
should look like.
forecast_cube:
The forecast data you want to correct (e.g. smoothed model output).
Returns:
Calibrated forecast cube with quantiles mapped to the reference
distribution.
Note:
The output mask is the union of the reference and forecast masks. Output
will be masked at any location where EITHER input is masked, as quantile
mapping requires valid data from both sources. This may result in the
output having more masked values than the forecast input.
"""
# Ensure both cubes use the same units
reference_cube_same_units, forecast_cube = (
self._convert_reference_cube_to_forecast_units(
reference_cube, forecast_cube
)
)
# Create output cube to preserve metadata
output_cube = forecast_cube.copy()
# Apply quantile mapping (handles masked data automatically)
corrected_values_flat = self._process_masked_data(
reference_cube_same_units, forecast_cube
)
self._finalise_output_cube(corrected_values_flat, forecast_cube, output_cube)
return output_cube