From 2a408d89209c2686ff026b0868141418726e08f5 Mon Sep 17 00:00:00 2001 From: mpvginde <81358394+mpvginde@users.noreply.github.com> Date: Wed, 24 Apr 2024 20:18:02 +0200 Subject: [PATCH] Hackathon July 2023 (#351) * Fixed issue with loading older rmi radar hdf5 files * Implement no-rain check on NWP fields and implement the no-rain-radar + no-rain-nwp case * Initialisation of the noise cascade with random noise (with correct spatial correlation). * Implement no-rain check on radar fields/set default autocorrelation parameters * Avoid accidentally masking precip arrays by passing a copy to spatial_correlation() * Renormalize radar extrapolation cascade after AR(2)-evolution to not lose power * Implement optional norain_thr parameter in the pysteps blending routine. This allows some flexibility for when we consider the radar rainfall field to contain "no rain", since there will usually be some clutter present even when it's dry. The same does not hold for NWP, since there is no clutter there. This also introduces the new utility function blending.utils.check_norain * Refactoring of the extrapolation: -Precipiation and noise cascade are recomposed before extrapolation -Both fields are advected -They are decomposed again This removes the loss of power in the smallest scales due to the interpolation needed for the semilagrangian advection. In addition computational cost is reduced because the number of extrapolation is greatly reduced. * Implement no-rain-radar and rain-nwp case * refactoring and changes to take into account the maximum number of wet grid cells instead of the maximum per grid cell in NWP * make sure precip_thr is None does not give an error in _determine_max_nr_rainy_cells_nwp function * added zerovalue for filling mask after extrapolation and fixed extrapolation in case of not subtimesteps * Add tests for: - blending.utils.check_norain - load NWP - No rain cases * Clean up and checks to ensure ensemble members remain the same during multi-threading * Run black * Add missing 'seed' parameter * fix tests by removing redundant statement and add print statements --------- Co-authored-by: RubenImhoff Co-authored-by: Lesley De Cruz Co-authored-by: ned Co-authored-by: Ruben Imhoff --- pysteps/blending/steps.py | 1751 ++++++++++++++++---------- pysteps/blending/utils.py | 54 +- pysteps/io/importers.py | 5 +- pysteps/tests/test_blending_steps.py | 126 +- pysteps/tests/test_blending_utils.py | 28 +- 5 files changed, 1213 insertions(+), 751 deletions(-) diff --git a/pysteps/blending/steps.py b/pysteps/blending/steps.py index ca51c1017..e41db6a5b 100644 --- a/pysteps/blending/steps.py +++ b/pysteps/blending/steps.py @@ -8,13 +8,13 @@ consists of the following main steps: #. Set the radar rainfall fields in a Lagrangian space. - #. Initialize the noise method. #. Perform the cascade decomposition for the input radar rainfall fields. The method assumes that the cascade decomposition of the NWP model fields is already done prior to calling the function, as the NWP model fields are generally not updated with the same frequency (which is more efficient). A method to decompose and store the NWP model fields whenever a new NWP model field is present, is present in pysteps.blending.utils.decompose_NWP. + #. Initialize the noise method. #. Estimate AR parameters for the extrapolation nowcast and noise cascade. #. Initialize all the random generators. #. Calculate the initial skill of the NWP model forecasts at t=0. @@ -77,6 +77,7 @@ def forecast( n_cascade_levels=8, blend_nwp_members=False, precip_thr=None, + norain_thr=0.0, kmperpixel=None, extrap_method="semilagrangian", decomp_method="fft", @@ -161,6 +162,11 @@ def forecast( precip_thr: float, optional Specifies the threshold value for minimum observable precipitation intensity. Required if mask_method is not None or conditional is True. + norain_thr: float, optional + Specifies the threshold value for the fraction of rainy (see above) pixels + in the radar rainfall field below which we consider there to be no rain. + Depends on the amount of clutter typically present. + Standard set to 0.0 kmperpixel: float, optional Spatial resolution of the input data (kilometers/pixel). Required if vel_pert_method is not None or mask_method is 'incremental'. @@ -466,6 +472,7 @@ def forecast( if conditional or mask_method is not None: print(f"precip. intensity threshold: {precip_thr}") + print(f"no-rain fraction threshold for radar: {norain_thr}") print("") # 0.3 Get the methods that will be used @@ -507,33 +514,18 @@ def forecast( else: MASK_thr = None + # we need to know the zerovalue of precip to replace the mask when decomposing after extrapolation + zerovalue = np.nanmin(precip) + # 1. Start with the radar rainfall fields. We want the fields in a # Lagrangian space precip = _transform_to_lagrangian( precip, velocity, ar_order, xy_coords, extrapolator, extrap_kwargs, num_workers ) - # 2. Initialize the noise method - np.random.seed(seed) - pp, generate_noise, noise_std_coeffs = _init_noise( - precip, - precip_thr, - n_cascade_levels, - bp_filter, - decompositor, - fft, - noise_method, - noise_kwargs, - noise_stddev_adj, - measure_time, - num_workers, - seed, - ) - - # 3. Perform the cascade decomposition for the input precip fields and + # 2. Perform the cascade decomposition for the input precip fields and # and, if necessary, for the (NWP) model fields - - # 3.1 Compute the cascade decompositions of the input precipitation fields + # 2.1 Compute the cascade decompositions of the input precipitation fields ( precip_cascade, mu_extrapolation, @@ -550,7 +542,7 @@ def forecast( fft, ) - # 3.2 If necessary, decompose (NWP) model forecasts and stack cascades + # 2.2 If necessary, decompose (NWP) model forecasts and stack cascades ( precip_models_cascade, mu_models, @@ -560,336 +552,661 @@ def forecast( precip_models, bp_filter, decompositor, recompositor, fft, domain ) - # 4. Estimate AR parameters for the radar rainfall field - PHI = _estimate_ar_parameters_radar( - precip_cascade, ar_order, n_cascade_levels, MASK_thr + # 2.3 Check for zero input fields in the radar and NWP data. + zero_precip_radar = blending.utils.check_norain(precip, precip_thr, norain_thr) + # The norain fraction threshold used for nwp is the default value of 0.0, + # since nwp does not suffer from clutter. + zero_model_fields = blending.utils.check_norain( + precip_models_pm, precip_thr, norain_thr ) - # 5. Repeat precip_cascade for n ensemble members - # First, discard all except the p-1 last cascades because they are not needed - # for the AR(p) model - precip_cascade = [precip_cascade[i][-ar_order:] for i in range(n_cascade_levels)] - - precip_cascade = [ - [precip_cascade[j].copy() for j in range(n_cascade_levels)] - for i in range(n_ens_members) - ] - precip_cascade = np.stack(precip_cascade) - - # Also initialize the cascade of temporally correlated noise, which has the - # same shape as precip_cascade, but starts with value zero. - noise_cascade = np.zeros(precip_cascade.shape) - - # 6. Initialize all the random generators and prepare for the forecast loop - randgen_prec, vps, generate_vel_noise = _init_random_generators( - velocity, - noise_method, - vel_pert_method, - vp_par, - vp_perp, - seed, - n_ens_members, - kmperpixel, - timestep, - ) - D, D_Yn, D_pb, R_f, R_m, mask_rim, struct, fft_objs = _prepare_forecast_loop( - precip_cascade, - noise_method, - fft_method, - n_cascade_levels, - n_ens_members, - mask_method, - mask_kwargs, - timestep, - kmperpixel, - ) - - precip = precip[-1, :, :] + # 2.3.1 If precip is below the norain threshold and precip_models_pm is zero, + # we consider it as no rain in the domain. + # The forecast will directly return an array filled with the minimum + # value present in precip (which equals zero rainfall in the used + # transformation) + if zero_precip_radar and zero_model_fields: + print( + "No precipitation above the threshold found in both the radar and NWP fields" + ) + print("The resulting forecast will contain only zeros") + # Create the output list + R_f = [[] for j in range(n_ens_members)] - # 7. initizalize the current and previous extrapolation forecast scale - # for the nowcasting component - rho_extr_prev = np.repeat(1.0, PHI.shape[0]) - rho_extr = PHI[:, 0] / (1.0 - PHI[:, 1]) # phi1 / (1 - phi2), see BPS2004 + if isinstance(timesteps, int): + timesteps = range(timesteps + 1) + timestep_type = "int" + else: + original_timesteps = [0] + list(timesteps) + timesteps = nowcast_utils.binned_timesteps(original_timesteps) + timestep_type = "list" + + # Save per time step to ensure the array does not become too large if + # no return_output is requested and callback is not None. + for t, subtimestep_idx in enumerate(timesteps): + # If the timestep is not the first one, we need to provide the zero forecast + if t > 0: + # Create an empty np array with shape [n_ens_members, rows, cols] + # and fill it with the minimum value from precip (corresponding to + # zero precipitation) + R_f_ = np.full( + (n_ens_members, precip_shape[0], precip_shape[1]), np.nanmin(precip) + ) + if callback is not None: + if R_f_.shape[1] > 0: + callback(R_f_.squeeze()) + if return_output: + for j in range(n_ens_members): + R_f[j].append(R_f_[j]) - if measure_time: - init_time = time.time() - starttime_init + R_f_ = None - ### - # 8. Start the forecasting loop - ### - print("Starting blended nowcast computation.") + if measure_time: + zero_precip_time = time.time() - starttime_init - if measure_time: - starttime_mainloop = time.time() + if return_output: + outarr = np.stack([np.stack(R_f[j]) for j in range(n_ens_members)]) + if measure_time: + return outarr, zero_precip_time, zero_precip_time + else: + return outarr + else: + return None - if isinstance(timesteps, int): - timesteps = range(timesteps + 1) - timestep_type = "int" else: - original_timesteps = [0] + list(timesteps) - timesteps = nowcast_utils.binned_timesteps(original_timesteps) - timestep_type = "list" - - extrap_kwargs["return_displacement"] = True - forecast_prev = precip_cascade - noise_prev = noise_cascade - t_prev = [0.0 for j in range(n_ens_members)] - t_total = [0.0 for j in range(n_ens_members)] - - # iterate each time step - for t, subtimestep_idx in enumerate(timesteps): - if timestep_type == "list": - subtimesteps = [original_timesteps[t_] for t_ in subtimestep_idx] - else: - subtimesteps = [t] + # 2.3.3 If zero_precip_radar, make sure that precip_cascade does not contain + # only nans or infs. If so, fill it with the zero value. + if zero_precip_radar: + precip_cascade = np.nan_to_num( + precip_cascade, + copy=True, + nan=np.nanmin(precip_models_cascade), + posinf=np.nanmin(precip_models_cascade), + neginf=np.nanmin(precip_models_cascade), + ) - if (timestep_type == "list" and subtimesteps) or ( - timestep_type == "int" and t > 0 - ): - is_nowcast_time_step = True - else: - is_nowcast_time_step = False + # 2.3.4 Check if the NWP fields contain nans or infinite numbers. If so, + # fill these with the minimum value present in precip (corresponding to + # zero rainfall in the radar observations) + ( + precip_models_cascade, + precip_models_pm, + mu_models, + sigma_models, + ) = _fill_nans_infs_nwp_cascade( + precip_models_cascade, + precip_models_pm, + precip_cascade, + precip, + mu_models, + sigma_models, + ) - if is_nowcast_time_step: - print( - f"Computing nowcast for time step {t}... ", - end="", - flush=True, + # 2.3.5 If zero_precip_radar is True, only use the velocity field of the NWP + # forecast. I.e., velocity (radar) equals velocity_model at the first time + # step. + if zero_precip_radar: + # Use the velocity from velocity_models at time step 0 + velocity = velocity_models[:, 0, :, :, :] + # Take the average over the first axis, which corresponds to n_models + # (hence, the model average) + velocity = np.mean(velocity, axis=0) + + # 3. Initialize the noise method. + # If zero_precip_radar is True, initialize noise based on the NWP field time + # step where the fraction of rainy cells is highest (because other lead times + # might be zero as well). Else, initialize the noise with the radar + # rainfall data + if zero_precip_radar: + precip_noise_input = _determine_max_nr_rainy_cells_nwp( + precip_models_pm, precip_thr, precip_models_pm.shape[0], timesteps ) + # Make sure precip_noise_input is three dimensional + precip_noise_input = precip_noise_input[np.newaxis, :, :] + else: + precip_noise_input = precip.copy() - if measure_time: - starttime = time.time() + pp, generate_noise, noise_std_coeffs = _init_noise( + precip_noise_input, + precip_thr, + n_cascade_levels, + bp_filter, + decompositor, + fft, + noise_method, + noise_kwargs, + noise_stddev_adj, + measure_time, + num_workers, + seed, + ) + precip_noise_input = None - # 8.1.1 Before calling the worker for the forecast loop, determine which (NWP) - # models will be combined with which nowcast ensemble members. With the - # way it is implemented at this moment: n_ens_members of the output equals - # the maximum number of (ensemble) members in the input (either the nowcasts or NWP). - ( - precip_models_cascade_temp, - precip_models_pm_temp, - velocity_models_temp, - mu_models_temp, - sigma_models_temp, - n_model_indices, - ) = _find_nwp_combination( - precip_models_cascade[:, t, :, :, :], - precip_models_pm[:, t, :, :], - velocity_models[:, t, :, :, :], - mu_models[:, t, :], - sigma_models[:, t, :], - n_ens_members, + # 4. Estimate AR parameters for the radar rainfall field + PHI = _estimate_ar_parameters_radar( + precip_cascade, ar_order, n_cascade_levels, - blend_nwp_members, + MASK_thr, + zero_precip_radar, ) - if t == 0: - # 8.1.2 Calculate the initial skill of the (NWP) model forecasts at t=0 - rho_nwp_models = _compute_initial_nwp_skill( - precip_cascade, - precip_models_cascade_temp, - domain_mask, - issuetime, - outdir_path_skill, - clim_kwargs, - ) + # 5. Repeat precip_cascade for n ensemble members + # First, discard all except the p-1 last cascades because they are not needed + # for the AR(p) model + precip_cascade = [ + precip_cascade[i][-ar_order:] for i in range(n_cascade_levels) + ] + + precip_cascade = [ + [precip_cascade[j].copy() for j in range(n_cascade_levels)] + for i in range(n_ens_members) + ] + precip_cascade = np.stack(precip_cascade) + + # 6. Initialize all the random generators and prepare for the forecast loop + randgen_prec, vps, generate_vel_noise = _init_random_generators( + velocity, + noise_method, + vel_pert_method, + vp_par, + vp_perp, + seed, + n_ens_members, + kmperpixel, + timestep, + ) + D, D_Yn, D_pb, R_f, R_m, mask_rim, struct, fft_objs = _prepare_forecast_loop( + precip_cascade, + noise_method, + fft_method, + n_cascade_levels, + n_ens_members, + mask_method, + mask_kwargs, + timestep, + kmperpixel, + ) - if t > 0: - # 8.1.3 Determine the skill of the components for lead time (t0 + t) - # First for the extrapolation component. Only calculate it when t > 0. - ( - rho_extr, - rho_extr_prev, - ) = blending.skill_scores.lt_dependent_cor_extrapolation( - PHI=PHI, correlations=rho_extr, correlations_prev=rho_extr_prev - ) + # Also initialize the cascade of temporally correlated noise, which has the + # same shape as precip_cascade, but starts random noise. + noise_cascade, mu_noise, sigma_noise = _init_noise_cascade( + shape=precip_cascade.shape, + n_ens_members=n_ens_members, + n_cascade_levels=n_cascade_levels, + generate_noise=generate_noise, + decompositor=decompositor, + pp=pp, + randgen_prec=randgen_prec, + fft_objs=fft_objs, + bp_filter=bp_filter, + domain=domain, + noise_method=noise_method, + noise_std_coeffs=noise_std_coeffs, + ar_order=ar_order, + ) - # the nowcast iteration for each ensemble member - def worker(j): - # 8.1.2 Determine the skill of the nwp components for lead time (t0 + t) - # Then for the model components - if blend_nwp_members: - rho_nwp_fc = [ - blending.skill_scores.lt_dependent_cor_nwp( - lt=(t * int(timestep)), - correlations=rho_nwp_models[n_model], - outdir_path=outdir_path_skill, - n_model=n_model, - skill_kwargs=clim_kwargs, - ) - for n_model in range(rho_nwp_models.shape[0]) - ] - rho_nwp_fc = np.stack(rho_nwp_fc) - # Concatenate rho_extr and rho_nwp - rho_fc = np.concatenate((rho_extr[None, :], rho_nwp_fc), axis=0) + precip = precip[-1, :, :] + + # 7. initizalize the current and previous extrapolation forecast scale + # for the nowcasting component + rho_extr_prev = np.repeat(1.0, PHI.shape[0]) + rho_extr = PHI[:, 0] / (1.0 - PHI[:, 1]) # phi1 / (1 - phi2), see BPS2004 + + if measure_time: + init_time = time.time() - starttime_init + + ### + # 8. Start the forecasting loop + ### + print("Starting blended nowcast computation.") + + if measure_time: + starttime_mainloop = time.time() + + if isinstance(timesteps, int): + timesteps = range(timesteps + 1) + timestep_type = "int" + else: + original_timesteps = [0] + list(timesteps) + timesteps = nowcast_utils.binned_timesteps(original_timesteps) + timestep_type = "list" + + extrap_kwargs["return_displacement"] = True + forecast_prev = precip_cascade + noise_prev = noise_cascade + t_prev = [0.0 for j in range(n_ens_members)] + t_total = [0.0 for j in range(n_ens_members)] + + # iterate each time step + for t, subtimestep_idx in enumerate(timesteps): + if timestep_type == "list": + subtimesteps = [original_timesteps[t_] for t_ in subtimestep_idx] else: - rho_nwp_fc = blending.skill_scores.lt_dependent_cor_nwp( - lt=(t * int(timestep)), - correlations=rho_nwp_models[j], - outdir_path=outdir_path_skill, - n_model=n_model_indices[j], - skill_kwargs=clim_kwargs, - ) - # Concatenate rho_extr and rho_nwp - rho_fc = np.concatenate( - (rho_extr[None, :], rho_nwp_fc[None, :]), axis=0 - ) + subtimesteps = [t] - # 8.2 Determine the weights per component - - # Weights following the bps method. These are needed for the velocity - # weights prior to the advection step. If weights method spn is - # selected, weights will be overwritten with those weights prior to - # blending step. - # weight = [(extr_field, n_model_fields, noise), n_cascade_levels, ...] - weights = calculate_weights_bps(rho_fc) - - # The model only weights - if weights_method == "bps": - # Determine the weights of the components without the extrapolation - # cascade, in case this is no data or outside the mask. - weights_model_only = calculate_weights_bps(rho_fc[1:, :]) - elif weights_method == "spn": - # Only the weights of the components without the extrapolation - # cascade will be determined here. The full set of weights are - # determined after the extrapolation step in this method. - if blend_nwp_members and precip_models_cascade_temp.shape[0] > 1: - weights_model_only = np.zeros( - (precip_models_cascade_temp.shape[0] + 1, n_cascade_levels) - ) - for i in range(n_cascade_levels): - # Determine the normalized covariance matrix (containing) - # the cross-correlations between the models - cov = np.corrcoef( - np.stack( - [ - precip_models_cascade_temp[ - n_model, i, :, : - ].flatten() - for n_model in range( - precip_models_cascade_temp.shape[0] - ) - ] - ) - ) - # Determine the weights for this cascade level - weights_model_only[:, i] = calculate_weights_spn( - correlations=rho_fc[1:, i], cov=cov - ) - else: - # Same as correlation and noise is 1 - correlation - weights_model_only = calculate_weights_bps(rho_fc[1:, :]) + if (timestep_type == "list" and subtimesteps) or ( + timestep_type == "int" and t > 0 + ): + is_nowcast_time_step = True else: - raise ValueError( - "Unknown weights method %s: must be 'bps' or 'spn'" % weights_method + is_nowcast_time_step = False + + if is_nowcast_time_step: + print( + f"Computing nowcast for time step {t}... ", + end="", + flush=True, ) - # 8.3 Determine the noise cascade and regress this to the subsequent - # time step + regress the extrapolation component to the subsequent - # time step + if measure_time: + starttime = time.time() - # 8.3.1 Determine the epsilon, a cascade of temporally independent - # but spatially correlated noise - if noise_method is not None: - # generate noise field - EPS = generate_noise( - pp, randstate=randgen_prec[j], fft_method=fft_objs[j], domain=domain + # 8.1.1 Before calling the worker for the forecast loop, determine which (NWP) + # models will be combined with which nowcast ensemble members. With the + # way it is implemented at this moment: n_ens_members of the output equals + # the maximum number of (ensemble) members in the input (either the nowcasts or NWP). + ( + precip_models_cascade_temp, + precip_models_pm_temp, + velocity_models_temp, + mu_models_temp, + sigma_models_temp, + n_model_indices, + ) = _find_nwp_combination( + precip_models_cascade[:, t, :, :, :], + precip_models_pm[:, t, :, :], + velocity_models[:, t, :, :, :], + mu_models[:, t, :], + sigma_models[:, t, :], + n_ens_members, + ar_order, + n_cascade_levels, + blend_nwp_members, + ) + + # If zero_precip_radar is True, set the velocity field equal to the NWP + # velocity field for the current time step (velocity_models_temp). + if zero_precip_radar: + # Use the velocity from velocity_models and take the average over + # n_models (axis=0) + velocity = np.mean(velocity_models_temp, axis=0) + + if t == 0: + # 8.1.2 Calculate the initial skill of the (NWP) model forecasts at t=0 + rho_nwp_models = _compute_initial_nwp_skill( + precip_cascade, + precip_models_cascade_temp, + domain_mask, + issuetime, + outdir_path_skill, + clim_kwargs, ) - # decompose the noise field into a cascade - EPS = decompositor( - EPS, - bp_filter, - fft_method=fft_objs[j], - input_domain=domain, - output_domain=domain, - compute_stats=True, - normalize=True, - compact_output=True, + if t > 0: + # 8.1.3 Determine the skill of the components for lead time (t0 + t) + # First for the extrapolation component. Only calculate it when t > 0. + ( + rho_extr, + rho_extr_prev, + ) = blending.skill_scores.lt_dependent_cor_extrapolation( + PHI=PHI, correlations=rho_extr, correlations_prev=rho_extr_prev ) - else: - EPS = None - # 8.3.2 regress the extrapolation component to the subsequent time - # step - # iterate the AR(p) model for each cascade level - for i in range(n_cascade_levels): - # apply AR(p) process to extrapolation cascade level - if EPS is not None or vel_pert_method is not None: - precip_cascade[j][i] = autoregression.iterate_ar_model( - precip_cascade[j][i], PHI[i, :] - ) + # the nowcast iteration for each ensemble member + R_f_ = [None for _ in range(n_ens_members)] + def worker(j): + # 8.1.2 Determine the skill of the nwp components for lead time (t0 + t) + # Then for the model components + if blend_nwp_members: + rho_nwp_fc = [ + blending.skill_scores.lt_dependent_cor_nwp( + lt=(t * int(timestep)), + correlations=rho_nwp_models[n_model], + outdir_path=outdir_path_skill, + n_model=n_model, + skill_kwargs=clim_kwargs, + ) + for n_model in range(rho_nwp_models.shape[0]) + ] + rho_nwp_fc = np.stack(rho_nwp_fc) + # Concatenate rho_extr and rho_nwp + rho_fc = np.concatenate((rho_extr[None, :], rho_nwp_fc), axis=0) else: - # use the deterministic AR(p) model computed above if - # perturbations are disabled - precip_cascade[j][i] = R_m[i] + rho_nwp_fc = blending.skill_scores.lt_dependent_cor_nwp( + lt=(t * int(timestep)), + correlations=rho_nwp_models[j], + outdir_path=outdir_path_skill, + n_model=n_model_indices[j], + skill_kwargs=clim_kwargs, + ) + # Concatenate rho_extr and rho_nwp + rho_fc = np.concatenate( + (rho_extr[None, :], rho_nwp_fc[None, :]), axis=0 + ) - # 8.3.3 regress the noise component to the subsequent time step - # iterate the AR(p) model for each cascade level - for i in range(n_cascade_levels): - # normalize the noise cascade - if EPS is not None: - EPS_ = EPS["cascade_levels"][i] - EPS_ *= noise_std_coeffs[i] + # 8.2 Determine the weights per component + + # Weights following the bps method. These are needed for the velocity + # weights prior to the advection step. If weights method spn is + # selected, weights will be overwritten with those weights prior to + # blending step. + # weight = [(extr_field, n_model_fields, noise), n_cascade_levels, ...] + weights = calculate_weights_bps(rho_fc) + + # The model only weights + if weights_method == "bps": + # Determine the weights of the components without the extrapolation + # cascade, in case this is no data or outside the mask. + weights_model_only = calculate_weights_bps(rho_fc[1:, :]) + elif weights_method == "spn": + # Only the weights of the components without the extrapolation + # cascade will be determined here. The full set of weights are + # determined after the extrapolation step in this method. + if blend_nwp_members and precip_models_cascade_temp.shape[0] > 1: + weights_model_only = np.zeros( + (precip_models_cascade_temp.shape[0] + 1, n_cascade_levels) + ) + for i in range(n_cascade_levels): + # Determine the normalized covariance matrix (containing) + # the cross-correlations between the models + cov = np.corrcoef( + np.stack( + [ + precip_models_cascade_temp[ + n_model, i, :, : + ].flatten() + for n_model in range( + precip_models_cascade_temp.shape[0] + ) + ] + ) + ) + # Determine the weights for this cascade level + weights_model_only[:, i] = calculate_weights_spn( + correlations=rho_fc[1:, i], cov=cov + ) + else: + # Same as correlation and noise is 1 - correlation + weights_model_only = calculate_weights_bps(rho_fc[1:, :]) else: - EPS_ = None - # apply AR(p) process to noise cascade level - # (Returns zero noise if EPS is None) - noise_cascade[j][i] = autoregression.iterate_ar_model( - noise_cascade[j][i], PHI[i, :], eps=EPS_ - ) + raise ValueError( + "Unknown weights method %s: must be 'bps' or 'spn'" + % weights_method + ) - EPS = None - EPS_ = None + # 8.3 Determine the noise cascade and regress this to the subsequent + # time step + regress the extrapolation component to the subsequent + # time step + + # 8.3.1 Determine the epsilon, a cascade of temporally independent + # but spatially correlated noise + if noise_method is not None: + # generate noise field + EPS = generate_noise( + pp, + randstate=randgen_prec[j], + fft_method=fft_objs[j], + domain=domain, + ) + + # decompose the noise field into a cascade + EPS = decompositor( + EPS, + bp_filter, + fft_method=fft_objs[j], + input_domain=domain, + output_domain=domain, + compute_stats=True, + normalize=True, + compact_output=True, + ) + else: + EPS = None - # 8.4 Perturb and blend the advection fields + advect the - # extrapolation and noise cascade to the current time step - # (or subtimesteps if non-integer time steps are given) - - # Settings and initialize the output - extrap_kwargs_ = extrap_kwargs.copy() - extrap_kwargs_noise = extrap_kwargs.copy() - extrap_kwargs_pb = extrap_kwargs.copy() - velocity_pert = velocity - R_f_ep_out = [] - Yn_ep_out = [] - R_pm_ep = [] - - # Extrapolate per sub time step - for t_sub in subtimesteps: - if t_sub > 0: - t_diff_prev_int = t_sub - int(t_sub) - if t_diff_prev_int > 0.0: - R_f_ip = [ - (1.0 - t_diff_prev_int) * forecast_prev[j][i][-1, :] - + t_diff_prev_int * precip_cascade[j][i][-1, :] - for i in range(n_cascade_levels) - ] - Yn_ip = [ - (1.0 - t_diff_prev_int) * noise_prev[j][i][-1, :] - + t_diff_prev_int * noise_cascade[j][i][-1, :] - for i in range(n_cascade_levels) - ] + # 8.3.2 regress the extrapolation component to the subsequent time + # step + # iterate the AR(p) model for each cascade level + for i in range(n_cascade_levels): + # apply AR(p) process to extrapolation cascade level + if EPS is not None or vel_pert_method is not None: + precip_cascade[j][i] = autoregression.iterate_ar_model( + precip_cascade[j][i], PHI[i, :] + ) + # Renormalize the cascade + precip_cascade[j][i][1] /= np.std(precip_cascade[j][i][1]) + else: + # use the deterministic AR(p) model computed above if + # perturbations are disabled + precip_cascade[j][i] = R_m[i] + # 8.3.3 regress the noise component to the subsequent time step + # iterate the AR(p) model for each cascade level + for i in range(n_cascade_levels): + # normalize the noise cascade + if EPS is not None: + EPS_ = EPS["cascade_levels"][i] + EPS_ *= noise_std_coeffs[i] else: - R_f_ip = [ - forecast_prev[j][i][-1, :] for i in range(n_cascade_levels) - ] - Yn_ip = [ - noise_prev[j][i][-1, :] for i in range(n_cascade_levels) - ] + EPS_ = None + # apply AR(p) process to noise cascade level + # (Returns zero noise if EPS is None) + noise_cascade[j][i] = autoregression.iterate_ar_model( + noise_cascade[j][i], PHI[i, :], eps=EPS_ + ) + + EPS = None + EPS_ = None + + # 8.4 Perturb and blend the advection fields + advect the + # extrapolation and noise cascade to the current time step + # (or subtimesteps if non-integer time steps are given) + + # Settings and initialize the output + extrap_kwargs_ = extrap_kwargs.copy() + extrap_kwargs_noise = extrap_kwargs.copy() + extrap_kwargs_pb = extrap_kwargs.copy() + velocity_pert = velocity + R_f_ep_out = [] + Yn_ep_out = [] + R_pm_ep = [] + + # Extrapolate per sub time step + for t_sub in subtimesteps: + if t_sub > 0: + t_diff_prev_int = t_sub - int(t_sub) + if t_diff_prev_int > 0.0: + R_f_ip = [ + (1.0 - t_diff_prev_int) * forecast_prev[j][i][-1, :] + + t_diff_prev_int * precip_cascade[j][i][-1, :] + for i in range(n_cascade_levels) + ] + Yn_ip = [ + (1.0 - t_diff_prev_int) * noise_prev[j][i][-1, :] + + t_diff_prev_int * noise_cascade[j][i][-1, :] + for i in range(n_cascade_levels) + ] + + else: + R_f_ip = [ + forecast_prev[j][i][-1, :] + for i in range(n_cascade_levels) + ] + Yn_ip = [ + noise_prev[j][i][-1, :] for i in range(n_cascade_levels) + ] + + R_f_ip = np.stack(R_f_ip) + Yn_ip = np.stack(Yn_ip) + + t_diff_prev = t_sub - t_prev[j] + t_total[j] += t_diff_prev + + # compute the perturbed motion field - include the NWP + # velocities and the weights. Note that we only perturb + # the extrapolation velocity field, as the NWP velocity + # field is present per time step + if vel_pert_method is not None: + velocity_pert = velocity + generate_vel_noise( + vps[j], t_total[j] * timestep + ) - R_f_ip = np.stack(R_f_ip) - Yn_ip = np.stack(Yn_ip) + # Stack the perturbed extrapolation and the NWP velocities + if blend_nwp_members: + V_stack = np.concatenate( + ( + velocity_pert[None, :, :, :], + velocity_models_temp, + ), + axis=0, + ) + else: + V_model_ = velocity_models_temp[j] + V_stack = np.concatenate( + (velocity_pert[None, :, :, :], V_model_[None, :, :, :]), + axis=0, + ) + V_model_ = None + + # Obtain a blended optical flow, using the weights of the + # second cascade following eq. 24 in BPS2006 + velocity_blended = blending.utils.blend_optical_flows( + flows=V_stack, + weights=weights[ + :-1, 1 + ], # [(extr_field, n_model_fields), cascade_level=2] + ) - t_diff_prev = t_sub - t_prev[j] + # Extrapolate both cascades to the next time step + # First recompose the cascade, advect it and decompose it again + # This is needed to remove the interpolation artifacts. + # In addition, the number of extrapolations is greatly reduced + # A. Radar Rain + R_f_ip_recomp = blending.utils.recompose_cascade( + combined_cascade=R_f_ip, + combined_mean=mu_extrapolation, + combined_sigma=sigma_extrapolation, + ) + # Make sure we have values outside the mask + if zero_precip_radar: + R_f_ip_recomp = np.nan_to_num( + R_f_ip_recomp, + copy=True, + nan=zerovalue, + posinf=zerovalue, + neginf=zerovalue, + ) + # Put back the mask + R_f_ip_recomp[domain_mask] = np.NaN + extrap_kwargs["displacement_prev"] = D[j] + R_f_ep_recomp_, D[j] = extrapolator( + R_f_ip_recomp, + velocity_blended, + [t_diff_prev], + allow_nonfinite_values=True, + **extrap_kwargs, + ) + R_f_ep_recomp = R_f_ep_recomp_[0].copy() + temp_mask = ~np.isfinite(R_f_ep_recomp) + # TODO WHERE DO CAN I FIND THIS -15.0 + R_f_ep_recomp[~np.isfinite(R_f_ep_recomp)] = zerovalue + R_f_ep = decompositor( + R_f_ep_recomp, + bp_filter, + mask=MASK_thr, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + )["cascade_levels"] + # Make sure we have values outside the mask + if zero_precip_radar: + R_f_ep = np.nan_to_num( + R_f_ep, + copy=True, + nan=np.nanmin(R_f_ip), + posinf=np.nanmin(R_f_ip), + neginf=np.nanmin(R_f_ip), + ) + for i in range(n_cascade_levels): + R_f_ep[i][temp_mask] = np.NaN + # B. Noise + Yn_ip_recomp = blending.utils.recompose_cascade( + combined_cascade=Yn_ip, + combined_mean=mu_noise[j], + combined_sigma=sigma_noise[j], + ) + extrap_kwargs_noise["displacement_prev"] = D_Yn[j] + extrap_kwargs_noise["map_coordinates_mode"] = "wrap" + Yn_ep_recomp_, D_Yn[j] = extrapolator( + Yn_ip_recomp, + velocity_blended, + [t_diff_prev], + allow_nonfinite_values=True, + **extrap_kwargs_noise, + ) + Yn_ep_recomp = Yn_ep_recomp_[0].copy() + Yn_ep = decompositor( + Yn_ep_recomp, + bp_filter, + mask=MASK_thr, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + )["cascade_levels"] + for i in range(n_cascade_levels): + Yn_ep[i] *= noise_std_coeffs[i] + + # Append the results to the output lists + R_f_ep_out.append(R_f_ep.copy()) + Yn_ep_out.append(Yn_ep.copy()) + R_f_ip = None + R_f_ip_recomp = None + R_f_ep_recomp_ = None + R_f_ep_recomp = None + R_f_ep = None + Yn_ip = None + Yn_ip_recomp = None + Yn_ep_recomp_ = None + Yn_ep_recomp = None + Yn_ep = None + + # Finally, also extrapolate the initial radar rainfall + # field. This will be blended with the rainfall field(s) + # of the (NWP) model(s) for Lagrangian blended prob. matching + # min_R = np.min(precip) + extrap_kwargs_pb["displacement_prev"] = D_pb[j] + # Apply the domain mask to the extrapolation component + R_ = precip.copy() + R_[domain_mask] = np.NaN + R_pm_ep_, D_pb[j] = extrapolator( + R_, + velocity_blended, + [t_diff_prev], + allow_nonfinite_values=True, + **extrap_kwargs_pb, + ) + R_pm_ep.append(R_pm_ep_[0]) + + t_prev[j] = t_sub + + if len(R_f_ep_out) > 0: + R_f_ep_out = np.stack(R_f_ep_out) + Yn_ep_out = np.stack(Yn_ep_out) + R_pm_ep = np.stack(R_pm_ep) + + # advect the forecast field by one time step if no subtimesteps in the + # current interval were found + if not subtimesteps: + t_diff_prev = t + 1 - t_prev[j] t_total[j] += t_diff_prev # compute the perturbed motion field - include the NWP - # velocities and the weights. Note that we only perturb - # the extrapolation velocity field, as the NWP velocity - # field is present per time step + # velocities and the weights if vel_pert_method is not None: velocity_pert = velocity + generate_vel_noise( vps[j], t_total[j] * timestep @@ -898,10 +1215,7 @@ def worker(j): # Stack the perturbed extrapolation and the NWP velocities if blend_nwp_members: V_stack = np.concatenate( - ( - velocity_pert[None, :, :, :], - velocity_models_temp, - ), + (velocity_pert[None, :, :, :], velocity_models_temp), axis=0, ) else: @@ -921,410 +1235,320 @@ def worker(j): ], # [(extr_field, n_model_fields), cascade_level=2] ) - # Extrapolate both cascades to the next time step - R_f_ep = np.zeros(R_f_ip.shape) - Yn_ep = np.zeros(Yn_ip.shape) + # Extrapolate the extrapolation and noise cascade - for i in range(n_cascade_levels): - extrap_kwargs_["displacement_prev"] = D[j][i] - extrap_kwargs_noise["displacement_prev"] = D_Yn[j][i] - extrap_kwargs_noise["map_coordinates_mode"] = "wrap" - - # First, extrapolate the extrapolation component - # Apply the domain mask to the extrapolation component - R_f_ip[i][domain_mask] = np.NaN - R_f_ep_, D[j][i] = extrapolator( - R_f_ip[i], - velocity_blended, - [t_diff_prev], - allow_nonfinite_values=True, - **extrap_kwargs_, - ) - R_f_ep[i] = R_f_ep_[0] + extrap_kwargs_["displacement_prev"] = D[j] + extrap_kwargs_noise["displacement_prev"] = D_Yn[j] + extrap_kwargs_noise["map_coordinates_mode"] = "wrap" - # Then, extrapolate the noise component - Yn_ep_, D_Yn[j][i] = extrapolator( - Yn_ip[i], - velocity_blended, - [t_diff_prev], - allow_nonfinite_values=True, - **extrap_kwargs_noise, - ) - Yn_ep[i] = Yn_ep_[0] - - # Append the results to the output lists - R_f_ep_out.append(R_f_ep) - Yn_ep_out.append(Yn_ep) - R_f_ep_ = None - Yn_ep_ = None - - # Finally, also extrapolate the initial radar rainfall - # field. This will be blended with the rainfall field(s) - # of the (NWP) model(s) for Lagrangian blended prob. matching - # min_R = np.min(precip) - extrap_kwargs_pb["displacement_prev"] = D_pb[j] - # Apply the domain mask to the extrapolation component - R_ = precip.copy() - R_[domain_mask] = np.NaN - R_pm_ep_, D_pb[j] = extrapolator( - R_, + _, D[j] = extrapolator( + None, velocity_blended, [t_diff_prev], allow_nonfinite_values=True, - **extrap_kwargs_pb, - ) - R_pm_ep.append(R_pm_ep_[0]) - - t_prev[j] = t_sub - - if len(R_f_ep_out) > 0: - R_f_ep_out = np.stack(R_f_ep_out) - Yn_ep_out = np.stack(Yn_ep_out) - R_pm_ep = np.stack(R_pm_ep) - - # advect the forecast field by one time step if no subtimesteps in the - # current interval were found - if not subtimesteps: - t_diff_prev = t + 1 - t_prev[j] - t_total[j] += t_diff_prev - - # compute the perturbed motion field - include the NWP - # velocities and the weights - if vel_pert_method is not None: - velocity_pert = velocity + generate_vel_noise( - vps[j], t_total[j] * timestep - ) - - # Stack the perturbed extrapolation and the NWP velocities - if blend_nwp_members: - V_stack = np.concatenate( - (velocity_pert[None, :, :, :], velocity_models_temp), - axis=0, - ) - else: - V_model_ = velocity_models_temp[j] - V_stack = np.concatenate( - (velocity_pert[None, :, :, :], V_model_[None, :, :, :]), axis=0 + **extrap_kwargs_, ) - V_model_ = None - - # Obtain a blended optical flow, using the weights of the - # second cascade following eq. 24 in BPS2006 - velocity_blended = blending.utils.blend_optical_flows( - flows=V_stack, - weights=weights[ - :-1, 1 - ], # [(extr_field, n_model_fields), cascade_level=2] - ) - # Extrapolate the extrapolation and noise cascade - for i in range(n_cascade_levels): - extrap_kwargs_["displacement_prev"] = D[j][i] - extrap_kwargs_noise["displacement_prev"] = D_Yn[j][i] - extrap_kwargs_noise["map_coordinates_mode"] = "wrap" - - _, D[j][i] = extrapolator( + _, D_Yn[j] = extrapolator( None, velocity_blended, [t_diff_prev], allow_nonfinite_values=True, - **extrap_kwargs_, + **extrap_kwargs_noise, ) - _, D_Yn[j][i] = extrapolator( + # Also extrapolate the radar observation, used for the probability + # matching and post-processing steps + extrap_kwargs_pb["displacement_prev"] = D_pb[j] + _, D_pb[j] = extrapolator( None, velocity_blended, [t_diff_prev], allow_nonfinite_values=True, - **extrap_kwargs_noise, + **extrap_kwargs_pb, ) - # Also extrapolate the radar observation, used for the probability - # matching and post-processing steps - extrap_kwargs_pb["displacement_prev"] = D_pb[j] - _, D_pb[j] = extrapolator( - None, - velocity_blended, - [t_diff_prev], - allow_nonfinite_values=True, - **extrap_kwargs_pb, - ) - - t_prev[j] = t + 1 + t_prev[j] = t + 1 + + forecast_prev[j] = precip_cascade[j] + noise_prev[j] = noise_cascade[j] + + # 8.5 Blend the cascades + R_f_out = [] + + for t_sub in subtimesteps: + # TODO: does it make sense to use sub time steps - check if it works? + if t_sub > 0: + t_index = np.where(np.array(subtimesteps) == t_sub)[0][0] + # First concatenate the cascades and the means and sigmas + # precip_models = [n_models,timesteps,n_cascade_levels,m,n] + if blend_nwp_members: + cascades_stacked = np.concatenate( + ( + R_f_ep_out[None, t_index], + precip_models_cascade_temp, + Yn_ep_out[None, t_index], + ), + axis=0, + ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] + means_stacked = np.concatenate( + (mu_extrapolation[None, :], mu_models_temp), axis=0 + ) + sigmas_stacked = np.concatenate( + (sigma_extrapolation[None, :], sigma_models_temp), + axis=0, + ) + else: + cascades_stacked = np.concatenate( + ( + R_f_ep_out[None, t_index], + precip_models_cascade_temp[None, j], + Yn_ep_out[None, t_index], + ), + axis=0, + ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] + means_stacked = np.concatenate( + (mu_extrapolation[None, :], mu_models_temp[None, j]), + axis=0, + ) + sigmas_stacked = np.concatenate( + ( + sigma_extrapolation[None, :], + sigma_models_temp[None, j], + ), + axis=0, + ) - forecast_prev[j] = precip_cascade[j] + # First determine the blending weights if method is spn. The + # weights for method bps have already been determined. + if weights_method == "spn": + weights = np.zeros( + (cascades_stacked.shape[0], n_cascade_levels) + ) + for i in range(n_cascade_levels): + # Determine the normalized covariance matrix (containing) + # the cross-correlations between the models + cascades_stacked_ = np.stack( + [ + cascades_stacked[n_model, i, :, :].flatten() + for n_model in range( + cascades_stacked.shape[0] - 1 + ) + ] + ) # -1 to exclude the noise component + cov = np.ma.corrcoef( + np.ma.masked_invalid(cascades_stacked_) + ) + # Determine the weights for this cascade level + weights[:, i] = calculate_weights_spn( + correlations=rho_fc[:, i], cov=cov + ) + + # Blend the extrapolation, (NWP) model(s) and noise cascades + R_f_blended = blending.utils.blend_cascades( + cascades_norm=cascades_stacked, weights=weights + ) - # 8.5 Blend the cascades - R_f_out = [] + # Also blend the cascade without the extrapolation component + R_f_blended_mod_only = blending.utils.blend_cascades( + cascades_norm=cascades_stacked[1:, :], + weights=weights_model_only, + ) - for t_sub in subtimesteps: - # TODO: does it make sense to use sub time steps - check if it works? - if t_sub > 0: - t_index = np.where(np.array(subtimesteps) == t_sub)[0][0] - # First concatenate the cascades and the means and sigmas - # precip_models = [n_models,timesteps,n_cascade_levels,m,n] - if blend_nwp_members: - cascades_stacked = np.concatenate( - ( - R_f_ep_out[None, t_index], - precip_models_cascade_temp, - Yn_ep_out[None, t_index], - ), - axis=0, - ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] - means_stacked = np.concatenate( - (mu_extrapolation[None, :], mu_models_temp), axis=0 + # Blend the means and standard deviations + # Input is array of shape [number_components, scale_level, ...] + means_blended, sigmas_blended = blend_means_sigmas( + means=means_stacked, sigmas=sigmas_stacked, weights=weights ) - sigmas_stacked = np.concatenate( - (sigma_extrapolation[None, :], sigma_models_temp), - axis=0, + # Also blend the means and sigmas for the cascade without extrapolation + ( + means_blended_mod_only, + sigmas_blended_mod_only, + ) = blend_means_sigmas( + means=means_stacked[1:, :], + sigmas=sigmas_stacked[1:, :], + weights=weights_model_only, ) - else: - cascades_stacked = np.concatenate( - ( - R_f_ep_out[None, t_index], - precip_models_cascade_temp[None, j], - Yn_ep_out[None, t_index], - ), - axis=0, - ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] - means_stacked = np.concatenate( - (mu_extrapolation[None, :], mu_models_temp[None, j]), axis=0 + + # 8.6 Recompose the cascade to a precipitation field + # (The function first normalizes the blended cascade, R_f_blended + # again) + R_f_new = blending.utils.recompose_cascade( + combined_cascade=R_f_blended, + combined_mean=means_blended, + combined_sigma=sigmas_blended, ) - sigmas_stacked = np.concatenate( - (sigma_extrapolation[None, :], sigma_models_temp[None, j]), - axis=0, + # The recomposed cascade without the extrapolation (for NaN filling + # outside the radar domain) + R_f_new_mod_only = blending.utils.recompose_cascade( + combined_cascade=R_f_blended_mod_only, + combined_mean=means_blended_mod_only, + combined_sigma=sigmas_blended_mod_only, ) - - # First determine the blending weights if method is spn. The - # weights for method bps have already been determined. - if weights_method == "spn": - weights = np.zeros( - (cascades_stacked.shape[0], n_cascade_levels) + if domain == "spectral": + # TODO: Check this! (Only tested with domain == 'spatial') + R_f_new = fft_objs[j].irfft2(R_f_new) + R_f_new_mod_only = fft_objs[j].irfft2(R_f_new_mod_only) + + # 8.7 Post-processing steps - use the mask and fill no data with + # the blended NWP forecast. Probability matching following + # Lagrangian blended probability matching which uses the + # latest extrapolated radar rainfall field blended with the + # nwp model(s) rainfall forecast fields as 'benchmark'. + + # TODO: Check probability matching method + # 8.7.1 first blend the extrapolated rainfall field (the field + # that is only used for post-processing steps) with the NWP + # rainfall forecast for this time step using the weights + # at scale level 2. + weights_pm = weights[:-1, 1] # Weights without noise, level 2 + weights_pm_normalized = weights_pm / np.sum(weights_pm) + # And the weights for outside the radar domain + weights_pm_mod_only = weights_model_only[ + :-1, 1 + ] # Weights without noise, level 2 + weights_pm_normalized_mod_only = weights_pm_mod_only / np.sum( + weights_pm_mod_only ) - for i in range(n_cascade_levels): - # Determine the normalized covariance matrix (containing) - # the cross-correlations between the models - cascades_stacked_ = np.stack( - [ - cascades_stacked[n_model, i, :, :].flatten() - for n_model in range(cascades_stacked.shape[0] - 1) - ] - ) # -1 to exclude the noise component - cov = np.ma.corrcoef( - np.ma.masked_invalid(cascades_stacked_) + # Stack the fields + if blend_nwp_members: + R_pm_stacked = np.concatenate( + ( + R_pm_ep[None, t_index], + precip_models_pm_temp, + ), + axis=0, ) - # Determine the weights for this cascade level - weights[:, i] = calculate_weights_spn( - correlations=rho_fc[:, i], cov=cov + else: + R_pm_stacked = np.concatenate( + ( + R_pm_ep[None, t_index], + precip_models_pm_temp[None, j], + ), + axis=0, ) - - # Blend the extrapolation, (NWP) model(s) and noise cascades - R_f_blended = blending.utils.blend_cascades( - cascades_norm=cascades_stacked, weights=weights - ) - - # Also blend the cascade without the extrapolation component - R_f_blended_mod_only = blending.utils.blend_cascades( - cascades_norm=cascades_stacked[1:, :], - weights=weights_model_only, - ) - - # Blend the means and standard deviations - # Input is array of shape [number_components, scale_level, ...] - means_blended, sigmas_blended = blend_means_sigmas( - means=means_stacked, sigmas=sigmas_stacked, weights=weights - ) - # Also blend the means and sigmas for the cascade without extrapolation - ( - means_blended_mod_only, - sigmas_blended_mod_only, - ) = blend_means_sigmas( - means=means_stacked[1:, :], - sigmas=sigmas_stacked[1:, :], - weights=weights_model_only, - ) - - # 8.6 Recompose the cascade to a precipitation field - # (The function first normalizes the blended cascade, R_f_blended - # again) - R_f_new = blending.utils.recompose_cascade( - combined_cascade=R_f_blended, - combined_mean=means_blended, - combined_sigma=sigmas_blended, - ) - # The recomposed cascade without the extrapolation (for NaN filling - # outside the radar domain) - R_f_new_mod_only = blending.utils.recompose_cascade( - combined_cascade=R_f_blended_mod_only, - combined_mean=means_blended_mod_only, - combined_sigma=sigmas_blended_mod_only, - ) - if domain == "spectral": - # TODO: Check this! (Only tested with domain == 'spatial') - R_f_new = fft_objs[j].irfft2(R_f_new) - R_f_new_mod_only = fft_objs[j].irfft2(R_f_new_mod_only) - - # 8.7 Post-processing steps - use the mask and fill no data with - # the blended NWP forecast. Probability matching following - # Lagrangian blended probability matching which uses the - # latest extrapolated radar rainfall field blended with the - # nwp model(s) rainfall forecast fields as 'benchmark'. - - # TODO: Check probability matching method - # 8.7.1 first blend the extrapolated rainfall field (the field - # that is only used for post-processing steps) with the NWP - # rainfall forecast for this time step using the weights - # at scale level 2. - weights_pm = weights[:-1, 1] # Weights without noise, level 2 - weights_pm_normalized = weights_pm / np.sum(weights_pm) - # And the weights for outside the radar domain - weights_pm_mod_only = weights_model_only[ - :-1, 1 - ] # Weights without noise, level 2 - weights_pm_normalized_mod_only = weights_pm_mod_only / np.sum( - weights_pm_mod_only - ) - # Stack the fields - if blend_nwp_members: - R_pm_stacked = np.concatenate( - ( - R_pm_ep[None, t_index], - precip_models_pm_temp, - ), - axis=0, - ) - else: - R_pm_stacked = np.concatenate( - ( - R_pm_ep[None, t_index], - precip_models_pm_temp[None, j], - ), - axis=0, - ) - # Blend it - R_pm_blended = np.sum( - weights_pm_normalized.reshape( - weights_pm_normalized.shape[0], 1, 1 - ) - * R_pm_stacked, - axis=0, - ) - if blend_nwp_members: - R_pm_blended_mod_only = np.sum( - weights_pm_normalized_mod_only.reshape( - weights_pm_normalized_mod_only.shape[0], 1, 1 + # Blend it + R_pm_blended = np.sum( + weights_pm_normalized.reshape( + weights_pm_normalized.shape[0], 1, 1 ) - * precip_models_pm_temp, + * R_pm_stacked, axis=0, ) - else: - R_pm_blended_mod_only = precip_models_pm_temp[j] - - # The extrapolation components are NaN outside the advected - # radar domain. This results in NaN values in the blended - # forecast outside the radar domain. Therefore, fill these - # areas with the "..._mod_only" blended forecasts, consisting - # of the NWP and noise components. - nan_indices = np.isnan(R_f_new) - R_f_new[nan_indices] = R_f_new_mod_only[nan_indices] - nan_indices = np.isnan(R_pm_blended) - R_pm_blended[nan_indices] = R_pm_blended_mod_only[nan_indices] - # Finally, fill the remaining nan values, if present, with - # the minimum value in the forecast - nan_indices = np.isnan(R_f_new) - R_f_new[nan_indices] = np.nanmin(R_f_new) - nan_indices = np.isnan(R_pm_blended) - R_pm_blended[nan_indices] = np.nanmin(R_pm_blended) - - # 8.7.2. Apply the masking and prob. matching - if mask_method is not None: - # apply the precipitation mask to prevent generation of new - # precipitation into areas where it was not originally - # observed - R_cmin = R_f_new.min() - if mask_method == "incremental": - # The incremental mask is slightly different from - # the implementation in the non-blended steps.py, as - # it is not based on the last forecast, but instead - # on R_pm_blended. Therefore, the buffer does not - # increase over time. - # Get the mask for this forecast - MASK_prec = R_pm_blended >= precip_thr - # Buffer the mask - MASK_prec = _compute_incremental_mask( - MASK_prec, struct, mask_rim + if blend_nwp_members: + R_pm_blended_mod_only = np.sum( + weights_pm_normalized_mod_only.reshape( + weights_pm_normalized_mod_only.shape[0], 1, 1 + ) + * precip_models_pm_temp, + axis=0, ) - # Get the final mask - R_f_new = R_cmin + (R_f_new - R_cmin) * MASK_prec - MASK_prec_ = R_f_new > R_cmin - elif mask_method == "obs": - # The mask equals the most recent benchmark - # rainfall field - MASK_prec_ = R_pm_blended >= precip_thr - - # Set to min value outside of mask - R_f_new[~MASK_prec_] = R_cmin - - if probmatching_method == "cdf": - # adjust the CDF of the forecast to match the most recent - # benchmark rainfall field (R_pm_blended) - R_f_new = probmatching.nonparam_match_empirical_cdf( - R_f_new, R_pm_blended - ) - elif probmatching_method == "mean": - # Use R_pm_blended as benchmark field and - mu_0 = np.mean(R_pm_blended[R_pm_blended >= precip_thr]) - MASK = R_f_new >= precip_thr - mu_fct = np.mean(R_f_new[MASK]) - R_f_new[MASK] = R_f_new[MASK] - mu_fct + mu_0 - - R_f_out.append(R_f_new) - - return R_f_out - - res = [] - for j in range(n_ens_members): - if not DASK_IMPORTED or n_ens_members == 1: - res.append(worker(j)) + else: + R_pm_blended_mod_only = precip_models_pm_temp[j] + + # The extrapolation components are NaN outside the advected + # radar domain. This results in NaN values in the blended + # forecast outside the radar domain. Therefore, fill these + # areas with the "..._mod_only" blended forecasts, consisting + # of the NWP and noise components. + nan_indices = np.isnan(R_f_new) + R_f_new[nan_indices] = R_f_new_mod_only[nan_indices] + nan_indices = np.isnan(R_pm_blended) + R_pm_blended[nan_indices] = R_pm_blended_mod_only[nan_indices] + # Finally, fill the remaining nan values, if present, with + # the minimum value in the forecast + nan_indices = np.isnan(R_f_new) + R_f_new[nan_indices] = np.nanmin(R_f_new) + nan_indices = np.isnan(R_pm_blended) + R_pm_blended[nan_indices] = np.nanmin(R_pm_blended) + + # 8.7.2. Apply the masking and prob. matching + if mask_method is not None: + # apply the precipitation mask to prevent generation of new + # precipitation into areas where it was not originally + # observed + R_cmin = R_f_new.min() + if mask_method == "incremental": + # The incremental mask is slightly different from + # the implementation in the non-blended steps.py, as + # it is not based on the last forecast, but instead + # on R_pm_blended. Therefore, the buffer does not + # increase over time. + # Get the mask for this forecast + MASK_prec = R_pm_blended >= precip_thr + # Buffer the mask + MASK_prec = _compute_incremental_mask( + MASK_prec, struct, mask_rim + ) + # Get the final mask + R_f_new = R_cmin + (R_f_new - R_cmin) * MASK_prec + MASK_prec_ = R_f_new > R_cmin + elif mask_method == "obs": + # The mask equals the most recent benchmark + # rainfall field + MASK_prec_ = R_pm_blended >= precip_thr + + # Set to min value outside of mask + R_f_new[~MASK_prec_] = R_cmin + + if probmatching_method == "cdf": + # Adjust the CDF of the forecast to match the most recent + # benchmark rainfall field (R_pm_blended). If the forecast + if np.any(np.isfinite(R_f_new)): + R_f_new = probmatching.nonparam_match_empirical_cdf( + R_f_new, R_pm_blended + ) + elif probmatching_method == "mean": + # Use R_pm_blended as benchmark field and + mu_0 = np.mean(R_pm_blended[R_pm_blended >= precip_thr]) + MASK = R_f_new >= precip_thr + mu_fct = np.mean(R_f_new[MASK]) + R_f_new[MASK] = R_f_new[MASK] - mu_fct + mu_0 + + R_f_out.append(R_f_new) + + R_f_[j] = R_f_out + + res = [] + + if DASK_IMPORTED and n_ens_members > 1: + for j in range(n_ens_members): + res.append(dask.delayed(worker)(j)) + dask.compute(*res, num_workers=num_ensemble_workers) else: - res.append(dask.delayed(worker)(j)) + for j in range(n_ens_members): + worker(j) - R_f_ = ( - dask.compute(*res, num_workers=num_ensemble_workers) - if DASK_IMPORTED and n_ens_members > 1 - else res - ) - res = None + res = None - if is_nowcast_time_step: - if measure_time: - print(f"{time.time() - starttime:.2f} seconds.") - else: - print("done.") + if is_nowcast_time_step: + if measure_time: + print(f"{time.time() - starttime:.2f} seconds.") + else: + print("done.") - if callback is not None: - R_f_stacked = np.stack(R_f_) - if R_f_stacked.shape[1] > 0: - callback(R_f_stacked.squeeze()) + if callback is not None: + R_f_stacked = np.stack(R_f_) + if R_f_stacked.shape[1] > 0: + callback(R_f_stacked.squeeze()) - if return_output: - for j in range(n_ens_members): - R_f[j].extend(R_f_[j]) + if return_output: + for j in range(n_ens_members): + R_f[j].extend(R_f_[j]) - R_f_ = None + R_f_ = None - if measure_time: - mainloop_time = time.time() - starttime_mainloop - - if return_output: - outarr = np.stack([np.stack(R_f[j]) for j in range(n_ens_members)]) if measure_time: - return outarr, init_time, mainloop_time + mainloop_time = time.time() - starttime_mainloop + + if return_output: + outarr = np.stack([np.stack(R_f[j]) for j in range(n_ens_members)]) + if measure_time: + return outarr, init_time, mainloop_time + else: + return outarr else: - return outarr - else: - return None + return None def calculate_ratios(correlations): @@ -1823,13 +2047,54 @@ def _compute_cascade_decomposition_nwp( return precip_models, mu_models, sigma_models, precip_models_pm -def _estimate_ar_parameters_radar(R_c, ar_order, n_cascade_levels, MASK_thr): +def _estimate_ar_parameters_radar( + R_c, ar_order, n_cascade_levels, MASK_thr, zero_precip_radar +): """Estimate AR parameters for the radar rainfall field.""" - # compute lag-l temporal autocorrelation coefficients for each cascade level + # If there are values in the radar fields, compute the autocorrelations GAMMA = np.empty((n_cascade_levels, ar_order)) - for i in range(n_cascade_levels): - GAMMA[i, :] = correlation.temporal_autocorrelation(R_c[i], mask=MASK_thr) + if not zero_precip_radar: + # compute lag-l temporal autocorrelation coefficients for each cascade level + for i in range(n_cascade_levels): + GAMMA[i, :] = correlation.temporal_autocorrelation(R_c[i], mask=MASK_thr) + + # Else, use standard values for the autocorrelations + else: + # Get the climatological lag-1 and lag-2 autocorrelation values from Table 2 + # in `BPS2004`. + # Hard coded, change to own (climatological) values when present. + GAMMA = np.array( + [ + [0.99805, 0.9925, 0.9776, 0.9297, 0.796, 0.482, 0.079, 0.0006], + [0.9933, 0.9752, 0.923, 0.750, 0.367, 0.069, 0.0018, 0.0014], + ] + ) + + # Check whether the number of cascade_levels is correct + if GAMMA.shape[1] > n_cascade_levels: + GAMMA = GAMMA[:, 0:n_cascade_levels] + elif GAMMA.shape[1] < n_cascade_levels: + # Get the number of cascade levels that is missing + n_extra_lev = n_cascade_levels - GAMMA.shape[1] + # Append the array with correlation values of 10e-4 + GAMMA = np.append( + GAMMA, + [np.repeat(0.0006, n_extra_lev), np.repeat(0.0014, n_extra_lev)], + axis=1, + ) + + # Finally base GAMMA.shape[0] on the AR-level + if ar_order == 1: + GAMMA = GAMMA[0, :] + if ar_order > 2: + for repeat_index in range(ar_order - 2): + GAMMA = np.vstack((GAMMA, GAMMA[1, :])) + # Finally, transpose GAMMA to ensure that the shape is the same as np.empty((n_cascade_levels, ar_order)) + GAMMA = GAMMA.transpose() + assert GAMMA.shape == (n_cascade_levels, ar_order) + + # Print the GAMMA value nowcast_utils.print_corrcoefs(GAMMA) if ar_order == 2: @@ -1988,8 +2253,8 @@ def _prepare_forecast_loop( ): """Prepare for the forecast loop.""" # Empty arrays for the previous displacements and the forecast cascade - D = np.stack([np.full(n_cascade_levels, None) for j in range(n_ens_members)]) - D_Yn = np.stack([np.full(n_cascade_levels, None) for j in range(n_ens_members)]) + D = np.stack([None for j in range(n_ens_members)]) + D_Yn = np.stack([None for j in range(n_ens_members)]) D_pb = np.stack([None for j in range(n_ens_members)]) R_f = [[] for j in range(n_ens_members)] @@ -2023,8 +2288,8 @@ def _compute_initial_nwp_skill( """Calculate the initial skill of the (NWP) model forecasts at t=0.""" rho_nwp_models = [ blending.skill_scores.spatial_correlation( - obs=R_c[0, :, -1, :, :], - mod=precip_models[n_model, :, :, :], + obs=R_c[0, :, -1, :, :].copy(), + mod=precip_models[n_model, :, :, :].copy(), domain_mask=domain_mask, ) for n_model in range(precip_models.shape[0]) @@ -2046,3 +2311,121 @@ def _compute_initial_nwp_skill( **clim_kwargs, ) return rho_nwp_models + + +def _init_noise_cascade( + shape, + n_ens_members, + n_cascade_levels, + generate_noise, + decompositor, + pp, + randgen_prec, + fft_objs, + bp_filter, + domain, + noise_method, + noise_std_coeffs, + ar_order, +): + """Initialize the noise cascade with identical noise for all AR(n) steps + We also need to return the mean and standard deviations of the noise + for the recombination of the noise before advecting it. + """ + noise_cascade = np.zeros(shape) + mu_noise = np.zeros((n_ens_members, n_cascade_levels)) + sigma_noise = np.zeros((n_ens_members, n_cascade_levels)) + if noise_method: + for j in range(n_ens_members): + EPS = generate_noise( + pp, randstate=randgen_prec[j], fft_method=fft_objs[j], domain=domain + ) + EPS = decompositor( + EPS, + bp_filter, + fft_method=fft_objs[j], + input_domain=domain, + output_domain=domain, + compute_stats=True, + normalize=True, + compact_output=True, + ) + mu_noise[j] = EPS["means"] + sigma_noise[j] = EPS["stds"] + for i in range(n_cascade_levels): + EPS_ = EPS["cascade_levels"][i] + EPS_ *= noise_std_coeffs[i] + for n in range(ar_order): + noise_cascade[j][i][n] = EPS_ + EPS = None + EPS_ = None + return noise_cascade, mu_noise, sigma_noise + + +def _fill_nans_infs_nwp_cascade( + precip_models_cascade, + precip_models_pm, + precip_cascade, + precip, + mu_models, + sigma_models, +): + """Ensure that the NWP cascade and fields do no contain any nans or infinite number""" + # Fill nans and infinite numbers with the minimum value present in precip + # (corresponding to zero rainfall in the radar observations) + precip_models_cascade = np.nan_to_num( + precip_models_cascade, + copy=True, + nan=np.nanmin(precip_cascade), + posinf=np.nanmin(precip_cascade), + neginf=np.nanmin(precip_cascade), + ) + precip_models_pm = np.nan_to_num( + precip_models_pm, + copy=True, + nan=np.nanmin(precip), + posinf=np.nanmin(precip), + neginf=np.nanmin(precip), + ) + # Also set any nans or infs in the mean and sigma of the cascade to + # respectively 0.0 and 1.0 + mu_models = np.nan_to_num( + mu_models, + copy=True, + nan=0.0, + posinf=0.0, + neginf=0.0, + ) + sigma_models = np.nan_to_num( + sigma_models, + copy=True, + nan=0.0, + posinf=0.0, + neginf=0.0, + ) + + return precip_models_cascade, precip_models_pm, mu_models, sigma_models + + +def _determine_max_nr_rainy_cells_nwp( + precip_models_pm, precip_thr, n_models, timesteps +): + """Initialize noise based on the NWP field time step where the fraction of rainy cells is highest""" + if precip_thr is None: + precip_thr = np.nanmin(precip_models_pm) + + max_rain_pixels = -1 + max_rain_pixels_j = -1 + max_rain_pixels_t = -1 + for j in range(n_models): + for t in range(timesteps): + rain_pixels = precip_models_pm[j][t][ + precip_models_pm[j][t] > precip_thr + ].size + if rain_pixels > max_rain_pixels: + max_rain_pixels = rain_pixels + max_rain_pixels_j = j + max_rain_pixels_t = t + precip_noise_input = precip_models_pm[max_rain_pixels_j][max_rain_pixels_t] + + return precip_noise_input diff --git a/pysteps/blending/utils.py b/pysteps/blending/utils.py index b1bebc579..303af0bab 100644 --- a/pysteps/blending/utils.py +++ b/pysteps/blending/utils.py @@ -15,6 +15,7 @@ decompose_NWP compute_store_nwp_motion load_NWP + check_norain """ import datetime @@ -490,6 +491,13 @@ def load_NWP(input_nc_path_decomp, input_path_velocities, start_time, n_timestep assert analysis_time + start_i * timestep == start_time end_i = start_i + n_timesteps + 1 + # Check if the requested end time (the forecast horizon) is in the stored data. + # If not, raise an error + if end_i > ncf_decomp.variables["pr_decomposed"].shape[0]: + raise IndexError( + "The requested forecast horizon is outside the stored NWP forecast horizon. Either request a shorter forecast horizon or store a longer NWP forecast horizon" + ) + # Add the valid times to the output decomp_dict["valid_times"] = valid_times[start_i:end_i] @@ -502,23 +510,49 @@ def load_NWP(input_nc_path_decomp, input_path_velocities, start_time, n_timestep for i in range(start_i, end_i): decomp_dict_ = decomp_dict.copy() + # Obtain the decomposed cascades for time step i cascade_levels = ncf_decomp.variables["pr_decomposed"][i, :, :, :] - - # In the netcdf file this is saved as a masked array, so we're checking if there is no mask - assert not cascade_levels.mask - + # Obtain the mean values means = ncf_decomp.variables["means"][i, :] - assert not means.mask - + # Obtain de standard deviations stds = ncf_decomp.variables["stds"][i, :] - assert not stds.mask # Save the values in the dictionary as normal arrays with the filled method - decomp_dict_["cascade_levels"] = np.ma.filled(cascade_levels) - decomp_dict_["means"] = np.ma.filled(means) - decomp_dict_["stds"] = np.ma.filled(stds) + decomp_dict_["cascade_levels"] = np.ma.filled(cascade_levels, fill_value=np.nan) + decomp_dict_["means"] = np.ma.filled(means, fill_value=np.nan) + decomp_dict_["stds"] = np.ma.filled(stds, fill_value=np.nan) # Append the output list R_d.append(decomp_dict_) + ncf_decomp.close() return R_d, uv + + +def check_norain(precip_arr, precip_thr=None, norain_thr=0.0): + """ + + Parameters + ---------- + precip_arr: array-like + Array containing the input precipitation field + precip_thr: float, optional + Specifies the threshold value for minimum observable precipitation intensity. If None, the + minimum value over the domain is taken. + norain_thr: float, optional + Specifies the threshold value for the fraction of rainy pixels in precip_arr below which we consider there to be + no rain. Standard set to 0.0 + Returns + ------- + norain: bool + Returns whether the fraction of rainy pixels is below the norain_thr threshold. + + """ + + if precip_thr is None: + precip_thr = np.nanmin(precip_arr) + rain_pixels = precip_arr[precip_arr > precip_thr] + norain = rain_pixels.size / precip_arr.size <= norain_thr + print("Field is below no rain fraction :", norain) + print("rain fraction is :", rain_pixels.size / precip_arr.size) + return norain diff --git a/pysteps/io/importers.py b/pysteps/io/importers.py index d7a9a850e..8709e68a8 100644 --- a/pysteps/io/importers.py +++ b/pysteps/io/importers.py @@ -1374,7 +1374,10 @@ def import_odim_hdf5(filename, qty="RATE", **kwargs): raise IOError("requested quantity %s not found" % qty) where = f["where"] - proj4str = where.attrs["projdef"].decode() + if isinstance(where.attrs["projdef"], str): + proj4str = where.attrs["projdef"] + else: + proj4str = where.attrs["projdef"].decode() pr = pyproj.Proj(proj4str) ll_lat = where.attrs["LL_lat"] diff --git a/pysteps/tests/test_blending_steps.py b/pysteps/tests/test_blending_steps.py index d3246657f..3f4a0580b 100644 --- a/pysteps/tests/test_blending_steps.py +++ b/pysteps/tests/test_blending_steps.py @@ -7,6 +7,32 @@ from pysteps import cascade, blending +steps_arg_values = [ + (1, 3, 4, 8, None, None, False, "spn", True, 4, False, False), + (1, 3, 4, 8, "obs", None, False, "spn", True, 4, False, False), + (1, 3, 4, 8, "incremental", None, False, "spn", True, 4, False, False), + (1, 3, 4, 8, None, "mean", False, "spn", True, 4, False, False), + (1, 3, 4, 8, None, "cdf", False, "spn", True, 4, False, False), + (1, 3, 4, 8, "incremental", "cdf", False, "spn", True, 4, False, False), + (1, 3, 4, 6, "incremental", "cdf", False, "bps", True, 4, False, False), + (1, 3, 4, 6, "incremental", "cdf", False, "bps", False, 4, False, False), + (1, 3, 4, 9, "incremental", "cdf", False, "spn", True, 4, False, False), + (2, 3, 10, 8, "incremental", "cdf", False, "spn", True, 10, False, False), + (5, 3, 5, 8, "incremental", "cdf", False, "spn", True, 5, False, False), + (1, 10, 1, 8, "incremental", "cdf", False, "spn", True, 1, False, False), + (2, 3, 2, 8, "incremental", "cdf", True, "spn", True, 2, False, False), + (1, 3, 6, 8, None, None, False, "spn", True, 6, False, False), + # Test the case where the radar image contains no rain. + (1, 3, 6, 8, None, None, False, "spn", True, 6, True, False), + (5, 3, 5, 6, "incremental", "cdf", False, "spn", False, 5, True, False), + # Test the case where the NWP fields contain no rain. + (1, 3, 6, 8, None, None, False, "spn", True, 6, False, True), + (5, 3, 5, 6, "incremental", "cdf", False, "spn", False, 5, False, True), + # Test the case where both the radar image and the NWP fields contain no rain. + (1, 3, 6, 8, None, None, False, "spn", True, 6, True, True), + (5, 3, 5, 6, "incremental", "cdf", False, "spn", False, 5, True, True), + (5, 3, 5, 6, "obs", "mean", True, "spn", True, 5, True, True), +] steps_arg_names = ( "n_models", "n_timesteps", @@ -18,24 +44,10 @@ "weights_method", "decomposed_nwp", "expected_n_ens_members", + "zero_radar", + "zero_nwp", ) -steps_arg_values = [ - (1, 3, 4, 8, None, None, False, "spn", True, 4), - (1, 3, 4, 8, "obs", None, False, "spn", True, 4), - (1, 3, 4, 8, "incremental", None, False, "spn", True, 4), - (1, 3, 4, 8, None, "mean", False, "spn", True, 4), - (1, 3, 4, 8, None, "cdf", False, "spn", True, 4), - (1, 3, 4, 8, "incremental", "cdf", False, "spn", True, 4), - (1, 3, 4, 6, "incremental", "cdf", False, "bps", True, 4), - (1, 3, 4, 6, "incremental", "cdf", False, "bps", False, 4), - (1, 3, 4, 9, "incremental", "cdf", False, "spn", True, 4), - (2, 3, 10, 8, "incremental", "cdf", False, "spn", True, 10), - (5, 3, 5, 8, "incremental", "cdf", False, "spn", True, 5), - (1, 10, 1, 8, "incremental", "cdf", False, "spn", True, 1), - (2, 3, 2, 8, "incremental", "cdf", True, "spn", True, 2), -] - @pytest.mark.parametrize(steps_arg_names, steps_arg_values) def test_steps_blending( @@ -49,6 +61,8 @@ def test_steps_blending( weights_method, decomposed_nwp, expected_n_ens_members, + zero_radar, + zero_nwp, ): pytest.importorskip("cv2") @@ -58,49 +72,51 @@ def test_steps_blending( # Initialise dummy NWP data nwp_precip = np.zeros((n_models, n_timesteps + 1, 200, 200)) - for n_model in range(n_models): - for i in range(nwp_precip.shape[1]): - nwp_precip[n_model, i, 30:185, 30 + 1 * (i + 1) * n_model] = 0.1 - nwp_precip[n_model, i, 30:185, 31 + 1 * (i + 1) * n_model] = 0.1 - nwp_precip[n_model, i, 30:185, 32 + 1 * (i + 1) * n_model] = 1.0 - nwp_precip[n_model, i, 30:185, 33 + 1 * (i + 1) * n_model] = 5.0 - nwp_precip[n_model, i, 30:185, 34 + 1 * (i + 1) * n_model] = 5.0 - nwp_precip[n_model, i, 30:185, 35 + 1 * (i + 1) * n_model] = 4.5 - nwp_precip[n_model, i, 30:185, 36 + 1 * (i + 1) * n_model] = 4.5 - nwp_precip[n_model, i, 30:185, 37 + 1 * (i + 1) * n_model] = 4.0 - nwp_precip[n_model, i, 30:185, 38 + 1 * (i + 1) * n_model] = 2.0 - nwp_precip[n_model, i, 30:185, 39 + 1 * (i + 1) * n_model] = 1.0 - nwp_precip[n_model, i, 30:185, 40 + 1 * (i + 1) * n_model] = 0.5 - nwp_precip[n_model, i, 30:185, 41 + 1 * (i + 1) * n_model] = 0.1 + if not zero_nwp: + for n_model in range(n_models): + for i in range(nwp_precip.shape[1]): + nwp_precip[n_model, i, 30:185, 30 + 1 * (i + 1) * n_model] = 0.1 + nwp_precip[n_model, i, 30:185, 31 + 1 * (i + 1) * n_model] = 0.1 + nwp_precip[n_model, i, 30:185, 32 + 1 * (i + 1) * n_model] = 1.0 + nwp_precip[n_model, i, 30:185, 33 + 1 * (i + 1) * n_model] = 5.0 + nwp_precip[n_model, i, 30:185, 34 + 1 * (i + 1) * n_model] = 5.0 + nwp_precip[n_model, i, 30:185, 35 + 1 * (i + 1) * n_model] = 4.5 + nwp_precip[n_model, i, 30:185, 36 + 1 * (i + 1) * n_model] = 4.5 + nwp_precip[n_model, i, 30:185, 37 + 1 * (i + 1) * n_model] = 4.0 + nwp_precip[n_model, i, 30:185, 38 + 1 * (i + 1) * n_model] = 2.0 + nwp_precip[n_model, i, 30:185, 39 + 1 * (i + 1) * n_model] = 1.0 + nwp_precip[n_model, i, 30:185, 40 + 1 * (i + 1) * n_model] = 0.5 + nwp_precip[n_model, i, 30:185, 41 + 1 * (i + 1) * n_model] = 0.1 # Define dummy nowcast input data radar_precip = np.zeros((3, 200, 200)) - for i in range(2): - radar_precip[i, 5:150, 30 + 1 * i] = 0.1 - radar_precip[i, 5:150, 31 + 1 * i] = 0.5 - radar_precip[i, 5:150, 32 + 1 * i] = 0.5 - radar_precip[i, 5:150, 33 + 1 * i] = 5.0 - radar_precip[i, 5:150, 34 + 1 * i] = 5.0 - radar_precip[i, 5:150, 35 + 1 * i] = 4.5 - radar_precip[i, 5:150, 36 + 1 * i] = 4.5 - radar_precip[i, 5:150, 37 + 1 * i] = 4.0 - radar_precip[i, 5:150, 38 + 1 * i] = 1.0 - radar_precip[i, 5:150, 39 + 1 * i] = 0.5 - radar_precip[i, 5:150, 40 + 1 * i] = 0.5 - radar_precip[i, 5:150, 41 + 1 * i] = 0.1 - radar_precip[2, 30:155, 30 + 1 * 2] = 0.1 - radar_precip[2, 30:155, 31 + 1 * 2] = 0.1 - radar_precip[2, 30:155, 32 + 1 * 2] = 1.0 - radar_precip[2, 30:155, 33 + 1 * 2] = 5.0 - radar_precip[2, 30:155, 34 + 1 * 2] = 5.0 - radar_precip[2, 30:155, 35 + 1 * 2] = 4.5 - radar_precip[2, 30:155, 36 + 1 * 2] = 4.5 - radar_precip[2, 30:155, 37 + 1 * 2] = 4.0 - radar_precip[2, 30:155, 38 + 1 * 2] = 2.0 - radar_precip[2, 30:155, 39 + 1 * 2] = 1.0 - radar_precip[2, 30:155, 40 + 1 * 3] = 0.5 - radar_precip[2, 30:155, 41 + 1 * 3] = 0.1 + if not zero_radar: + for i in range(2): + radar_precip[i, 5:150, 30 + 1 * i] = 0.1 + radar_precip[i, 5:150, 31 + 1 * i] = 0.5 + radar_precip[i, 5:150, 32 + 1 * i] = 0.5 + radar_precip[i, 5:150, 33 + 1 * i] = 5.0 + radar_precip[i, 5:150, 34 + 1 * i] = 5.0 + radar_precip[i, 5:150, 35 + 1 * i] = 4.5 + radar_precip[i, 5:150, 36 + 1 * i] = 4.5 + radar_precip[i, 5:150, 37 + 1 * i] = 4.0 + radar_precip[i, 5:150, 38 + 1 * i] = 1.0 + radar_precip[i, 5:150, 39 + 1 * i] = 0.5 + radar_precip[i, 5:150, 40 + 1 * i] = 0.5 + radar_precip[i, 5:150, 41 + 1 * i] = 0.1 + radar_precip[2, 30:155, 30 + 1 * 2] = 0.1 + radar_precip[2, 30:155, 31 + 1 * 2] = 0.1 + radar_precip[2, 30:155, 32 + 1 * 2] = 1.0 + radar_precip[2, 30:155, 33 + 1 * 2] = 5.0 + radar_precip[2, 30:155, 34 + 1 * 2] = 5.0 + radar_precip[2, 30:155, 35 + 1 * 2] = 4.5 + radar_precip[2, 30:155, 36 + 1 * 2] = 4.5 + radar_precip[2, 30:155, 37 + 1 * 2] = 4.0 + radar_precip[2, 30:155, 38 + 1 * 2] = 2.0 + radar_precip[2, 30:155, 39 + 1 * 2] = 1.0 + radar_precip[2, 30:155, 40 + 1 * 3] = 0.5 + radar_precip[2, 30:155, 41 + 1 * 3] = 0.1 metadata = dict() metadata["unit"] = "mm" diff --git a/pysteps/tests/test_blending_utils.py b/pysteps/tests/test_blending_utils.py index 40f138d9c..63fc1ffa5 100644 --- a/pysteps/tests/test_blending_utils.py +++ b/pysteps/tests/test_blending_utils.py @@ -15,6 +15,7 @@ decompose_NWP, compute_store_nwp_motion, load_NWP, + check_norain, ) pytest.importorskip("netCDF4") @@ -96,8 +97,8 @@ precip_nwp, nwp_metadata = converter(precip_nwp, nwp_metadata) # Threshold the data -precip_nwp[precip_nwp < 0.1] = 0.0 nwp_metadata["threshold"] = 0.1 +precip_nwp[precip_nwp < nwp_metadata["threshold"]] = 0.0 # Transform the data transformer = pysteps.utils.get_method("dB") @@ -378,3 +379,28 @@ def test_blending_utils( decimal=3, err_msg="Recomposed field of second forecast does not equal original field", ) + + precip_arr = precip_nwp + # rainy fraction is 0.005847 + assert not check_norain(precip_arr) + assert not check_norain(precip_arr, precip_thr=nwp_metadata["threshold"]) + assert not check_norain( + precip_arr, precip_thr=nwp_metadata["threshold"], norain_thr=0.005 + ) + assert not check_norain(precip_arr, norain_thr=0.005) + # so with norain_thr beyond this number it should report that there's no rain + assert check_norain(precip_arr, norain_thr=0.006) + assert check_norain( + precip_arr, precip_thr=nwp_metadata["threshold"], norain_thr=0.006 + ) + + # also if we set the precipitation threshold sufficiently high, it should report there's no rain + # rainy fraction > 4mm/h is 0.004385 + assert not check_norain(precip_arr, precip_thr=4.0, norain_thr=0.004) + assert check_norain(precip_arr, precip_thr=4.0, norain_thr=0.005) + + # no rain above 100mm/h so it should give norain + assert check_norain(precip_arr, precip_thr=100) + + # should always give norain if the threshold is set to 100% + assert check_norain(precip_arr, norain_thr=1.0)