Skip to content

Commit

Permalink
feat: Job backend configuration options
Browse files Browse the repository at this point in the history
* Make Config a subclass of UserDict

* Remove stray import

* Move metrics report API call into the reporter

* Update sample apps

* Update Django 2.2.21 sample app with Poetry

* Update sample apps' README for clarity

* Improve Config.update for nested dicts

* Implement support for QUEUES and MAX_QUEUES

* Remove unnecessary test script

* Remove stray print()

* Include information about configuring task processing integrations

* Remove internal configuration options from README examples

* Move collector config defaults into collector's module
  • Loading branch information
karls authored Mar 27, 2023
1 parent a35a38a commit e2d3262
Show file tree
Hide file tree
Showing 35 changed files with 690 additions and 118 deletions.
73 changes: 57 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,8 @@ Optionally, you can customize Judoscale in `settings.py`:

```python
JUDOSCALE = {
# LOG_LEVEL defaults to ENV["LOG_LEVEL"] or "INFO".
# Log level defaults to ENV["LOG_LEVEL"] or "INFO".
"LOG_LEVEL": "DEBUG",

# API_BASE_URL defaults to ENV["JUDOSCALE_URL"], which is set for you when you install Judoscale.
# This is only exposed for testing purposes.
"API_BASE_URL": "https://example.com",

# REPORT_INTERVAL_SECONDS defaults to 10 seconds.
"REPORT_INTERVAL_SECONDS": 5,
}
```

Expand Down Expand Up @@ -91,15 +84,8 @@ Optionally, you can override Judoscale's own configuration via your application'

```python
JUDOSCALE = {
# LOG_LEVEL defaults to ENV["LOG_LEVEL"] or "INFO".
# Log level defaults to ENV["LOG_LEVEL"] or "INFO".
"LOG_LEVEL": "DEBUG",

# API_BASE_URL defaults to ENV["JUDOSCALE_URL"], which is set for you when you install Judoscale.
# This is only exposed for testing purposes.
"API_BASE_URL": "https://example.com",

# REPORT_INTERVAL_SECONDS defaults to 10 seconds.
"REPORT_INTERVAL_SECONDS": 5,
}
```

Expand Down Expand Up @@ -143,6 +129,34 @@ If you need to change the Judoscale integration configuration, you can pass a di
judoscale_celery(celery_app, extra_config={"LOG_LEVEL": "DEBUG"})
```

An example configuration dictionary accepted by `extra_config`:

```py
{
"LOG_LEVEL": "INFO",

# In addition to global configuration options for the Judoscale
# integration above, you can also specify the following configuration
# options for the Celery integration.
"CELERY": {
# Enable (default) or disable the Celery integration
"ENABLED": True,

# Report metrics on up to MAX_QUEUES queues.
        # The list of discovered queues is sorted by the length
# of the queue name (shortest first) and metrics are
# reported for the first MAX_QUEUES queues.
# Defaults to 20.
"MAX_QUEUES": 20,

# Specify a list of known queues to report metrics for.
# MAX_QUEUES is still honoured.
# Defaults to empty list (report metrics for discovered queues).
"QUEUES": []
}
}
```

> :warning: **NOTE:** Calling `judoscale_celery` turns on sending [`task-sent`](https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-send-sent-event) events. This is required for the Celery integration with Judoscale to work.
### Judoscale with Celery and Flask
Expand Down Expand Up @@ -205,6 +219,33 @@ If you need to change the Judoscale integration configuration, you can pass a di
judoscale_rq(redis, extra_config={"LOG_LEVEL": "DEBUG"})
```

An example configuration dictionary accepted by `extra_config`:

```py
{
"LOG_LEVEL": "INFO",

# In addition to global configuration options for the Judoscale
# integration above, you can also specify the following configuration
# options for the RQ integration.
"RQ": {
# Enable (default) or disable the RQ integration
"ENABLED": True,

# Report metrics on up to MAX_QUEUES queues.
        # The list of discovered queues is sorted by the length
# of the queue name (shortest first) and metrics are
# reported for the first MAX_QUEUES queues.
# Defaults to 20.
"MAX_QUEUES": 20,

# Specify a list of known queues to report metrics for.
# MAX_QUEUES is still honoured.
# Defaults to empty list (report metrics for discovered queues).
"QUEUES": []
    }
}
```

### Judoscale with RQ and Flask

The recommended way to initialise Judoscale for RQ is in the application factory:
Expand Down
2 changes: 1 addition & 1 deletion judoscale/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def before_publish(*args, properties={}, **kwargs):
def judoscale_celery(celery: Celery, extra_config: Mapping = {}) -> None:
celery.conf.task_send_sent_event = True

