Skip to content

Commit

Permalink
Merge pull request #51 from amosproj/feature/#26-Monitoring-Outputs-L…
Browse files Browse the repository at this point in the history
…ogging

Feature/#26 monitoring outputs logging
  • Loading branch information
mollle authored Nov 25, 2024
2 parents b527fa0 + 2fdf381 commit 1dc4a08
Show file tree
Hide file tree
Showing 15 changed files with 526 additions and 25 deletions.
13 changes: 13 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
24 changes: 24 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/logging/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod

from pyspark.sql import DataFrame
from ..interfaces import PipelineComponentBaseInterface


class LoggingBaseInterface(PipelineComponentBaseInterface):
    """Abstract interface declaring ``get_logs_as_df`` for logging components."""

    @abstractmethod
    def get_logs_as_df(self) -> DataFrame:
        """Return the collected logs as a DataFrame. Implemented by subclasses."""
82 changes: 82 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/logging/logger_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging


from pyspark.pandas.usage_logging.usage_logger import get_logger


class LoggerManager:
    """
    Singleton registry that creates and stores all loggers in the application.

    Create loggers through the LoggerManager if you want your logs to be
    handled and stored properly.

    Example Usage
    --------
    ```python
    logger_manager = LoggerManager()
    logger = logger_manager.create_logger("my_logger")
    logger.info("This is a log message")
    my_logger = logger_manager.get_logger("my_logger")
    ```
    """

    _instance = None
    _initialized = False

    # Registry of every logger created through this class, keyed by name.
    loggers = {}

    def __new__(cls):
        # Build the shared instance on first use, then keep handing it out.
        shared = cls._instance
        if shared is None:
            shared = super(LoggerManager, cls).__new__(cls)
            cls._instance = shared
        return shared

    def __init__(self):
        # __init__ runs on every LoggerManager() call; the flag makes sure the
        # root logging configuration is only applied once.
        if LoggerManager._initialized:
            return
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )
        LoggerManager._initialized = True

    @classmethod
    def create_logger(cls, name: str):
        """
        Creates a logger with the specified name, or returns the registered one.

        Args:
            name (str): The name of the logger.

        Returns:
            logging.Logger: The logger registered under ``name``.
        """
        if name in cls.loggers:
            return cls.get_logger(name)

        new_logger = logging.getLogger(name)
        cls.loggers[name] = new_logger
        return new_logger

    @classmethod
    def get_logger(cls, name: str):
        """
        Looks up a logger previously created with ``create_logger``.

        Args:
            name (str): The name of the logger.

        Returns:
            logging.Logger: The registered logger, or None if ``name`` is unknown.
        """
        return cls.loggers.get(name)

    @classmethod
    def get_all_loggers(cls) -> dict:
        """Returns the registry dict mapping logger names to loggers."""
        return cls.loggers
13 changes: 13 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/logging/spark/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

import pandas
from pandas import DataFrame
from datetime import datetime


class DataFrameLogHandler(logging.Handler):
    """
    Handles logs from attached logger and stores them in a DataFrame at runtime.

    Every handled record is appended as one row with the columns
    ``timestamp``, ``name``, ``level`` and ``message``.

    Args:
        logging.Handler: Inherits from logging.Handler

    Returns:
        returns a DataFrame with logs stored in it

    Example
    --------
    ```python
    import logging

    log_manager = logging.getLogger('log_manager')
    handler = DataFrameLogHandler()
    log_manager.addHandler(handler)
    log_manager.warning("value=%s", 42)
    logs_df = handler.get_logs_as_df()
    ```
    """

    # Column layout shared by the empty frame and every appended row.
    _COLUMNS = ["timestamp", "name", "level", "message"]

    logs_df: DataFrame = None

    def __init__(self):
        self.logs_df = DataFrame(columns=self._COLUMNS)
        super().__init__()

    def emit(self, record: logging.LogRecord) -> None:
        """
        Process a log record and append it as a row to the stored DataFrame.

        Args:
            record (logging.LogRecord): The record passed in by the logging framework.
        """
        try:
            log_entry = {
                "timestamp": datetime.fromtimestamp(record.created),
                "name": record.name,
                "level": record.levelname,
                # getMessage() merges record.args into the message; record.msg
                # alone would store the raw "%s" template without its values.
                "message": record.getMessage(),
            }

            new_log_df_row = pandas.DataFrame(
                log_entry, columns=self._COLUMNS, index=[0]
            )
            self.logs_df = pandas.concat(
                [self.logs_df, new_log_df_row], ignore_index=True
            )
        except Exception:
            # Standard logging.Handler convention: a failing handler reports
            # through handleError instead of raising into the logging caller.
            self.handleError(record)

    def get_logs_as_df(self) -> DataFrame:
        """Return the DataFrame holding all records handled so far."""
        return self.logs_df
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

import pandas
from pandas import DataFrame
from datetime import datetime


