Skip to content

Commit

Permalink
Merge pull request #12785 from rtibbles/reentrant_logging
Browse files Browse the repository at this point in the history
When running under the Kolibri process bus, use a logging queue to prevent reentrant logging errors and file contention
  • Loading branch information
marcellamaki authored Nov 8, 2024
2 parents 7af653e + fe42a1c commit a26a75b
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 15 deletions.
3 changes: 2 additions & 1 deletion kolibri/core/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ def __job_storage():
""" :type: Storage """


def initialize_workers():
def initialize_workers(log_queue=None):
logger.info("Starting async task workers.")
return Worker(
connection=connection,
regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"],
high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"],
log_queue=log_queue,
)
10 changes: 5 additions & 5 deletions kolibri/core/tasks/test/taskrunner/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from kolibri.core.tasks import compat
from kolibri.utils import multiprocessing_compat


@pytest.fixture(params=[False, True], autouse=True)
Expand All @@ -24,7 +24,7 @@ class local(object):
from threading import local # noqa
from concurrent.futures import ThreadPoolExecutor as PoolExecutor # noqa

monkeypatch.setattr(compat, "Thread", Thread)
monkeypatch.setattr(compat, "Event", Event)
monkeypatch.setattr(compat, "local", local)
monkeypatch.setattr(compat, "PoolExecutor", PoolExecutor)
monkeypatch.setattr(multiprocessing_compat, "Thread", Thread)
monkeypatch.setattr(multiprocessing_compat, "Event", Event)
monkeypatch.setattr(multiprocessing_compat, "local", local)
monkeypatch.setattr(multiprocessing_compat, "PoolExecutor", PoolExecutor)
2 changes: 1 addition & 1 deletion kolibri/core/tasks/test/taskrunner/test_job_running.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import pytest

from kolibri.core.tasks.compat import Event
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
Expand All @@ -13,6 +12,7 @@
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.utils import import_path_to_callable
from kolibri.core.tasks.worker import Worker
from kolibri.utils.multiprocessing_compat import Event


@pytest.fixture
Expand Down
6 changes: 3 additions & 3 deletions kolibri/core/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

from kolibri.core.sqlite.utils import check_sqlite_integrity
from kolibri.core.sqlite.utils import repair_sqlite_db
from kolibri.core.tasks import compat
from kolibri.core.tasks.exceptions import UserCancelledError
from kolibri.utils import conf
from kolibri.utils import multiprocessing_compat
from kolibri.utils.options import FD_PER_THREAD
from kolibri.utils.system import get_fd_limit

Expand All @@ -27,7 +27,7 @@
# An object on which to store data about the current job
# So far the only use is to track the job, but other metadata
# could be added.
current_state_tracker = SimpleLazyObject(compat.local)
current_state_tracker = SimpleLazyObject(multiprocessing_compat.local)


