Skip to content

Commit

Permalink
Improve validation, especially zero flatliners
Browse files Browse the repository at this point in the history
Signed-off-by: Martijn Cazemier <martijn.cazemier@alliander.com>
  • Loading branch information
MartijnCa committed Sep 19, 2023
1 parent 01dc1ef commit f1f7135
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 41 deletions.
4 changes: 2 additions & 2 deletions openstef/data_classes/prediction_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ class PredictionJobDataClass(BaseModel):
"""Minimum fraction of data that should be available for making a regular forecast."""
minimal_table_length: int = 100
"""Minimum length (in rows) of the forecast input for making a regular forecast."""
flatliner_threshold: int = 24
"""Number of consecutive values that have to be constant to detect a flatliner. """
flatliner_threshold_minutes: int = 360
"""Number of minutes that the load has to be constant to detect a flatliner. """
depends_on: Optional[list[Union[int, str]]]
"""Link to another prediction job on which this prediction job might depend."""
sid: Optional[str]
Expand Down
4 changes: 4 additions & 0 deletions openstef/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class InputDataWrongColumnOrderError(InputDataInvalidError):
"""Wrong column order input data."""


class InputDataOngoingZeroFlatlinerError(InputDataInvalidError):
"""All recent load measurements are zero."""


class OldModelHigherScoreError(Exception):
"""Old model has a higher score then new model."""

