Revert "#26 Hotfix: Switched to correct pyspark DF format"
This reverts commit 673afa8.
dh1542 committed Nov 26, 2024
1 parent 673afa8 commit 32b2080
Showing 2 changed files with 10 additions and 21 deletions.
@@ -14,12 +14,9 @@
 import logging
 
+import pandas
+
+from pandas import DataFrame
 from datetime import datetime
 
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, DateType
-
 
 class DataFrameLogHandler(logging.Handler):
     """
@@ -41,19 +38,9 @@ class DataFrameLogHandler(logging.Handler):
     """
 
     logs_df: DataFrame = None
-    spark = None
 
     def __init__(self):
-        self.spark = SparkSession.builder.appName("Dataframe Log Handler").getOrCreate()
-        df_schema = StructType(
-            [
-                StructField("timestamp", DateType(), True),
-                StructField("name", StringType(), True),
-                StructField("level", StringType(), True),
-                StructField("message", StringType(), True),
-            ]
-        )
-        self.logs_df = self.spark.createDataFrame([], schema=df_schema)
+        self.logs_df = DataFrame(columns=["timestamp", "name", "level", "message"])
         super().__init__()
 
     def emit(self, record: logging.LogRecord) -> None:
@@ -65,8 +52,10 @@ def emit(self, record: logging.LogRecord) -> None:
             "message": record.msg,
         }
 
-        new_log_df_row = self.spark.createDataFrame([log_entry])
-        self.logs_df = self.logs_df.union(new_log_df_row)
+        new_log_df_row = pandas.DataFrame(
+            log_entry, columns=["timestamp", "name", "level", "message"], index=[0]
+        )
+        self.logs_df = pandas.concat([self.logs_df, new_log_df_row], ignore_index=True)
 
     def get_logs_as_df(self) -> DataFrame:
         return self.logs_df
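
For context, here is a minimal, self-contained sketch of the pandas-backed pattern this revert restores: each emitted log record becomes a one-row DataFrame that is concatenated onto the running log table. The timestamp, name, and level fields are assumed from the column list (only the message field is visible in the hunk above), and the class name is a stand-in, not the project's handler.

import logging
from datetime import datetime

import pandas
from pandas import DataFrame


class PandasLogHandlerSketch(logging.Handler):
    def __init__(self):
        # Plain pandas DataFrame instead of a Spark one; no SparkSession needed.
        self.logs_df = DataFrame(columns=["timestamp", "name", "level", "message"])
        super().__init__()

    def emit(self, record: logging.LogRecord) -> None:
        # Field names mirror the columns; timestamp/name/level are assumptions.
        log_entry = {
            "timestamp": datetime.fromtimestamp(record.created),
            "name": record.name,
            "level": record.levelname,
            "message": record.msg,
        }
        new_row = pandas.DataFrame(
            log_entry, columns=["timestamp", "name", "level", "message"], index=[0]
        )
        self.logs_df = pandas.concat([self.logs_df, new_row], ignore_index=True)


logger = logging.getLogger("sketch")
logger.addHandler(PandasLogHandlerSketch())
logger.warning("disk usage high")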
@@ -14,7 +14,7 @@
 import os
 
 import pytest
-from pyspark.sql import DataFrame
+from pandas import DataFrame
 from pyspark.sql import SparkSession
 
 from src.sdk.python.rtdip_sdk.pipelines.logging.spark.runtime_log_collector import (
@@ -38,8 +38,8 @@ def spark():
     spark.stop()
 
 
-def test_logger_manager_basic_function(spark):
-    df = spark.createDataFrame([(1, "2024-02-11 00:00:00.000")], ["Index", "EventTime"])
+def test_logger_manager_basic_function():
+    df = DataFrame()
     monitor = IdentifyMissingDataInterval(
         df=df,
         interval="10s",
@@ -78,7 +78,7 @@ def test_df_output(spark, caplog):
 
     result_df = log_collector.get_logs_as_df()
 
-    assert result_df.count() == 6
+    assert result_df.shape[0] == 6
 
 
 def test_file_logging(spark, caplog):
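
The assertion change reflects how row counts are read in each API. A quick illustration with a throwaway pandas frame (the pyspark variant is shown as a comment since it needs a live SparkSession):

from pandas import DataFrame

pdf = DataFrame({"level": ["INFO"] * 6})
assert pdf.shape[0] == 6   # pandas: row count is the first element of .shape
# spark_df.count() == 6    # pyspark: count() is an action that triggers a job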
