Commit
#80: Refactoring project structure according to feedback. Fixed imports and black-linted

Signed-off-by: Dominik Hoffmann <dominik151099@outlook.de>
dh1542 committed Nov 27, 2024
1 parent 17e4361 commit 53bcfc1
Showing 28 changed files with 123 additions and 62 deletions.
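
Every hunk below follows one of two mechanical patterns: absolute imports that previously targeted the installed rtdip_sdk package are rewritten against the repository-relative src.sdk.python.rtdip_sdk package, and any import line that then exceeds black's line length is wrapped into black's parenthesized one-name-per-line form. A minimal before/after sketch of the pattern, using a module path taken from the diff itself:

# Before: resolves only against an installed rtdip_sdk distribution
from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType

# After: repository-relative path, parenthesized and wrapped by black
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
    Libraries,
    SystemType,
)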
@@ -10,4 +10,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
@@ -16,7 +16,7 @@

 from pyspark.sql import DataFrame

-from rtdip_sdk.pipelines.interfaces import PipelineComponentBaseInterface
+from src.sdk.python.rtdip_sdk.pipelines.interfaces import PipelineComponentBaseInterface


 class DataManipulationBaseInterface(PipelineComponentBaseInterface):
@@ -15,7 +15,10 @@
 from pyspark.sql import DataFrame as PySparkDataFrame

 from ...interfaces import DataManipulationBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class DuplicateDetection(DataManipulationBaseInterface):
@@ -18,11 +18,14 @@
 from pyspark.sql import SparkSession
 from pyspark.sql import DataFrame

-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
 from ...interfaces import DataManipulationBaseInterface


-class IntervalFiltering (DataManipulationBaseInterface):
+class IntervalFiltering(DataManipulationBaseInterface):
     """
     Cleanses a DataFrame by removing rows outside a specified interval window.
     Example:
@@ -15,7 +15,10 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import mean, stddev, abs, col
 from ...interfaces import DataManipulationBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class KSigmaAnomalyDetection(DataManipulationBaseInterface):
@@ -21,7 +21,10 @@
 from datetime import timedelta
 from typing import List
 from ...interfaces import DataManipulationBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class MissingValueImputation(DataManipulationBaseInterface):
@@ -12,8 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from pyspark.sql import DataFrame as PySparkDataFrame
-from rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import DataManipulationBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import (
+    DataManipulationBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
 from .normalization import (
     NormalizationBaseClass,
 )
@@ -14,8 +14,13 @@
 from abc import abstractmethod
 from pyspark.sql import DataFrame as PySparkDataFrame
 from typing import List
-from rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import DataManipulationBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import (
+    DataManipulationBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class NormalizationBaseClass(DataManipulationBaseInterface):
@@ -19,7 +19,10 @@
 from statsmodels.tsa.arima.model import ARIMA, ARIMAResults

 from ....interfaces import DataManipulationBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class ArimaPrediction(DataManipulationBaseInterface):
@@ -16,7 +16,7 @@

 from pyspark.sql import DataFrame

-from rtdip_sdk.pipelines.interfaces import PipelineComponentBaseInterface
+from src.sdk.python.rtdip_sdk.pipelines.interfaces import PipelineComponentBaseInterface


 class MonitoringBaseInterface(PipelineComponentBaseInterface):
@@ -4,8 +4,13 @@
 from functools import reduce
 from operator import or_

-from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
+    MonitoringBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class CheckValueRanges(MonitoringBaseInterface):
@@ -17,8 +17,13 @@
 from pyspark.sql.functions import col, when, lag, count, sum
 from pyspark.sql.window import Window

-from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
+    MonitoringBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)


 class FlatlineDetection(MonitoringBaseInterface):
@@ -14,8 +14,13 @@

 import great_expectations as gx
 from pyspark.sql import DataFrame, SparkSession
-from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
+    MonitoringBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
 from great_expectations.checkpoint import (
     Checkpoint,
 )
@@ -18,9 +18,16 @@
 from pyspark.sql import functions as F
 from pyspark.sql.window import Window

-from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
-from rtdip_sdk.pipelines.utilities.spark.time_string_parsing import parse_time_string_to_ms
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
+    MonitoringBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
+from src.sdk.python.rtdip_sdk.pipelines.utilities.spark.time_string_parsing import (
+    parse_time_string_to_ms,
+)


 class IdentifyMissingDataInterval(MonitoringBaseInterface):
@@ -4,9 +4,16 @@
 from pyspark.sql import DataFrame as PySparkDataFrame
 from pyspark.sql import functions as F

-from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
-from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
-from rtdip_sdk.pipelines.utilities.spark.time_string_parsing import parse_time_string_to_ms
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.interfaces import (
+    MonitoringBaseInterface,
+)
+from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
+    Libraries,
+    SystemType,
+)
+from src.sdk.python.rtdip_sdk.pipelines.utilities.spark.time_string_parsing import (
+    parse_time_string_to_ms,
+)


 class IdentifyMissingDataPattern(MonitoringBaseInterface):
13 changes: 0 additions & 13 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py

This file was deleted.

@@ -10,4 +10,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
@@ -10,4 +10,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
@@ -19,7 +19,9 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame

-from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.prediction.arima import ArimaPrediction
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.prediction.arima import (
+    ArimaPrediction,
+)


 @pytest.fixture(scope="session")
@@ -1,7 +1,8 @@
 from pyspark.sql import SparkSession

-from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.k_sigma_anomaly_detection import \
-    KSigmaAnomalyDetection
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.k_sigma_anomaly_detection import (
+    KSigmaAnomalyDetection,
+)

 # Normal data mean=10 stddev=5 + 3 anomalies
 # fmt: off
@@ -17,14 +17,18 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame

-from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.denormalization import \
-    Denormalization
-from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.normalization import \
-    NormalizationBaseClass
-from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.normalization_mean import \
-    NormalizationMean
-from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.normalization_minmax import \
-    NormalizationMinMax
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.denormalization import (
+    Denormalization,
+)
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.normalization import (
+    NormalizationBaseClass,
+)
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.normalization_mean import (
+    NormalizationMean,
+)
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.normalization.normalization_minmax import (
+    NormalizationMinMax,
+)


 @pytest.fixture(scope="session")
@@ -0,0 +1,13 @@
+# Copyright 2024 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
@@ -3,7 +3,7 @@
 from io import StringIO
 import logging

-from rtdip_sdk.pipelines.data_quality.monitoring.spark.check_value_ranges import (
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.check_value_ranges import (
     CheckValueRanges,
 )

@@ -1,6 +1,6 @@
 import pytest
 from pyspark.sql import SparkSession
-from rtdip_sdk.pipelines.data_quality.monitoring.spark.flatline_detection import (
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.flatline_detection import (
     FlatlineDetection,
 )

@@ -1,7 +1,7 @@
 from pytest_mock import MockerFixture
 from pyspark.sql import SparkSession

-from rtdip_sdk.pipelines.data_quality.monitoring.spark.great_expectations_data_quality import (
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.great_expectations_data_quality import (
     GreatExpectationsDataQuality,
 )

@@ -1,6 +1,6 @@
 import pytest
 from pyspark.sql import SparkSession
-from rtdip_sdk.pipelines.data_quality.monitoring.spark.identify_missing_data_interval import (
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.identify_missing_data_interval import (
     IdentifyMissingDataInterval,
 )

@@ -8,7 +8,7 @@
 from pyspark.sql.types import StructType, StructField, StringType


-from rtdip_sdk.pipelines.data_quality.monitoring.spark.identify_missing_data_pattern import (
+from src.sdk.python.rtdip_sdk.pipelines.data_quality.monitoring.spark.identify_missing_data_pattern import (
     IdentifyMissingDataPattern,
 )

@@ -170,8 +170,8 @@ def sample_data(spark):
 # Test cases
 def test_cols_to_vector(sample_data):
     df = sample_data
-    # Pass the DataFrame to ColsToVector
-    cols_to_vector = ColsToVector(df=df, input_cols=["Value"], output_col="features")
+    # Pass the DataFrame to ColumnsToVector
+    cols_to_vector = ColumnsToVector(df=df, input_cols=["Value"], output_col="features")
     transformed_df = cols_to_vector.transform()

     assert "features" in transformed_df.columns
@@ -180,8 +180,8 @@ def test_cols_to_vector(sample_data):

 def test_polynomial_features(sample_data):
     df = sample_data
-    # Convert 'Value' to a vector using ColsToVector
-    cols_to_vector = ColsToVector(df=df, input_cols=["Value"], output_col="features")
+    # Convert 'Value' to a vector using ColumnsToVector
+    cols_to_vector = ColumnsToVector(df=df, input_cols=["Value"], output_col="features")
     vectorized_df = cols_to_vector.transform()

     polynomial_features = PolynomialFeatures(
@@ -197,8 +197,8 @@ def test_polynomial_features(sample_data):

 def test_linear_regression(sample_data):
     df = sample_data
-    # Use ColsToVector to assemble features into a single vector column
-    cols_to_vector = ColsToVector(df=df, input_cols=["Value"], output_col="features")
+    # Use ColumnsToVector to assemble features into a single vector column
+    cols_to_vector = ColumnsToVector(df=df, input_cols=["Value"], output_col="features")
     df = cols_to_vector.transform()
     linear_regression = LinearRegression(
         df, features_col="features", label_col="Value", prediction_col="prediction"
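
The final file also picks up the rename of ColsToVector to ColumnsToVector. Going only by the calls visible in these tests, the renamed class takes a source DataFrame, a list of input columns, and an output column name, and is materialized with transform(). A minimal sketch under those assumptions — the sample DataFrame is hypothetical, and ColumnsToVector's import path does not appear in this diff:

from pyspark.sql import SparkSession

# Hypothetical sample data; only the "Value" column name is taken from the tests above.
spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["Value"])

# ColumnsToVector is assumed to already be in scope; its module is not shown in this diff.
# Assemble the listed input columns into a single vector column named "features".
cols_to_vector = ColumnsToVector(df=df, input_cols=["Value"], output_col="features")
transformed_df = cols_to_vector.transform()
assert "features" in transformed_df.columns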
