Skip to content
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: 25.1.0
rev: 26.1.0
hooks:
- id: black
language_version: python3
200 changes: 190 additions & 10 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ class StepsBlendingConfig:
The calculation method of the blending weights. Options are the method
by :cite:`BPS2006` and the covariance-based method by :cite:`SPN2013`.
Defaults to bps.
timestep_start_full_nwp_weight: int, optional.
The timestep, which should be smaller than timesteps, at which a linear
transition takes place from the calculated weights to full NWP weight
(and zero extrapolation and noise weight) to ensure the blending
procedure becomes equal to the NWP forecast(s) at the last timestep
of the blending procedure. If not provided, the blending stick to the
theoretical weights provided by the chosen weights_method for a given
lead time and skill of each blending component.
conditional: bool, optional
If set to True, compute the statistics of the precipitation field
conditionally by excluding pixels where the values are below the threshold
Expand Down Expand Up @@ -291,6 +299,7 @@ class StepsBlendingConfig:
ar_order: int
velocity_perturbation_method: str | None
weights_method: str
timestep_start_full_nwp_weight: int
conditional: bool
probmatching_method: str | None
mask_method: str | None
Expand Down Expand Up @@ -407,6 +416,8 @@ class StepsBlendingState:
# Final outputs
final_blended_forecast: np.ndarray | None = None
final_blended_forecast_non_perturbed: np.ndarray | None = None
weights: np.ndarray | None = None
weights_model_only: np.ndarray | None = None

# Timing and indexing
time_prev_timestep: list[float] | None = None
Expand Down Expand Up @@ -645,7 +656,7 @@ def __blended_nowcast_main_loop(self):
def worker(j):
worker_state = copy(self.__state)
self.__determine_NWP_skill_for_next_timestep(t, j, worker_state)
self.__determine_weights_per_component(worker_state)
self.__determine_weights_per_component(t, worker_state)
self.__regress_extrapolation_and_noise_cascades(j, worker_state, t)
self.__perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep(
t, j, worker_state
Expand All @@ -665,6 +676,7 @@ def worker(j):
worker_state,
)
)

final_blended_forecast_all_members_one_timestep[j] = (
final_blended_forecast_single_member
)
Expand Down Expand Up @@ -890,6 +902,18 @@ def __check_inputs(self):
if self.__config.mask_method == "incremental":
raise ValueError("mask_method='incremental' but timestep=None")

if self.__config.timestep_start_full_nwp_weight is not None:
if self.__config.timestep_start_full_nwp_weight < 0:
raise ValueError(
"timestep_start_full_nwp_weight cannot be smaller than zero"
)

if self.__config.timestep_start_full_nwp_weight is not None:
if self.__config.timestep_start_full_nwp_weight >= self.__timesteps[-1]:
raise ValueError(
"timestep_start_full_nwp_weight cannot be the same or larger than the total number of timesteps in this forecast"
)