class FileLogHandler(logging.Handler):
    """
    Handles logs from attached logger and stores them in a .log file.

    Each record is written as one line of the form
    ``<ISO timestamp> | <logger name> | <level> | <message>``.

    Args:
        logging.Handler: Inherits from logging.Handler
        file_path (str): Path of the log file to write to
        mode (str): File opening mode ('a' for append, 'w' for write)

    Example
    --------
    ```python
    import logging

    log_manager = logging.getLogger('log_manager')
    handler = FileLogHandler('my_logs.log')
    log_manager.addHandler(handler)
    ```
    """

    # Not used by this handler; kept so existing attribute access keeps working.
    logs_df: DataFrame = None

    def __init__(self, file_path: str, mode: str = "a"):
        super().__init__()
        self.mode = mode
        self.file_path = file_path
        # The file is reopened for every record. The requested mode is only
        # applied to the first open; later opens append, otherwise mode "w"
        # would truncate the file on each record and keep only the last one.
        self._has_written = False

    def emit(self, record: logging.LogRecord) -> None:
        """
        Process a log record and append it as one line to the log file.

        Args:
            record (logging.LogRecord): The record passed in by the logging framework.
        """
        try:
            # Build a plain string. The previous ``{...}`` literal created a
            # set, so the file received the repr of a set rather than the line.
            log_line = (
                f"{datetime.fromtimestamp(record.created).isoformat()} | "
                f"{record.name} | "
                f"{record.levelname} | "
                f"{record.getMessage()}\n"
            )
            open_mode = "a" if self._has_written else self.mode
            with open(self.file_path, open_mode, encoding="utf-8") as log_file:
                log_file.write(log_line)
            self._has_written = True
        except Exception:
            # Standard logging.Handler convention: report the failure without
            # raising into the code that issued the log call.
            self.handleError(record)
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os

from pandas import DataFrame
from pandas.io.common import file_path_to_url

from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
Libraries,
SystemType,
)
from src.sdk.python.rtdip_sdk.pipelines.logging.interfaces import LoggingBaseInterface
from src.sdk.python.rtdip_sdk.pipelines.logging.logger_manager import LoggerManager
from src.sdk.python.rtdip_sdk.pipelines.logging.spark.dataframe.dataframe_log_handler import (
DataFrameLogHandler,
)
from src.sdk.python.rtdip_sdk.pipelines.logging.spark.log_file.file_log_handler import (
FileLogHandler,
)


class RuntimeLogCollector(LoggingBaseInterface):
    """Collects logs from all loggers in the LoggerManager at runtime."""

    # Both objects are created once, when the class body is executed, and are
    # shared by every use of this class.
    logger_manager: LoggerManager = LoggerManager()
    df_handler: DataFrameLogHandler = DataFrameLogHandler()

    def __init__(self):
        pass

    @classmethod
    def get_logs_as_df(cls) -> DataFrame:
        """Return the DataFrame containing the logs"""
        return cls.df_handler.get_logs_as_df()

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    @staticmethod
    def system_type() -> SystemType:
        # NOTE(review): this returns None although the annotation promises a
        # SystemType. Confirm which SystemType member applies and return it.
        pass

    @classmethod
    def _attach_dataframe_handler_to_loggers(cls) -> None:
        """Attaches the shared DataFrameLogHandler to every managed logger."""

        loggers = cls.logger_manager.get_all_loggers()

        for logger in loggers.values():
            # avoid duplicate handlers
            if cls.df_handler not in logger.handlers:
                logger.addHandler(cls.df_handler)

    @classmethod
    def _attach_file_handler_to_loggers(
        cls, filename: str, path: str = ".", mode: str = "a"
    ) -> None:
        """
        Attaches a FileLogHandler writing to ``path/filename`` to every managed logger.

        Args:
            filename (str): Name of the log file.
            path (str): Directory the log file is placed in.
            mode (str): File opening mode passed to the FileLogHandler.
        """

        loggers = cls.logger_manager.get_all_loggers()
        file_path = os.path.join(path, filename)
        target_path = os.path.abspath(file_path)
        file_handler = FileLogHandler(file_path, mode)

        for logger in loggers.values():
            # A new handler object is created on every call, so an identity
            # check against logger.handlers can never detect a duplicate.
            # Compare by handler type and target file instead.
            already_attached = any(
                isinstance(existing, FileLogHandler)
                and os.path.abspath(existing.file_path) == target_path
                for existing in logger.handlers
            )
            if not already_attached:
                logger.addHandler(file_handler)
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
Expand All @@ -22,6 +20,8 @@
from ...._pipeline_utils.models import Libraries, SystemType
from ....utilities.spark.time_string_parsing import parse_time_string_to_ms

from src.sdk.python.rtdip_sdk.pipelines.logging.logger_manager import LoggerManager


class IdentifyMissingDataInterval(MonitoringBaseInterface):
"""
Expand Down Expand Up @@ -70,17 +70,10 @@ def __init__(
self.mad_multiplier = mad_multiplier
self.min_tolerance = min_tolerance

# Configure logging
self.logger = logging.getLogger(self.__class__.__name__)
if not self.logger.handlers:
# Prevent adding multiple handlers in interactive environments
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
# Use global pipeline logger

self.logger_manager = LoggerManager()
self.logger = self.logger_manager.create_logger("IdentifyMissingDataInterval")

@staticmethod
def system_type():
Expand Down
Loading

0 comments on commit 1dc4a08

Please sign in to comment.