From 32b20808b6758899aeb12a2ddd41cf8acddbe1f6 Mon Sep 17 00:00:00 2001
From: Dominik Hoffmann
Date: Tue, 26 Nov 2024 11:46:17 +0100
Subject: [PATCH] Revert "#26 Hotfix: Switched to correct pyspark DF format"

This reverts commit 673afa8309c831f529a0ea8f8f0fb47b6a5f6b0e.
---
 .../spark/dataframe/dataframe_log_handler.py | 23 +++++--------------
 .../pipelines/logging/test_log_collection.py |  8 +++----
 2 files changed, 10 insertions(+), 21 deletions(-)

diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py
index 66024643f..cb8af94fc 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py
@@ -14,12 +14,9 @@
 import logging
 
 import pandas
-
+from pandas import DataFrame
 from datetime import datetime
 
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, DateType
-
 
 class DataFrameLogHandler(logging.Handler):
     """
@@ -41,19 +38,9 @@ class DataFrameLogHandler(logging.Handler):
     """
 
     logs_df: DataFrame = None
-    spark = None
 
     def __init__(self):
-        self.spark = SparkSession.builder.appName("Dataframe Log Handler").getOrCreate()
-        df_schema = StructType(
-            [
-                StructField("timestamp", DateType(), True),
-                StructField("name", StringType(), True),
-                StructField("level", StringType(), True),
-                StructField("message", StringType(), True),
-            ]
-        )
-        self.logs_df = self.spark.createDataFrame([], schema=df_schema)
+        self.logs_df = DataFrame(columns=["timestamp", "name", "level", "message"])
         super().__init__()
 
     def emit(self, record: logging.LogRecord) -> None:
@@ -65,8 +52,10 @@ def emit(self, record: logging.LogRecord) -> None:
             "message": record.msg,
         }
 
-        new_log_df_row = self.spark.createDataFrame([log_entry])
-        self.logs_df = self.logs_df.union(new_log_df_row)
+        new_log_df_row = pandas.DataFrame(
+            log_entry, columns=["timestamp", "name", "level", "message"], index=[0]
+        )
+        self.logs_df = pandas.concat([self.logs_df, new_log_df_row], ignore_index=True)
 
     def get_logs_as_df(self) -> DataFrame:
         return self.logs_df
diff --git a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py
index e2157cb9c..89fd77b7e 100644
--- a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py
+++ b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py
@@ -14,7 +14,7 @@
 import os
 
 import pytest
-from pyspark.sql import DataFrame
+from pandas import DataFrame
 from pyspark.sql import SparkSession
 
 from src.sdk.python.rtdip_sdk.pipelines.logging.spark.runtime_log_collector import (
@@ -38,8 +38,8 @@ def spark():
     spark.stop()
 
 
-def test_logger_manager_basic_function(spark):
-    df = spark.createDataFrame([(1, "2024-02-11 00:00:00.000")], ["Index", "EventTime"])
+def test_logger_manager_basic_function():
+    df = DataFrame()
     monitor = IdentifyMissingDataInterval(
         df=df,
         interval="10s",
@@ -78,7 +78,7 @@ def test_df_output(spark, caplog):
 
     result_df = log_collector.get_logs_as_df()
 
-    assert result_df.count() == 6
+    assert result_df.shape[0] == 6
 
 
 def test_file_logging(spark, caplog):
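
Editor's note: the sketch below is not part of the patch. It is a minimal usage example of the
pandas-backed DataFrameLogHandler that this revert restores, assuming the module is importable
via the same path the patch touches; the logger name and messages are illustrative only.

    import logging

    from src.sdk.python.rtdip_sdk.pipelines.logging.spark.dataframe.dataframe_log_handler import (
        DataFrameLogHandler,
    )

    # Attach the handler to an ordinary Python logger; every record emitted
    # through the logger is appended as a row to the handler's DataFrame.
    logger = logging.getLogger("example")
    logger.setLevel(logging.INFO)

    handler = DataFrameLogHandler()
    logger.addHandler(handler)

    logger.info("first message")
    logger.warning("second message")

    # get_logs_as_df() returns a pandas DataFrame with the columns
    # timestamp, name, level and message, one row per emitted record.
    logs_df = handler.get_logs_as_df()
    print(logs_df.shape[0])                  # 2
    print(logs_df[["level", "message"]])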