def __print_forecast_info(self):
"""
Print information about the forecast setup, including inputs, methods, and parameters.
Expand Down Expand Up @@ -2052,29 +2076,48 @@ def __determine_NWP_skill_for_next_timestep(self, t, j, worker_state):
axis=0,
)

def __determine_weights_per_component(self, worker_state):
def __determine_weights_per_component(self, t, worker_state):
"""
Compute blending weights for each component based on the selected method
('bps' or 'spn'). Weights are determined for both full blending and
model-only scenarios, accounting for correlations and covariance.
"""
start_smoothing_to_final_weights = False
if self.__config.timestep_start_full_nwp_weight is not None:
if t > self.__config.timestep_start_full_nwp_weight:
start_smoothing_to_final_weights = True
# Weights following the bps method. These are needed for the velocity
# weights prior to the advection step. If weights method spn is
# selected, weights will be overwritten with those weights prior to
# blending step.
# weight = [(extr_field, n_model_fields, noise), n_cascade_levels, ...]
worker_state.weights = calculate_weights_bps(
worker_state.rho_final_blended_forecast
)
if not start_smoothing_to_final_weights:
worker_state.weights = calculate_weights_bps(
worker_state.rho_final_blended_forecast
)
else:
worker_state.weights = calculate_end_weights(
previous_weights=self.__state.weights,
timestep=t,
n_timesteps=self.__timesteps[-1],
start_full_nwp_weight=self.__config.timestep_start_full_nwp_weight,
model_only=False,
)

# The model only weights
if self.__config.weights_method == "bps":
if (
self.__config.weights_method == "bps"
and not start_smoothing_to_final_weights
):
# Determine the weights of the components without the extrapolation
# cascade, in case this is no data or outside the mask.
worker_state.weights_model_only = calculate_weights_bps(
worker_state.rho_final_blended_forecast[1:, :]
)
elif self.__config.weights_method == "spn":
elif (
self.__config.weights_method == "spn"
and not start_smoothing_to_final_weights
):
# Only the weights of the components without the extrapolation
# cascade will be determined here. The full set of weights are
# determined after the extrapolation step in this method.
Expand Down Expand Up @@ -2113,13 +2156,23 @@ def __determine_weights_per_component(self, worker_state):
else:
# Same as correlation and noise is 1 - correlation
worker_state.weights_model_only = calculate_weights_bps(
worker_state.rho_final_blended_forecast[1:, :]
worker_state.rho_final_blended_forecast[1:, :],
)
elif start_smoothing_to_final_weights:
worker_state.weights_model_only = calculate_end_weights(
previous_weights=self.__state.weights_model_only,
timestep=t,
n_timesteps=self.__timesteps[-1],
start_full_nwp_weight=self.__config.timestep_start_full_nwp_weight,
model_only=True,
)
else:
raise ValueError(
"Unknown weights method %s: must be 'bps' or 'spn'"
% self.__config.weights_method
)
self.__state.weights = worker_state.weights
self.__state.weights_model_only = worker_state.weights_model_only

def __regress_extrapolation_and_noise_cascades(self, j, worker_state, t):
"""
Expand Down Expand Up @@ -2775,8 +2828,16 @@ def __blend_cascades(self, t_sub, j, worker_state):
)

# First determine the blending weights if method is spn. The
# weights for method bps have already been determined.
if self.__config.weights_method == "spn":
# weights for method bps have already been determined.'
start_smoothing_to_final_weights = False
if self.__config.timestep_start_full_nwp_weight is not None:
if t_sub >= self.__config.timestep_start_full_nwp_weight:
start_smoothing_to_final_weights = True

