# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Plugin for adding stochastic noise to a cube using Short-Space Fourier Transform
(SSFT).
"""
import os
import warnings
from typing import Optional
import numpy as np
from dask import compute, delayed
from iris.cube import Cube
from improver import BasePlugin
from improver.utilities.cube_checker import validate_cube_dimensions
[docs]
class StochasticNoise(BasePlugin):
"""Class to apply spatially-structured stochastic noise to non-positive regions of a
field, building on the Short-Space Fourier Transform (SSFT) approach from
Nerini et al. (2017).
This plugin is intended for use with positive zero-bounded diagnostics only, and is
a particularly useful tool for Ensemble Copula Coupling-Quantile (ECC-Q) realization
generation. While EEC-Q is used to improve the accuracy of forecasts by calibrating
ensemble members to better represent the true distribution of the forecast variable,
the rank-based reordering (sorting) of ensemble members at each grid point can lead
to unrealistic individual members (e.g. single-pixel precipitation artifacts) when
multiple raw ensemble members have identical values ('ties') of zero (very common
in precipitation forecasts) and the post-processed calibrated probabilities
indicate a non-zero value should occur. By adding spatially-structured noise to
break ties in these non-positive regions, more realistic spatial structures can be
generated in the final ECC-Q realizations, while still respecting the calibrated
probabilities.
"""
[docs]
def __init__(
self,
ssft_init_params: Optional[dict] = None,
ssft_generate_params: Optional[dict] = None,
db_threshold: float = 0.03,
db_threshold_units: str = "mm/hr",
num_workers: Optional[int] = len(os.sched_getaffinity(0)),
scale_non_positive_noise: bool = False,
allow_seeded_parallel_processing: bool = False,
arbitrary_offset: float = 5.0,
):
"""
Initialise the plugin.
If ssft_init_params or ssft_generate_params are not provided, default values
from the Pysteps documentation will be used.
Args:
ssft_init_params:
Keyword arguments for initializing SSFT filter using
pysteps.noise.fftgenerators.initialize_nonparam_2d_ssft_filter.
Default is an empty dict, which will use the pysteps defaults.
ssft_generate_params:
Keyword arguments for generating stochastic noise using
pysteps.noise.fftgenerators.generate_noise_2d_ssft_filter.
Default is an empty dict, which will use the pysteps defaults.
db_threshold:
Threshold value below which data will be set to a constant in dB scale
to avoid issues with log(0). Value provided in units of
`db_threshold_units`.
Default is 0.03 mm/hr.
db_threshold_units:
Units of the db_threshold value. Default is "mm/hr".
num_workers:
Number of worker threads for parallel FFT computation.
If not specified, uses the smaller of the plugin's default (number of
available CPUs) or the number of realizations in the input cube.
scale_non_positive_noise:
If True, noise in non-positive regions (where template.data <= 0) will
be scaled such that the maximum noise value in those regions is zero and
all other noise values are negative. This prevents the addition of
positive noise to non-positive regions, which could artificially
increase values where the input cube indicates no signal should occur.
Default is False.
allow_seeded_parallel_processing:
If True, allows multiple workers to be used even when a seed is
provided in ssft_generate_params. This may improve computation speed,
but can introduce run-to-run variation because pySTEPS uses global RNG
seeding. If False, seeded runs are forced to a single worker for
reproducibility. Default is False.
arbitrary_offset:
An arbitrary offset value to add to the dB values of sub-threshold
pixels. This is used to ensure that all sub-threshold pixels have a
distinct value in dB space, which allows them to be handled
appropriately in the _from_dB method. The default value of 5 was chosen
to provide a clear separation from the threshold value in dB space, but
can be adjusted if needed.
Raises:
ValueError:
If db_threshold is not a positive value.
Example dictionaries for initializing and generating SSFT filter::
ssft_init_params = {"win_size": (100, 100), "overlap": 0.3, "war_thr": 0.1}
ssft_generate_params = {"overlap": 0.3, "seed": 0}
See Pysteps documentation for further keyword arguments.
"""
if db_threshold <= 0:
raise ValueError("db_threshold must be a positive value.")
self.ssft_init_params = ssft_init_params or {}
self.ssft_generate_params = ssft_generate_params or {}
self.db_threshold = db_threshold
self.db_threshold_units = db_threshold_units
self.num_workers = num_workers
self.scale_non_positive_noise = scale_non_positive_noise
self.allow_seeded_parallel_processing = allow_seeded_parallel_processing
self.arbitrary_offset = arbitrary_offset
[docs]
def _to_dB(self, cube: Cube) -> Cube:
"""Convert cube data to dB scale and apply thresholding using db_threshold
specified in the plugin initialization.
Function based on dB_transform function (with arg inverse=False) from
https://github.com/pySTEPS/pysteps/blob/master/pysteps/utils/transformation.py.
Args:
cube:
Cube containing data to be converted to dB scale.
Returns:
Cube with data converted from linear scale to dB scale.
"""
threshold_dB = 10.0 * np.log10(self.db_threshold)
mask = cube.data < self.db_threshold
cube.data[~mask] = 10.0 * np.log10(cube.data[~mask])
# The below offsets sub-threshold values. The choice to subtract 5 is arbitrary,
# and ensures masked values have a distinct value, which is later handled in
# _from_dB by setting values below the threshold to zero.
cube.data[mask] = (
threshold_dB - self.arbitrary_offset
) # Offset sub-threshold values
return cube
[docs]
def _from_dB(
self,
data: np.ndarray,
) -> np.ndarray:
"""Convert cube data from dB scale with thresholding.
Function based on dB_transform function (with arg inverse=True) from
https://github.com/pySTEPS/pysteps/blob/master/pysteps/utils/transformation.py.
Args:
data:
data in dB scale.
Returns:
np.ndarray with data converted from dB scale to original scale.
Note: After conversion to original scale, values below the threshold
are set to zero.
"""
linear = 10 ** (data / 10.0)
linear[linear < self.db_threshold] = 0.0
return linear
[docs]
def do_fft(
self,
data: np.ndarray,
) -> np.ndarray:
"""
Generate stochastic noise using SSFT for a 2-D array slice (one realization).
Args:
data:
2D array for which stochastic noise is to be added.
Returns:
np.ndarray:
2D array of generated stochastic noise.
"""
from pysteps.noise.fftgenerators import (
generate_noise_2d_ssft_filter,
initialize_nonparam_2d_ssft_filter,
)
nonparametric_filter = initialize_nonparam_2d_ssft_filter(
data,
**self.ssft_init_params,
)
stochastic_noise = generate_noise_2d_ssft_filter(
nonparametric_filter, **self.ssft_generate_params
)
return stochastic_noise
[docs]
def process(self, input_cube: Cube) -> Cube:
"""
Add locally-conditioned stochastic noise to a cube object using Short-Space
Fourier Transform (SSFT).
Args:
input_cube:
Cube to which stochastic noise will be added.
Returns:
Cube with added stochastic noise.
Warnings:
UserWarning:
If a seed is provided in ssft_generate_params and allow_seeded_parallel_processing
is True, a warning is raised to indicate that using multiple workers
with a fixed seed may introduce run-to-run variation because pySTEPS
uses global RNG seeding.
"""
# Check that input cube has the expected dimensions for processing
validate_cube_dimensions(
cube=input_cube,
required_dimensions=["realization", "x", "y"],
exact_match=True,
)
# Store original cube units and mask
original_units = input_cube.units
original_mask = None
if np.ma.isMaskedArray(input_cube.data):
original_mask = input_cube.data.mask.copy()
# Convert to db_threshold_units for processing
template = input_cube.copy()
template.convert_units(self.db_threshold_units)
# Fill masked values with 0 for processing (dask does not support native numpy
# masked arrays)
if np.ma.isMaskedArray(template.data):
template.data = np.ma.filled(template.data, 0.0).astype(np.float32)
# Identify non-positive regions where noise should be added
non_positive_mask = template.data <= 0
# If no non-positive values, return input unchanged (output would be
# unchanged with SSFT noise addition only to non-positive regions)
if not np.any(non_positive_mask):
return input_cube
# Create a copy of the template in dB scale to use for SSFT processing
template_dB = self._to_dB(template.copy())
# Build delayed processing tasks for each realization
tasks = []
for slice in template_dB.slices_over("realization"):
data_slice = slice.data.astype(np.float32)
tasks.append(delayed(self.do_fft)(data_slice))
# Set number of workers for parallel processing
num_workers = min(
self.num_workers,
len(template.coord("realization").points),
)
# pySTEPS uses numpy.random.seed when a seed kwarg is passed. By default,
# restrict to a single worker in that case to avoid concurrent global RNG
# mutations and preserve reproducibility.
if (
"seed" in self.ssft_generate_params
and not self.allow_seeded_parallel_processing
):
num_workers = 1
elif (
"seed" in self.ssft_generate_params
and self.allow_seeded_parallel_processing
):
warnings.warn(
"Using multiple workers with a fixed seed may introduce run-to-run "
"variation because pySTEPS uses global RNG seeding.",
UserWarning,
)
# Compute all SSFT noise arrays (in dB scale) in parallel
results = compute(*tasks, scheduler="threads", num_workers=num_workers)
# Convert dB to linear scale
noise_linear = template.copy()
noise_linear.data = np.zeros_like(template.data, dtype=np.float32)
for k, result_db in enumerate(results):
# Guard against occasional non-finite outputs from SSFT generation on
# degenerate fields by mapping them to a sub-threshold dB value.
if not np.all(np.isfinite(result_db)):
# Repeat scaling from _to_dB to get a sub-threshold dB value for
# non-finite outputs.
sub_threshold_dB = (
10.0 * np.log10(self.db_threshold) - self.arbitrary_offset
)
result_db = np.nan_to_num(
result_db,
nan=sub_threshold_dB,
posinf=sub_threshold_dB,
neginf=sub_threshold_dB,
)
lin_noise = self._from_dB(data=result_db)
# Ensure no non-finite values propagate into downstream scaling.
lin_noise = np.nan_to_num(lin_noise, nan=0.0, posinf=0.0, neginf=0.0)
noise_linear.data[k] = lin_noise
# If requested, scale noise in non-positive regions to prevent increasing values
# where there should be no signal
if self.scale_non_positive_noise:
noise_in_non_positive_regions = np.nan_to_num(
noise_linear.data[non_positive_mask],
nan=0.0,
posinf=0.0,
neginf=0.0,
)
noise_linear.data[non_positive_mask] = (
noise_in_non_positive_regions - np.max(noise_in_non_positive_regions)
)
# Add noise only to non-positive regions, leave positive regions
# unchanged
output_cube = template.copy()
output_cube.data[non_positive_mask] = (
template.data[non_positive_mask] + noise_linear.data[non_positive_mask]
)
# Restore original mask
if original_mask is not None:
output_cube.data = np.ma.masked_array(output_cube.data, mask=original_mask)
# Convert back to original units
output_cube.convert_units(original_units)
return output_cube