Expand Down
7 changes: 6 additions & 1 deletion openstef/pipeline/create_basecase_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ def create_basecase_forecast_pipeline(
# Validate data
# Currently effectively disabled by giving None.
# We keep this step so it later can be filled using arguments defined in PJ
validated_data = validation.validate(pj["id"], input_data, flatliner_threshold=None)
validated_data = validation.validate(
pj["id"],
input_data,
flatliner_threshold_minutes=None,
resolution_minutes=pj["resolution_minutes"],
)

# Add features
data_with_features = OperationalPredictFeatureApplicator(
Expand Down
7 changes: 6 additions & 1 deletion openstef/pipeline/create_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ def create_forecast_pipeline_core(
fallback_strategy = "extreme_day" # this can later be expanded

# Validate and clean data
validated_data = validation.validate(pj["id"], input_data, pj["flatliner_threshold"])
validated_data = validation.validate(
pj["id"],
input_data,
pj["flatliner_threshold_minutes"],
pj["resolution_minutes"],
)

# Custom data prep or legacy behavior
if pj.data_prep_class:
Expand Down
7 changes: 6 additions & 1 deletion openstef/pipeline/optimize_hyperparameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ def optimize_hyperparameters_pipeline_core(

# Validate and clean data
validated_data = validation.drop_target_na(
validation.validate(pj["id"], input_data, pj["flatliner_threshold"])
validation.validate(
pj["id"],
input_data,
pj["flatliner_threshold_minutes"],
pj["resolution_minutes"],
)
)

# Check if sufficient data is left after cleaning
Expand Down
7 changes: 6 additions & 1 deletion openstef/pipeline/train_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,12 @@ def train_pipeline_step_compute_features(
input_data = input_data.sort_values(horizons)
# Validate and clean data
validated_data = validation.drop_target_na(
validation.validate(pj["id"], input_data, pj["flatliner_threshold"], pj["resolution_minutes"])
validation.validate(
pj["id"],
input_data,
pj["flatliner_threshold_minutes"],
pj["resolution_minutes"],
)
)
# Check if sufficient data is left after cleaning
if not validation.is_data_sufficient(
Expand Down
43 changes: 31 additions & 12 deletions openstef/validation/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,35 @@
from datetime import timedelta
from typing import Union

import math
import numpy as np
import pandas as pd
import structlog

from openstef.exceptions import InputDataOngoingZeroFlatlinerError
from openstef.preprocessing.preprocessing import replace_repeated_values_with_nan
from openstef.model.regressors.regressor import OpenstfRegressor


def validate(
pj_id: Union[int, str],
data: pd.DataFrame,
flatliner_threshold: Union[int, None],
flatliner_threshold_minutes: Union[int, None],
resolution_minutes: int,
) -> pd.DataFrame:
"""Validate prediction job and timeseries data.
Steps:
1. Replace repeated values for longer than flatliner_threshold with NaN
# TODO: The function description suggests it
'validates' the PJ and Data, but is appears to 'just' replace repeated observations with NaN.
1. Check if input dataframe has a datetime index.
1. Check if a zero flatliner pattern is ongoing (i.e. all recent measurements are zero).
2. Replace repeated values for longer than flatliner_threshold_minutes with NaN.
Args:
pj_id: ind/str, used to identify log statements
data: pd.DataFrame where the first column should be the target. index=datetimeIndex
flatliner_threshold: int of max repetitions considered a flatline.
flatliner_threshold_minutes: int indicating the number of minutes after which constant load is considered a flatline.
if None, the validation is effectively skipped
resolution_minutes: The forecasting resolution in minutes.
Returns:
Dataframe where repeated values are set to None
Expand All @@ -39,13 +43,26 @@ def validate(
if not isinstance(data.index, pd.DatetimeIndex):
raise ValueError("Input dataframe does not have a datetime index.")

if flatliner_threshold is None:
if flatliner_threshold_minutes is None:
logger.info("Skipping validation of input data", pj_id=pj_id)
return data

zero_flatliner_ongoing = detect_ongoing_zero_flatliner(
load=data.iloc[:, 0], duration_threshold_minutes=flatliner_threshold_minutes
)

if zero_flatliner_ongoing:
raise InputDataOngoingZeroFlatlinerError(
"All recent load measurements are zero."
)

flatliner_threshold_repetitions = math.ceil(
flatliner_threshold_minutes / resolution_minutes
)

# Drop 'false' measurements. e.g. where load appears to be constant.
data = replace_repeated_values_with_nan(
data, max_length=flatliner_threshold, column_name=data.columns[0]
data, max_length=flatliner_threshold_repetitions, column_name=data.columns[0]
)
num_repeated_values = len(data) - len(data.iloc[:, 0].dropna())
if num_repeated_values > 0:
Expand Down Expand Up @@ -193,22 +210,24 @@ def calc_completeness_features(

def detect_ongoing_zero_flatliner(
load: pd.Series,
duration_threshold_hours: int,
duration_threshold_minutes: int,
) -> bool:
"""Detects if the latest measurements follow a zero flatliner pattern.
Args:
load (pd.Series): A timeseries of measured load with a datetime index.
duration_threshold_hours (int): A zero flatliner is only detected if it exceeds the threshold duration.
duration_threshold_minutes (int): A zero flatliner is only detected if it exceeds the threshold duration.
Returns:
bool: Indicating wether or not there is a zero flatliner ongoing for the given load.
"""

latest_measurement_time = load.index.max()
latest_measurements = load[latest_measurement_time - timedelta(minutes=duration_threshold_hours * 60):].dropna()

return ((latest_measurements == 0).all() & (not latest_measurements.empty))
latest_measurements = load[
latest_measurement_time - timedelta(minutes=duration_threshold_minutes) :
].dropna()

return (latest_measurements == 0).all() & (not latest_measurements.empty)


def calc_completeness_dataframe(
Expand Down
14 changes: 12 additions & 2 deletions test/unit/pipeline/test_pipeline_train_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ def test_train_model_pipeline_core_happy_flow(self):

# Validate and clean data
validated_data = validation.drop_target_na(
validation.validate(pj["id"], train_input, flatliner_threshold=24)
validation.validate(
pj["id"],
train_input,
flatliner_threshold_minutes=360,
resolution_minutes=15,
)
)

# Add features
Expand Down Expand Up @@ -245,7 +250,12 @@ def test_train_model_pipeline_core_happy_flow_with_legacy_data_prep(self):

# Validate and clean data
validated_data = validation.drop_target_na(
validation.validate(pj["id"], train_input, flatliner_threshold=24)
validation.validate(
pj["id"],
train_input,
flatliner_threshold_minutes=360,
resolution_minutes=15,
)
)

# Add features
Expand Down
8 changes: 6 additions & 2 deletions test/unit/pipeline/test_train_predict_backtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ def test_train_model_pipeline_core_happy_flow_nfold(self):
# check if forecast is indeed of the entire range of the input data
validated_data = validation.drop_target_na(
validation.validate(
self.pj["id"], self.train_input, self.pj["flatliner_threshold"]
self.pj["id"],
self.train_input,
self.pj["flatliner_threshold_minutes"],
resolution_minutes=15,
)
)
data_with_features = TrainFeatureApplicator(
Expand Down Expand Up @@ -158,7 +161,8 @@ def test_train_model_pipeline_core_custom_split(self):
validation.validate(
self.pj["id"],
self.train_input[-nb_test:],
self.pj["flatliner_threshold"],
self.pj["flatliner_threshold_minutes"],
self.pj["resolution_minutes"],
)
)
data_with_features = TrainFeatureApplicator(
Expand Down
66 changes: 56 additions & 10 deletions test/unit/validation/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from openstef.validation import validation

from openstef.exceptions import InputDataOngoingZeroFlatlinerError


class TestDataValidation(BaseTestCase):
def setUp(self):
Expand All @@ -19,38 +21,82 @@ def setUp(self):
self.data_predict = TestData.load("input_data.csv")
self.pj = TestData.get_prediction_job(pid=307)

def test_drop_target_na(self):
# No data should be removed
def test_drop_target_na_no_data_removed(self):
# Arrange
original_length = len(self.data_train)

# Act
cleaned_data = validation.drop_target_na(self.data_train)

# Assert
self.assertEqual(len(cleaned_data), original_length)

# Data should be removed, since all rows have NaN load except the first 1000 rows
def test_drop_target_na_lots_of_data_removed(self):
# Arrange
temp_data = self.data_train.copy()
temp_data.iloc[1000:, 0] = np.nan

# Act
cleaned_data = validation.drop_target_na(temp_data)

# Assert
# Data should be removed, since all rows have NaN load except the first 1000 rows
self.assertEqual(len(cleaned_data), 1000)

def test_validate(self):

self.data_predict["load"][0:50] = 10.0
validated_data = validation.validate(
self.pj["id"], self.data_predict, self.pj["flatliner_threshold"]
self.pj["id"],
self.data_predict,
self.pj["flatliner_threshold_minutes"],
resolution_minutes=15,
)
self.assertEqual(26, len(validated_data[validated_data["load"].isna()]))

def test_validate_none_threshold(self):
"""return the input if flatliner_threshold is None"""
"""return the input if flatliner_threshold_minutes is None"""
# Arrange
input_data = self.data_train
res = validation.validate(self.pj["id"], input_data, flatliner_threshold=None)
flatliner_threshold_minutes = None

# Act
res = validation.validate(
self.pj["id"],
input_data,
flatliner_threshold_minutes,
resolution_minutes=15,
)

# Assert
self.assertDataframeEqual(res, input_data)

def test_validate_no_datetime_index(self):
"""Raise a value error if input data has no datetime index."""
# Arrange
input_data = self.data_train
input_data = input_data.reset_index(drop=True)

# Act & assert
with pytest.raises(ValueError):
res = validation.validate(
self.pj["id"], input_data, flatliner_threshold=None
validation.validate(
self.pj["id"],
input_data,
flatliner_threshold_minutes=None,
resolution_minutes=15,
)

def test_validate_ongoing_zero_flatliner(self):
# Arrange
input_data = self.data_train
input_data.iloc[-80:, 0] = 0
flatliner_threshold_minutes = 360
resolution_minutes = 15

# Act & assert
with pytest.raises(InputDataOngoingZeroFlatlinerError):
validation.validate(
self.pj["id"],
input_data,
flatliner_threshold_minutes,
resolution_minutes,
)
self.assertIsInstance(res, pd.DataFrame)
Loading

0 comments on commit f1f7135

Please sign in to comment.