#80: Refactoring and moved packages
Signed-off-by: Dominik Hoffmann <dominik151099@outlook.de>
dh1542 committed Nov 27, 2024
1 parent fa41bfb commit 08e84f8
Showing 45 changed files with 60 additions and 127 deletions.
2 changes: 1 addition & 1 deletion mkdocs.yml
@@ -238,7 +238,7 @@ nav:
     - Monitoring:
       - Data Quality:
         - Great Expectations:
-          - Data Quality Monitoring: sdk/code-reference/pipelines/monitoring/spark/data_quality/great_expectations.md
+          - Data Quality Monitoring: sdk/code-reference/pipelines/monitoring/spark/data_manipulation/great_expectations.md
     - Jobs: sdk/pipelines/jobs.md
     - Deploy:
       - Databricks Workflows: sdk/pipelines/deploy/databricks.md
@@ -10,4 +10,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
@@ -13,11 +13,11 @@
 # limitations under the License.

 from abc import abstractmethod
-from ..interfaces import PipelineComponentBaseInterface
+from rtdip_sdk.pipelines.interfaces import PipelineComponentBaseInterface
 from pyspark.sql import DataFrame


-class WranglerBaseInterface(PipelineComponentBaseInterface):
+class DataManipulationBaseInterface(PipelineComponentBaseInterface):
     @abstractmethod
     def filter(self) -> DataFrame:
         pass
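For orientation, a minimal, self-contained sketch of what a component built on the renamed interface could look like. The stand-in base class mirrors only this hunk; the subclass name, constructor, and "Value" column are illustrative assumptions, and real RTDIP components also satisfy the wider PipelineComponentBaseInterface contract, which this sketch ignores.

```python
from abc import ABC, abstractmethod

from pyspark.sql import DataFrame
from pyspark.sql.functions import col


# Stand-in that mirrors the renamed interface from the hunk above.
class DataManipulationBaseInterface(ABC):
    @abstractmethod
    def filter(self) -> DataFrame: ...


# Hypothetical component: keeps rows whose "Value" column is non-negative.
class DropNegativeValues(DataManipulationBaseInterface):
    def __init__(self, df: DataFrame) -> None:
        self.df = df

    def filter(self) -> DataFrame:
        return self.df.filter(col("Value") >= 0)
```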
@@ -1,4 +1,4 @@
-# Copyright 2022 RTDIP
+# Copyright 2024 RTDIP
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,18 +14,18 @@
 from pyspark.sql.functions import desc
 from pyspark.sql import DataFrame as PySparkDataFrame

-from ...interfaces import WranglerBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
+from ...interfaces import DataManipulationBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


-class DuplicateDetection(WranglerBaseInterface):
+class DuplicateDetection(DataManipulationBaseInterface):
     """
     Cleanses a PySpark DataFrame from duplicates.

     Example
     --------
     ```python
-    from rtdip_sdk.pipelines.monitoring.spark.data_quality.duplicate_detection import DuplicateDetection
+    from rtdip_sdk.pipelines.monitoring.spark.data_manipulation.duplicate_detection import DuplicateDetection
     from pyspark.sql import SparkSession
     from pyspark.sql.dataframe import DataFrame
     from pyspark.sql.functions import desc
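The cleansing DuplicateDetection describes boils down to dropping duplicate rows over a set of identifying columns. A minimal PySpark sketch of that idea, with column names that are assumptions for illustration rather than the component's actual parameters:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("DuplicateDetectionSketch").getOrCreate()

df = spark.createDataFrame(
    [
        ("A2PS64V0J", "2024-01-02 20:03:46", 0.34),
        ("A2PS64V0J", "2024-01-02 20:03:46", 0.34),  # duplicate record
        ("A2PS64V0J", "2024-01-02 23:03:46", 0.15),
    ],
    ["TagName", "EventTime", "Value"],
)

# Duplicate cleansing reduces to dropDuplicates over the identifying columns.
cleansed = df.dropDuplicates(["TagName", "EventTime"])
cleansed.show()
```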
@@ -18,11 +18,11 @@
 from pyspark.sql import SparkSession
 from pyspark.sql import DataFrame

-from ...._pipeline_utils.models import Libraries, SystemType
-from ...interfaces import WranglerBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from ...interfaces import DataManipulationBaseInterface


-class IntervalFiltering(WranglerBaseInterface):
+class IntervalFiltering(DataManipulationBaseInterface):
     """
     Cleanses a DataFrame by removing rows outside a specified interval window.
     Example:
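The hunk does not show IntervalFiltering's parameters, so here is a hedged sketch of one plausible reading, keeping only rows whose timestamps fall inside a window; all names and bounds below are assumptions:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("IntervalFilteringSketch").getOrCreate()

df = spark.createDataFrame(
    [
        ("2024-01-02 20:03:46", 0.34),
        ("2024-01-02 21:12:20", 0.15),
        ("2024-01-03 09:00:00", 0.81),
    ],
    ["EventTime", "Value"],
).withColumn("EventTime", col("EventTime").cast("timestamp"))

# Keep only rows inside the assumed interval window [start, end).
start, end = "2024-01-02 00:00:00", "2024-01-03 00:00:00"
filtered = df.filter((col("EventTime") >= start) & (col("EventTime") < end))
filtered.show()  # the 2024-01-03 row is removed
```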
@@ -14,11 +14,11 @@

 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import mean, stddev, abs, col
-from ...interfaces import WranglerBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
+from ...interfaces import DataManipulationBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


-class KSigmaAnomalyDetection(WranglerBaseInterface):
+class KSigmaAnomalyDetection(DataManipulationBaseInterface):
     """
     Anomaly detection with the k-sigma method. This method either computes the mean and standard deviation, or the median and the median absolute deviation (MAD) of the data.
     The k-sigma method then filters out all data points that are k times the standard deviation away from the mean, or k times the MAD away from the median.
@@ -27,7 +27,7 @@ class KSigmaAnomalyDetection(WranglerBaseInterface):
     Example
     --------
     ```python
-    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.k_sigma_anomaly_detection import KSigmaAnomalyDetection
+    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_manipulation.k_sigma_anomaly_detection import KSigmaAnomalyDetection

     spark = ... # SparkSession
     df = ... # Get a PySpark DataFrame
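The docstring above explains the method well enough for a worked example. This NumPy sketch shows the median/MAD variant; the 1.4826 factor is a common convention for scaling the MAD to a standard-deviation-like quantity and is an assumption here, not necessarily the class's implementation:

```python
import numpy as np

values = np.array([1.0, 1.2, 0.9, 1.1, 25.0])
k = 3

median = np.median(values)                # 1.1
mad = np.median(np.abs(values - median))  # 0.1
scaled_mad = 1.4826 * mad                 # make the MAD comparable to a std. deviation

# Keep points within k scaled-MADs of the median; the outlier 25.0 is filtered out.
inliers = values[np.abs(values - median) <= k * scaled_mad]
print(inliers)  # [1.  1.2 0.9 1.1]
```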
@@ -20,11 +20,11 @@
 import numpy as np
 from datetime import timedelta
 from typing import List
-from ...interfaces import WranglerBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
+from ...interfaces import DataManipulationBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


-class MissingValueImputation(WranglerBaseInterface):
+class MissingValueImputation(DataManipulationBaseInterface):
     """
     Imputes missing values in a univariate time series creating a continuous curve of data points. For that, the
     time intervals of each individual source are calculated, to then insert empty records at the missing timestamps with
@@ -36,7 +36,7 @@ class MissingValueImputation(WranglerBaseInterface):
     from pyspark.sql import SparkSession
     from pyspark.sql.dataframe import DataFrame
     from pyspark.sql.types import StructType, StructField, StringType
-    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import (
+    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_manipulation.missing_value_imputation import (
         MissingValueImputation,
     )
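In miniature, the two steps the docstring describes, inserting empty records at the missing timestamps and then imputing them into a continuous curve, look like this pandas sketch. The real component operates on PySpark DataFrames and may use a different interpolation strategy; the 5-minute cadence is an assumption:

```python
import pandas as pd

# Source data with a missing record (the 20:10 reading was never written).
s = pd.Series(
    [1.0, 2.0, 5.0, 6.0],
    index=pd.to_datetime(
        ["2024-01-02 20:00", "2024-01-02 20:05", "2024-01-02 20:15", "2024-01-02 20:20"]
    ),
)

# Step 1: derive the source's interval and insert empty records at missing timestamps.
full_index = pd.date_range(s.index.min(), s.index.max(), freq="5min")
s = s.reindex(full_index)

# Step 2: impute the inserted gaps to form a continuous curve (20:10 becomes 3.5).
print(s.interpolate(method="time"))
```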
@@ -12,14 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from pyspark.sql import DataFrame as PySparkDataFrame
-from .....data_wranglers.interfaces import WranglerBaseInterface
-from ....._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import DataManipulationBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
 from .normalization import (
     NormalizationBaseClass,
 )


-class Denormalization(WranglerBaseInterface):
+class Denormalization(DataManipulationBaseInterface):
     """
     #TODO
     Applies the appropriate denormalization method to revert values to their original scale.
@@ -14,11 +14,11 @@
 from abc import abstractmethod
 from pyspark.sql import DataFrame as PySparkDataFrame
 from typing import List
-from .....data_wranglers.interfaces import WranglerBaseInterface
-from ....._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.data_quality.data_manipulation.interfaces import DataManipulationBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


-class NormalizationBaseClass(WranglerBaseInterface):
+class NormalizationBaseClass(DataManipulationBaseInterface):
     """
     A base class for applying normalization techniques to multiple columns in a PySpark DataFrame.
     This class serves as a framework to support various normalization methods (e.g., Z-Score, Min-Max, and Mean),
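As a concrete instance of what such a subclass would compute, here is Z-Score normalization, one of the methods the docstring names, in a short PySpark sketch; the single "Value" column is illustrative:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev

spark = SparkSession.builder.master("local[1]").appName("ZScoreSketch").getOrCreate()
df = spark.createDataFrame([(10.0,), (20.0,), (30.0,)], ["Value"])

# Z-score: subtract the mean, divide by the standard deviation.
stats = df.select(mean("Value").alias("mu"), stddev("Value").alias("sigma")).first()
normalized = df.withColumn("Value", (col("Value") - stats["mu"]) / stats["sigma"])
normalized.show()  # -1.0, 0.0, 1.0; denormalization inverts this: x * sigma + mu
```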
@@ -18,11 +18,11 @@
 from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession
 from statsmodels.tsa.arima.model import ARIMA, ARIMAResults

-from ....interfaces import WranglerBaseInterface
-from ....._pipeline_utils.models import Libraries, SystemType
+from ....interfaces import DataManipulationBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


-class ArimaPrediction(WranglerBaseInterface):
+class ArimaPrediction(DataManipulationBaseInterface):
     """
     Extends a column in a given DataFrame with an ARIMA model.
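The statsmodels imports above pin down the underlying model API. A minimal sketch of the idea, fitting an ARIMA model on a series and appending forecast steps; the generated series and the order=(1, 0, 0) choice are illustrative, not the component's defaults:

```python
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA

# Illustrative noisy sine series standing in for a sensor column.
rng = np.random.default_rng(42)
series = pd.Series(np.sin(np.linspace(0, 8 * np.pi, 200)) + rng.normal(0, 0.1, 200))

model = ARIMA(series, order=(1, 0, 0))
fitted = model.fit()

# "Extending the column" amounts to appending forecast steps like these.
forecast = fitted.forecast(steps=24)
print(forecast.head())
```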
Empty file.
@@ -15,7 +15,7 @@
 from abc import abstractmethod

 from pyspark.sql import DataFrame
-from ..interfaces import PipelineComponentBaseInterface
+from rtdip_sdk.pipelines.interfaces import PipelineComponentBaseInterface


 class MonitoringBaseInterface(PipelineComponentBaseInterface):
Empty file.
@@ -4,8 +4,8 @@
 from functools import reduce
 from operator import or_

-from ...interfaces import MonitoringBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


 class CheckValueRanges(MonitoringBaseInterface):
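Judging from the functools/operator imports, out-of-range checks are plausibly assembled per tag and OR-ed together with reduce(or_, ...). A hedged sketch of that pattern; the per-tag range dictionary is hypothetical configuration, not the class's actual signature:

```python
from functools import reduce
from operator import or_

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("ValueRangeSketch").getOrCreate()
df = spark.createDataFrame([("TempA", 5.0), ("TempB", 150.0)], ["TagName", "Value"])

ranges = {"TempA": (0.0, 100.0), "TempB": (0.0, 100.0)}  # hypothetical config

# One violation predicate per tag, combined into a single column with reduce(or_, ...).
violations = reduce(
    or_,
    [
        (col("TagName") == tag) & ((col("Value") < lo) | (col("Value") > hi))
        for tag, (lo, hi) in ranges.items()
    ],
)
df.filter(violations).show()  # flags TempB at 150.0
```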
@@ -17,8 +17,8 @@
 from pyspark.sql.functions import col, when, lag, count, sum
 from pyspark.sql.window import Window

-from ...interfaces import MonitoringBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType


 class FlatlineDetection(MonitoringBaseInterface):
@@ -38,7 +38,7 @@ class FlatlineDetection(MonitoringBaseInterface):
     Example:
     ```python
-    from rtdip_sdk.pipelines.monitoring.spark.data_quality.flatline_detection import FlatlineDetection
+    from rtdip_sdk.pipelines.monitoring.spark.data_manipulation.flatline_detection import FlatlineDetection
     from pyspark.sql import SparkSession

     spark = SparkSession.builder.master("local[1]").appName("FlatlineDetectionExample").getOrCreate()
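Consistent with the lag/Window imports, the heart of flatline detection is comparing each reading with its predecessor over a per-tag window. A minimal sketch of that core step; the real class presumably adds run-length counting and thresholds not shown in this hunk:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").appName("FlatlineSketch").getOrCreate()
df = spark.createDataFrame(
    [("A", 1, 0.5), ("A", 2, 0.5), ("A", 3, 0.5), ("A", 4, 0.7)],
    ["TagName", "Seq", "Value"],
)

# A reading is "flat" when it repeats the previous value for the same tag.
w = Window.partitionBy("TagName").orderBy("Seq")
flagged = df.withColumn(
    "is_flat", when(col("Value") == lag("Value").over(w), 1).otherwise(0)
)
flagged.show()
```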
@@ -14,8 +14,8 @@

 import great_expectations as gx
 from pyspark.sql import DataFrame, SparkSession
-from ...interfaces import MonitoringBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
 from great_expectations.checkpoint import (
     Checkpoint,
 )
@@ -32,7 +32,7 @@ class GreatExpectationsDataQuality(MonitoringBaseInterface):
     Example
     --------
     ```python
-    from src.sdk.python.rtdip_sdk.monitoring.data_quality.great_expectations.python.great_expectations_data_quality import GreatExpectationsDataQuality
+    from src.sdk.python.rtdip_sdk.monitoring.data_manipulation.great_expectations.python.great_expectations_data_quality import GreatExpectationsDataQuality
     from rtdip_sdk.pipelines.utilities import SparkSessionUtility
     import json
@@ -18,9 +18,9 @@
 from pyspark.sql import functions as F
 from pyspark.sql.window import Window

-from ...interfaces import MonitoringBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
-from ....utilities.spark.time_string_parsing import parse_time_string_to_ms
+from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.utilities.spark.time_string_parsing import parse_time_string_to_ms


 class IdentifyMissingDataInterval(MonitoringBaseInterface):
@@ -43,7 +43,7 @@ class IdentifyMissingDataInterval(MonitoringBaseInterface):
     Example
     --------
     ```python
-    from rtdip_sdk.pipelines.monitoring.spark.data_quality import IdentifyMissingDataInterval
+    from rtdip_sdk.pipelines.monitoring.spark.data_manipulation import IdentifyMissingDataInterval
     from pyspark.sql import SparkSession

     missing_data_monitor = IdentifyMissingDataInterval(
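A sketch of the gap check the class name suggests: compute deltas between consecutive timestamps and flag any gap larger than the expected interval plus a tolerance. The 10s/2s values and single-column layout are assumptions; the parse_time_string_to_ms import above suggests the real parameters arrive as time strings:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[1]").appName("MissingIntervalSketch").getOrCreate()

df = spark.createDataFrame(
    [("2024-01-02 20:00:00",), ("2024-01-02 20:00:10",), ("2024-01-02 20:00:50",)],
    ["EventTime"],
).withColumn("EventTime", F.col("EventTime").cast("timestamp"))

expected_ms, tolerance_ms = 10_000, 2_000  # assumed cadence and tolerance

# Millisecond gap between each record and its predecessor.
w = Window.orderBy("EventTime")
gaps = df.withColumn(
    "gap_ms",
    (F.unix_timestamp("EventTime") - F.unix_timestamp(F.lag("EventTime").over(w))) * 1000,
)
gaps.filter(F.col("gap_ms") > expected_ms + tolerance_ms).show()  # the 40s gap is reported
```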
@@ -4,9 +4,9 @@
 from pyspark.sql import DataFrame as PySparkDataFrame
 from pyspark.sql import functions as F

-from ...interfaces import MonitoringBaseInterface
-from ...._pipeline_utils.models import Libraries, SystemType
-from ....utilities.spark.time_string_parsing import parse_time_string_to_ms
+from rtdip_sdk.pipelines.data_quality.monitoring.interfaces import MonitoringBaseInterface
+from rtdip_sdk.pipelines._pipeline_utils.models import Libraries, SystemType
+from rtdip_sdk.pipelines.utilities.spark.time_string_parsing import parse_time_string_to_ms


 class IdentifyMissingDataPattern(MonitoringBaseInterface):
8 changes: 0 additions & 8 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py
@@ -11,11 +11,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from .spark.data_quality.duplicate_detection import *
-from .spark.data_quality.normalization.normalization import *
-from .spark.data_quality.normalization.normalization_mean import *
-from .spark.data_quality.normalization.normalization_minmax import *
-from .spark.data_quality.normalization.normalization_zscore import *
-from .spark.data_quality.normalization.denormalization import *
-from .spark.data_quality.prediction.arima import *
-from .spark.data_quality.missing_value_imputation import *
17 changes: 0 additions & 17 deletions src/sdk/python/rtdip_sdk/pipelines/monitoring/__init__.py

This file was deleted.

13 changes: 0 additions & 13 deletions src/sdk/python/rtdip_sdk/pipelines/monitoring/spark/__init__.py

This file was deleted.

@@ -1,4 +1,4 @@
-# Copyright 2022 RTDIP
+# Copyright 2024 RTDIP
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -10,4 +10,4 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
+# limitations under the License.
Empty file.
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from abc import ABCMeta

 import numpy as np
 import pandas as pd
@@ -20,7 +19,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame

-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import ArimaPrediction
+from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.prediction.arima import ArimaPrediction


@pytest.fixture(scope="session")
@@ -16,7 +16,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame

-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.duplicate_detection import (
+from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.duplicate_detection import (
     DuplicateDetection,
 )

@@ -14,7 +14,7 @@
 import pytest

 from pyspark.sql import SparkSession
-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import (
+from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.interval_filtering import (
     IntervalFiltering,
 )

@@ -1,6 +1,6 @@
 from pyspark.sql import SparkSession

-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.k_sigma_anomaly_detection import (
+from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.k_sigma_anomaly_detection import (
     KSigmaAnomalyDetection,
 )

@@ -18,7 +18,7 @@
 from pyspark.sql.functions import col, unix_timestamp, abs as A
 from pyspark.sql.types import StructType, StructField, StringType

-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import (
+from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.data_quality.missing_value_imputation import (
     MissingValueImputation,
 )

@@ -11,14 +11,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from abc import ABCMeta

 import pytest

 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame

-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import (
+from rtdip_sdk.pipelines.data_wranglers import (
     NormalizationBaseClass,
     Denormalization,
     NormalizationMinMax,
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 RTDIP
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Empty file.