def get_current_job():
Expand Down Expand Up @@ -76,7 +76,7 @@ def __init__(self, func, thread_name, wait_between_runs=1, *args, **kwargs):
:param thread_name: the name of the thread to use during logging and debugging
:param wait_between_runs: how many seconds to wait in between func calls.
"""
self.shutdown_event = compat.Event()
self.shutdown_event = multiprocessing_compat.Event()
self.thread_name = thread_name
self.thread_id = uuid.uuid4().hex
self.logger = logging.getLogger(
Expand Down
17 changes: 13 additions & 4 deletions kolibri/core/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@

from django.db import connection as django_connection

from kolibri.core.tasks.compat import PoolExecutor
from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.storage import Storage
from kolibri.core.tasks.utils import db_connection
from kolibri.core.tasks.utils import InfiniteLoopThread
from kolibri.utils.multiprocessing_compat import PoolExecutor

logger = logging.getLogger(__name__)


def execute_job(
job_id, worker_host=None, worker_process=None, worker_thread=None, worker_extra=None
job_id,
worker_host=None,
worker_process=None,
worker_thread=None,
worker_extra=None,
log_queue=None,
):
"""
Call the function stored in the job.func.
Expand All @@ -36,7 +41,7 @@ def execute_job(
django_connection.close()


def execute_job_with_python_worker(job_id):
def execute_job_with_python_worker(job_id, log_queue=None):
"""
Call execute_job but additionally with the current host, process and thread information taken
directly from python internals.
Expand All @@ -50,11 +55,12 @@ def execute_job_with_python_worker(job_id):
worker_host=socket.gethostname(),
worker_process=str(os.getpid()),
worker_thread=str(threading.get_ident()),
log_queue=log_queue,
)


class Worker(object):
def __init__(self, connection, regular_workers=2, high_workers=1):
def __init__(self, connection, regular_workers=2, high_workers=1, log_queue=None):
# Internally, we use concurrent.future.Future to run and track
# job executions. We need to keep track of which future maps to which
# job they were made from, and we use the job_future_mapping dict to do
Expand All @@ -74,6 +80,8 @@ def __init__(self, connection, regular_workers=2, high_workers=1):
# High workers run only 'high' priority jobs.
self.regular_workers = regular_workers
self.max_workers = regular_workers + high_workers
# Track any log queue that is passed in
self.log_queue = log_queue

self.workers = self.start_workers()
self.job_checker = self.start_job_checker()
Expand Down Expand Up @@ -191,6 +199,7 @@ def start_next_job(self, job):
future = self.workers.submit(
execute_job_with_python_worker,
job_id=job.job_id,
log_queue=self.log_queue,
)

# Check if the job ID already exists in the future_job_mapping dictionary
Expand Down
153 changes: 153 additions & 0 deletions kolibri/utils/logger.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import logging
import os
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging.handlers import TimedRotatingFileHandler
from typing import Dict
from typing import List
from typing import Optional


GET_FILES_TO_DELETE = "getFilesToDelete"
Expand All @@ -18,6 +23,62 @@
}


# Type definition for mapping of logger names to their handlers
LoggerHandlerMap = Dict[str, List[logging.Handler]]


class LoggerAwareQueueHandler(QueueHandler):
"""
A QueueHandler that adds the logger name to the record so that it
can be properly handled in the listener.
"""

def __init__(self, queue, logger_name: str):
super().__init__(queue)
self.logger_name = logger_name

def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
"""Prepare a record for queuing, ensuring it can be pickled if needed"""
# Get Queue class at runtime to check if we need pickle safety
from kolibri.utils.multiprocessing_compat import use_multiprocessing

# Only do pickle-safety preparation for logging if we're using multiprocessing
if use_multiprocessing():
if hasattr(record, "exc_info") and record.exc_info:
record.exc_text = (
logging.getLogger()
.handlers[0]
.formatter.formatException(record.exc_info)
)
record.exc_info = None
if hasattr(record, "args"):
record.args = tuple(str(arg) for arg in record.args)

record = super().prepare(record)
record._logger_name = self.logger_name
return record


class LoggerAwareQueueListener(QueueListener):
"""A QueueListener that routes records to their original logger's handlers"""

def __init__(self, queue, logger_handlers: LoggerHandlerMap):
super().__init__(queue)
self.logger_handlers = logger_handlers

def handle(self, record: logging.LogRecord) -> None:
"""Handle a record by sending it to the original logger's handlers"""
logger_name = getattr(record, "_logger_name", "")
handlers = self.logger_handlers.get(logger_name, [])

for handler in handlers:
try:
if record.levelno >= handler.level:
handler.handle(record)
except Exception:
handler.handleError(record)


class EncodingStreamHandler(logging.StreamHandler):
"""
A custom stream handler that encodes the log message to the specified encoding.
Expand Down Expand Up @@ -312,3 +373,95 @@ def get_logging_config(LOG_ROOT, debug=False, debug_database=False):
admin_logger_handlers = admin_logger.setdefault("handlers", [])
admin_logger_handlers.append("mail_admins")
return config


# Track if queue logging has been initialized for the current process
_queue_logging_initialized_for_process = False


class QueueLoggingInitializedError(RuntimeError):
pass


def _replace_handlers_with_queue(queue) -> LoggerHandlerMap:
"""
Internal function to replace all logger handlers with queue handlers.
Returns a dict of the original logger handlers for the listener to consume.
"""
global _queue_logging_initialized_for_process

if _queue_logging_initialized_for_process:
raise QueueLoggingInitializedError(
"Queue logging has already been initialized for this process"
)

logger_handlers: LoggerHandlerMap = {}

# Set up logging for all loggers
for logger_name in list(logging.root.manager.loggerDict.keys()) + [""]:
logger = logging.getLogger(logger_name)
if logger.handlers:
# Store the original handlers
logger_handlers[logger_name] = logger.handlers[:]

# Remove existing handlers
for handler in logger.handlers[:]:
logger.removeHandler(handler)

# Add queue handler
queue_handler = LoggerAwareQueueHandler(queue, logger_name)
logger.addHandler(queue_handler)

_queue_logging_initialized_for_process = True

return logger_handlers


def setup_queue_logging() -> LoggerAwareQueueListener:
"""
Sets up queue-based logging for the main process.
Returns the queue listener which can be used to stop logging and clean up.
"""
# Import Queue at function scope to avoid import order issues
from kolibri.utils.multiprocessing_compat import Queue

# Create queue using Kolibri's compatibility Queue
log_queue = Queue()

# Replace handlers and get original configurations
logger_handlers = _replace_handlers_with_queue(log_queue)

# Create and start listener with collected handlers
listener = LoggerAwareQueueListener(log_queue, logger_handlers)
listener.start()

return listener


def setup_worker_logging(queue) -> None:
"""Sets up logging in a worker to use the queue if not already configured."""
try:
_replace_handlers_with_queue(queue)
except QueueLoggingInitializedError:
pass


def cleanup_queue_logging(listener: Optional[LoggerAwareQueueListener]) -> None:
"""
Stops the queue listener and cleans up multiprocessing resources if needed.
"""
if not listener:
return

# Stop the listener to ensure pending logs are processed
listener.stop()

# Clean up queue if it's a multiprocessing queue
from kolibri.utils.multiprocessing_compat import use_multiprocessing

if use_multiprocessing():
try:
listener.queue.close()
listener.queue.join_thread()
except (ValueError, AttributeError):
pass
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import multiprocessing
import threading
from concurrent import futures
from queue import Queue as ThreadingQueue

from kolibri.utils.conf import OPTIONS

Expand Down Expand Up @@ -29,6 +30,12 @@ def Event(*args, **kwargs):
return threading.Event(*args, **kwargs)


def Queue(*args, **kwargs):
if use_multiprocessing():
return multiprocessing.Queue(*args, **kwargs)
return ThreadingQueue(*args, **kwargs)


class _Local(object):
"""
Dummy class to use for a local object for multiprocessing
Expand Down
21 changes: 20 additions & 1 deletion kolibri/utils/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from .system import pid_exists
from kolibri.utils import conf
from kolibri.utils.android import on_android
from kolibri.utils.logger import cleanup_queue_logging
from kolibri.utils.logger import setup_queue_logging

try:
FileNotFoundError
Expand Down Expand Up @@ -274,7 +276,9 @@ def START(self):
from kolibri.core.tasks.main import initialize_workers

# Initialize the iceqube engine to handle queued tasks
self.worker = initialize_workers()
# Add a loose coupling between our LogPlugin and the ServicesPlugin
# by getting any log_queue that might be present on the bus
self.worker = initialize_workers(log_queue=getattr(self.bus, "log_queue", None))

def STOP(self):
if self.worker is not None:
Expand Down Expand Up @@ -534,9 +538,24 @@ def ENTER(self):


class LogPlugin(SimplePlugin):
def ENTER(self):
# Do this setup during INITIAL, so we wait
# until after any WSGI application has been
# imported, as that will trigger Django setup
# which will reinitialize logging, and override
# what we are doing here.
self.queue_listener = setup_queue_logging()
self.bus.log_queue = self.queue_listener.queue

def log(self, msg, level):
logger.log(level, msg)

def EXITED(self):
cleanup_queue_logging(self.queue_listener)

# Set this to priority 100 so that it gets executed after any other EXITED handlers.
EXITED.priority = 100


class SignalHandler(BaseSignalHandler):
def __init__(self, bus):
Expand Down

0 comments on commit a26a75b

Please sign in to comment.