judoconfig.merge(extra_config)
judoconfig.update(extra_config)
collector = CeleryMetricsCollector(config=judoconfig, broker=celery)
adapter = Adapter(
identifier="judoscale-celery",
Expand Down
23 changes: 19 additions & 4 deletions judoscale/celery/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from threading import Thread
from typing import List, Optional, Set

import redis.exceptions as re
from celery import Celery
from kombu import Connection
from redis import Redis
Expand All @@ -13,6 +12,12 @@
from judoscale.core.metric import Metric
from judoscale.core.metrics_collectors import JobMetricsCollector

DEFAULTS = {
"ENABLED": True,
"MAX_QUEUES": 20,
"QUEUES": [],
}


class TaskSentHandler(Thread):
def __init__(
Expand All @@ -27,7 +32,7 @@ def __init__(
super().__init__(*args, daemon=True, **kwargs)

def task_sent(self, event):
    """Handle a Celery "task-sent" event by recording the destination queue.

    Queues are accumulated in the collector's set of seen queue names so
    metrics can later be reported for them.
    """
    # NOTE: the rendered diff showed the stale pre-rename line
    # (`self.collector.queues.add(...)`) alongside this one; only the
    # renamed attribute `_celery_queues` exists after this change.
    self.collector._celery_queues.add(event["queue"])

def run(self):
logger.debug("Starting TaskSentHandler")
Expand All @@ -42,6 +47,8 @@ class CeleryMetricsCollector(JobMetricsCollector):
def __init__(self, config: Config, broker: Celery):
super().__init__(config=config)

self.config["CELERY"] = {**DEFAULTS, **self.config.get("CELERY", {})}

self.broker = broker
connection = self.broker.connection_for_read()
if connection.transport.driver_name != "redis":
Expand All @@ -56,7 +63,7 @@ def __init__(self, config: Config, broker: Celery):
"Unsupported Redis server version. Minimum Redis version is 6.0."
)

self.queues: Set[str] = set()
self._celery_queues: Set[str] = set()
self.task_sent_handler = TaskSentHandler(self, connection)
logger.debug(f"Redis is at {self.redis.connection_pool}")

Expand All @@ -66,14 +73,22 @@ def __init__(self, config: Config, broker: Celery):
for q in self.redis.scan_iter(match="[^_]*", _type="list")
}
logger.debug(f"Found initial queues: {list(user_queues)}")
self.queues = user_queues - system_queues
self._celery_queues = user_queues - system_queues
self.task_sent_handler.start()

@property
def is_supported_redis_version(self):
    """Return True when the connected Redis server is version 6.0 or newer."""
    reported_version = self.redis.info()["redis_version"]
    major = int(reported_version.partition(".")[0])
    return major >= 6

@property
def adapter_config(self):
    """Return the Celery-specific section of the Judoscale configuration."""
    return self.config["CELERY"]

@property
def _queues(self) -> List[str]:
return list(self._celery_queues)

def oldest_task(self, queue: str) -> Optional[dict]:
"""
Get the oldest task from the queue.
Expand Down
22 changes: 0 additions & 22 deletions judoscale/core/adapter_api_client.py

This file was deleted.

52 changes: 34 additions & 18 deletions judoscale/core/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
import os
from collections import UserDict
from typing import Mapping

from judoscale.core.logger import logger

DEFAULTS = {"REPORT_INTERVAL_SECONDS": 10, "LOG_LEVEL": "WARN"}


class RuntimeContainer:
def __init__(self, service_name, instance, service_type):
Expand All @@ -23,14 +26,21 @@ def __str__(self):
return f"{self.service_name}.{self.instance}"


class Config:
class Config(UserDict):
def __init__(
self, runtime_container: RuntimeContainer, api_base_url: str, log_level: str
self, runtime_container: RuntimeContainer, api_base_url: str, env: Mapping
):
self.runtime_container = runtime_container
self.log_level = log_level
self.report_interval_seconds = 10
self.api_base_url = api_base_url
initialdata = dict(
DEFAULTS,
RUNTIME_CONTAINER=runtime_container,
API_BASE_URL=api_base_url,
)

for key in {"LOG_LEVEL", "RQ", "CELERY"}:
if key in env:
initialdata[key] = env[key]

super().__init__(initialdata)
self._prepare_logging()

@classmethod
Expand All @@ -40,7 +50,7 @@ def initialize(cls, env: Mapping = os.environ):
elif env.get("RENDER_INSTANCE_ID"):
return cls.for_render(env)
else:
return cls(None, "", "INFO")
return cls(None, "", env)

@classmethod
def for_heroku(cls, env: Mapping):
Expand All @@ -49,8 +59,7 @@ def for_heroku(cls, env: Mapping):

runtime_container = RuntimeContainer(service_name, instance, service_type)
api_base_url = env.get("JUDOSCALE_URL")
log_level = env.get("LOG_LEVEL", "INFO").upper()
return cls(runtime_container, api_base_url, log_level)
return cls(runtime_container, api_base_url, env)

@classmethod
def for_render(cls, env: Mapping):
Expand All @@ -60,23 +69,30 @@ def for_render(cls, env: Mapping):

runtime_container = RuntimeContainer(service_id, instance, service_type)
api_base_url = f"https://adapter.judoscale.com/api/{service_id}"
log_level = env.get("LOG_LEVEL", "INFO").upper()
return cls(runtime_container, api_base_url, log_level)

def merge(self, settings: Mapping):
for key, value in settings.items():
setattr(self, key.lower(), value)
return cls(runtime_container, api_base_url, env)

def update(self, new_config: Mapping):
    """Merge *new_config* into this config, one level deep.

    When both the existing value and the incoming value for a key are
    dicts, the incoming dict is merged into the existing one instead of
    replacing it wholesale; every other value is overwritten. Logging is
    re-initialized afterwards so a changed LOG_LEVEL takes effect.
    """
    for key, value in new_config.items():
        existing = self.get(key)
        if isinstance(existing, dict) and isinstance(value, dict):
            existing.update(value)
        else:
            self[key] = value
    self._prepare_logging()

def merge(self, new_config: Mapping):
    """Deprecated alias for :meth:`update`, kept for backwards compatibility."""
    logger.warning("Config.merge() is deprecated. Use Config.update() instead.")
    self.update(new_config)

@property
def for_report(self):
    """Return the subset of configuration that is sent along with metrics reports.

    Uses dict-style access (the class is a UserDict); the old attribute
    forms (``self.log_level`` etc.) no longer exist after the UserDict
    migration, so the stale duplicate entries shown in the rendered diff
    are dropped here.
    """
    # Only include the config options we want to include in the report
    return {
        "log_level": self["LOG_LEVEL"],
        "report_interval_seconds": self["REPORT_INTERVAL_SECONDS"],
    }

def _prepare_logging(self):
log_level = logging.getLevelName(self.log_level.upper())
log_level = logging.getLevelName(self["LOG_LEVEL"].upper())
logger.setLevel(log_level)

if not logger.handlers:
Expand Down
45 changes: 42 additions & 3 deletions judoscale/core/metrics_collectors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List, Protocol
from typing import List, Protocol, Set

from judoscale.core.config import Config
from judoscale.core.logger import logger
from judoscale.core.metric import Metric
from judoscale.core.metrics_store import MetricsStore

Expand Down Expand Up @@ -30,7 +31,7 @@ def __init__(self, config: Config):

@property
def should_collect(self):
    """Web metrics are collected only on web instances.

    The rendered diff showed the stale attribute-access form
    (``self.config.runtime_container``) next to the new dict-access form;
    only the dict access is valid after the UserDict migration.
    """
    return self.config["RUNTIME_CONTAINER"].is_web_instance

def add(self, metric: Metric):
"""
Expand All @@ -47,9 +48,47 @@ def collect(self) -> List[Metric]:


class JobMetricsCollector(MetricsCollector):
    """
    Base class for job metrics collectors.
    This class is not meant to be instantiated directly.

    Subclasses must provide `adapter_config` (their integration's config
    section, e.g. config["CELERY"]) and override `_queues` with the queues
    they discover at runtime.
    """

    def __init__(self, config: Config):
        super().__init__(config=config)

    @property
    def adapter_config(self):
        """Integration-specific config section; must be overridden."""
        raise NotImplementedError("Implement `adapter_config` in a subclass.")

    @property
    def _queues(self) -> List[str]:
        """Queues discovered at runtime; subclasses supply real data."""
        return []

    @property
    def queues(self) -> Set[str]:
        """Queues to report metrics for.

        Explicitly configured queues (QUEUES) take precedence over the
        discovered ones; either way the MAX_QUEUES cap is applied.
        """
        if configured_queues := self.adapter_config["QUEUES"]:
            return self.limit_max_queues(configured_queues)
        else:
            return self.limit_max_queues(self._queues)

    @property
    def should_collect(self):
        """Collect only when the integration is enabled and this container
        is not a redundant instance."""
        return (
            self.adapter_config["ENABLED"]
            and not self.config["RUNTIME_CONTAINER"].is_redundant_instance
        )

    def limit_max_queues(self, queues: List[str]) -> Set[str]:
        """
        Limit the number of queues to collect metrics for.

        Queues are sorted by name length (shortest first) and only the
        first MAX_QUEUES are kept; a warning is logged when any are dropped.
        """
        max_queues = self.adapter_config["MAX_QUEUES"]

        if len(queues) > max_queues:
            logger.warning(
                f"{self.__class__.__name__} reporting only {max_queues} queues max, "
                f"skipping the rest ({len(queues) - max_queues})."
            )

        # key=len replaces the equivalent lambda; set() deduplicates.
        return set(sorted(queues, key=len)[:max_queues])
Loading

0 comments on commit e2d3262

Please sign in to comment.