if (
self.__config.weights_method == "spn"
and not start_smoothing_to_final_weights
):
worker_state.weights = np.zeros(
(
cascade_stack_all_components.shape[0],
Expand All @@ -2801,6 +2862,8 @@ def __blend_cascades(self, t_sub, j, worker_state):
covariance=covariance_nwp_models,
)

self.__state.weights = worker_state.weights

# Create weights_with_noise to ensure there is always a 3D weights field, even
# if self.__config.nowcasting_method is "external_nowcast" and n_ens_members is 1.
worker_state.weights_with_noise = worker_state.weights.copy()
Expand Down Expand Up @@ -3238,6 +3301,7 @@ def forecast(
ar_order=2,
vel_pert_method="bps",
weights_method="bps",
timestep_start_full_nwp_weight=None,
conditional=False,
probmatching_method="cdf",
mask_method="incremental",
Expand Down Expand Up @@ -3381,6 +3445,14 @@ def forecast(
The calculation method of the blending weights. Options are the method
by :cite:`BPS2006` and the covariance-based method by :cite:`SPN2013`.
Defaults to bps.
timestep_start_full_nwp_weight: int, optional.
The timestep, which should be smaller than timesteps, at which a linear
transition takes place from the calculated weights to full (1.0) NWP weight
(and zero extrapolation and noise weight) to ensure the blending
procedure becomes equal to the NWP forecast(s) at the last timestep
of the blending procedure. If not provided, the blending stick to the
theoretical weights provided by the chosen weights_method for a given
lead time and skill of each blending component.
conditional: bool, optional
If set to True, compute the statistics of the precipitation field
conditionally by excluding pixels where the values are below the threshold
Expand Down Expand Up @@ -3570,6 +3642,7 @@ def forecast(
ar_order=ar_order,
velocity_perturbation_method=vel_pert_method,
weights_method=weights_method,
timestep_start_full_nwp_weight=timestep_start_full_nwp_weight,
conditional=conditional,
probmatching_method=probmatching_method,
mask_method=mask_method,
Expand Down Expand Up @@ -3679,6 +3752,7 @@ def calculate_weights_bps(correlations):
# total_ratios: [scale, ...] - the denominator of eq. 11 & 12 in BPS2006
weights = correlations * np.sqrt(ratios / total_ratios)
# weights: [component, scale, ...]

# Calculate the weight of the noise component.
# Original BPS2006 method in the following two lines (eq. 13)
total_square_weights = np.sum(np.square(weights), axis=0)
Expand Down Expand Up @@ -3780,6 +3854,112 @@ def calculate_weights_spn(correlations, covariance):
return weights


# TODO: Where does this piece of code best fit: in utils or inside the class?
def calculate_end_weights(
previous_weights, timestep, n_timesteps, start_full_nwp_weight, model_only=False
):
"""Calculate the linear transition from the previous weights to the final weights
(1.0 for NWP and 0.0 for the extrapolation and noise components). This method uses
the BPS weights determination method to determine the corresponding noise.

Parameters
----------
previous_weights : array-like
The weights from the previous timestep. This weight will be used to ensure
a linear transition takes place from the last weights at the timestep of
start_full_nwp_weight and the final weights (1.0 for NWP and 0.0 for
the extrapolation and noise components).
timestep : int
The timestep or sub timestep for which the weight is calculated. Only
used when start_full_nwp_weight is not None.
n_timesteps: int
The total number of forecast timesteps in the forecast.
start_full_nwp_weight : int
The timestep, which should be smaller than timesteps, at which a linear
transition takes place from the calculated weights to full NWP weight
(and zero extrapolation and noise weight) to ensure the blending
procedure becomes equal to the NWP forecast(s) at the last timestep
of the blending procedure. If not provided, the blending stick to the
theoretical weights provided by the chosen weights_method for a given
lead time and skill of each blending component.
model_only : bool
If set to True, the weights will only be determined for the model and
noise components.

Returns
-------
weights : array-like
Array of shape [component+1, scale_level, ...]
containing the weights to be used in STEPS blending for
each original component plus an addtional noise component, scale level,
and optionally along [y, x] dimensions.

References
----------
:cite:`BPS2006`

Notes
-----
The weights in the BPS method can sum op to more than 1.0.
"""
weights = previous_weights[:-1, :].copy()
if not model_only:
if timestep > start_full_nwp_weight and timestep < n_timesteps:
weights[0, :] = weights[0, :] - (
(timestep - start_full_nwp_weight)
/ (n_timesteps - start_full_nwp_weight)
* weights[0, :]
)
weights[1:, :] = (
1.0
/ weights[1:, :].shape[0]
* (
weights[1:, :]
+ (
(timestep - start_full_nwp_weight)
/ (n_timesteps - start_full_nwp_weight)
* (1.0 - weights[1:, :])
)
)
)
elif timestep > start_full_nwp_weight and timestep == n_timesteps:
weights[0, :] = 0.0
# If one model or model member is provided to blend together,
# the weight equals 1.0, otherwise the sum of the weights
# equals 1.0.
weights[1:, :] = 1.0 / weights[1:, :].shape[0]

else:
if timestep > start_full_nwp_weight and timestep < n_timesteps:
weights = (
1.0
/ weights.shape[0]
* (
weights
+ (
(timestep - start_full_nwp_weight)
/ (n_timesteps - start_full_nwp_weight)
* (1.0 - weights)
)
)
)
elif timestep > start_full_nwp_weight and timestep == n_timesteps:
weights[:] = 1.0 / weights.shape[0]

if weights.shape[0] > 1:
# Calculate the weight of the noise component.
# Original BPS2006 method in the following two lines (eq. 13)
total_square_weights = np.sum(np.square(weights), axis=0)
noise_weight = np.sqrt(1.0 - total_square_weights)
# Finally, add the noise_weights to the weights variable.
weights = np.concatenate((weights, noise_weight[None, ...]), axis=0)
else:
noise_weight = 1.0 - weights
weights = np.concatenate((weights, noise_weight), axis=0)

return weights


# TODO: Where does this piece of code best fit: in utils or inside the class?
def blend_means_sigmas(means, sigmas, weights):
"""Calculate the blended means and sigmas, the normalization parameters
Expand Down
Loading