Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-47529: Migrate Prompt Processing PipelinesConfig to YAML #222

Merged
merged 7 commits into from
Nov 25, 2024
28 changes: 24 additions & 4 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import sys
import time
import signal
import yaml
import uuid

import boto3
Expand Down Expand Up @@ -75,15 +76,34 @@
# Consumer group ID for Kafka; a fresh UUID per process instance.
kafka_group_id = str(uuid.uuid4())
# The topic on which to listen to updates to image_bucket
bucket_topic = os.environ.get("BUCKET_TOPIC", "rubin-prompt-processing")
# NOTE: pre_pipelines and main_pipelines are defined later via
# _config_from_yaml; the superseded direct PipelinesConfig(<raw string>)
# assignments were removed because PipelinesConfig now expects a parsed
# sequence of mappings, not a raw string.

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


def _config_from_yaml(yaml_string):
    """Initialize a PipelinesConfig from a YAML-formatted string.

    Parameters
    ----------
    yaml_string : `str`
        A YAML representation of the structured config. See
        `~activator.config.PipelinesConfig` for details.

    Returns
    -------
    config : `activator.config.PipelinesConfig`
        The corresponding config object.
    """
    # safe_load (never yaml.load) because the string comes from the
    # environment and must not be able to construct arbitrary objects.
    parsed = yaml.safe_load(yaml_string)
    return PipelinesConfig(parsed)


# The preprocessing pipelines to execute and the conditions in which to choose them.
# os.environ[...] raises KeyError at import time if the variable is unset.
pre_pipelines = _config_from_yaml(os.environ["PREPROCESSING_PIPELINES_CONFIG"])
# The main pipelines to execute and the conditions in which to choose them.
# os.environ[...] raises KeyError at import time if the variable is unset.
main_pipelines = _config_from_yaml(os.environ["MAIN_PIPELINES_CONFIG"])


def find_local_repos(base_path):
"""Search for existing local repos.

Expand Down
236 changes: 134 additions & 102 deletions python/activator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import collections
import collections.abc
import os
import re
import typing

from .visit import FannedOutVisit

Expand All @@ -41,138 +41,171 @@ class PipelinesConfig:

Parameters
----------
config : `str`
A string describing pipeline selection criteria. The current format is
a space-delimited list of mappings, each of which has the format
``(survey="<survey>")=[<pipelines>]``. The zero or more pipelines are
comma-delimited, and each pipeline path may contain environment
variables. The list may be replaced by the keyword "None" to mean no
pipeline should be run. No key or value may contain the "=" character,
and no two pipelines may share the same name.
See examples below.

Notes
-----
While it is not expected that there will ever be more than one
PipelinesConfig instance in a program's lifetime, this class is *not* a
singleton and objects must be passed explicitly to the code that
needs them.
config : sequence [mapping]
A sequence of mappings ("nodes"), each with the following keys:

``"survey"``
The survey that triggers the pipelines (`str`, optional).
``"pipelines"``
A list of zero or more pipelines (sequence [`str`] or `None`). Each
pipeline path may contain environment variables, and the list of
pipelines may be replaced by `None` to mean no pipeline should
be run.

Nodes are arranged by precedence, i.e., the first node that matches a
visit is used. A node containing only ``pipelines`` is unconstrained
and matches *any* visit.

Examples
--------
A single-survey, single-pipeline config:

>>> PipelinesConfig('(survey="TestSurvey")=[/etc/pipelines/SingleFrame.yaml]') # doctest: +ELLIPSIS
>>> PipelinesConfig([{"survey": "TestSurvey",
... "pipelines": ["/etc/pipelines/SingleFrame.yaml"]},
... ]) # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>

A config with multiple surveys and pipelines, and environment variables:

>>> PipelinesConfig('(survey="TestSurvey")=[/etc/pipelines/ApPipe.yaml, /etc/pipelines/ISR.yaml] '
... '(survey="Camera Test")=[${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml] '
... '(survey="")=[${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml] ')
... # doctest: +ELLIPSIS
>>> PipelinesConfig([{"survey": "TestSurvey",
... "pipelines": ["/etc/pipelines/ApPipe.yaml", "/etc/pipelines/ISR.yaml"]},
... {"survey": "Camera Test",
... "pipelines": ["${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml"]},
... {"survey": "",
... "pipelines": ["${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml"]},
... ]) # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>

A config that omits a pipeline for non-sky data:

>>> PipelinesConfig('(survey="TestSurvey")=[/etc/pipelines/ApPipe.yaml] '
... '(survey="Dome Flats")=None ') # doctest: +ELLIPSIS
>>> PipelinesConfig([{"survey": "TestSurvey",
...                   "pipelines": ["/etc/pipelines/ApPipe.yaml"]},
...                  {"survey": "Dome Flats",
...                   "pipelines": None},
...                  ]) # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>
"""

def __init__(self, config: str):
class _Spec:
    """A single pipeline-selection rule: which pipelines to run when a
    visit satisfies this rule's conditions.

    Parameters
    ----------
    config : mapping [`str`]
        A config node with the same keys as documented for the
        `PipelinesConfig` constructor.

    Raises
    ------
    ValueError
        Raised if the node is missing required keys, contains unexpected
        keys, or has malformed values.
    """

    def __init__(self, config: collections.abc.Mapping[str, typing.Any]):
        # Copy so that pops don't mutate the caller's node.
        remaining = dict(config)
        try:
            self._filenames = self._normalized_pipelines(remaining.pop('pipelines'))
            self._check_pipelines(self._filenames)

            survey = remaining.pop('survey', None)
            if survey is not None and not isinstance(survey, str):
                raise ValueError(f"{survey} is not a valid survey name.")
            self._survey = survey

            if remaining:
                raise ValueError(f"Got unexpected keywords: {remaining.keys()}")
        except KeyError as e:
            # A missing mandatory key is a configuration error.
            raise ValueError from e

    @staticmethod
    def _normalized_pipelines(raw):
        """Validate a raw ``pipelines`` value and return it as a sequence.

        Parameters
        ----------
        raw
            The node's ``pipelines`` value: a sequence of path strings,
            `None`, or the literal string ``"None"`` (the latter two mean
            "run nothing").

        Returns
        -------
        pipelines : sequence [`str`]
            The validated pipeline paths; empty if no pipeline should run.

        Raises
        ------
        ValueError
            Raised if ``raw`` is not a sequence of strings or `None`.
        """
        if raw is None or raw == "None":
            return []
        # Reject bare strings explicitly: a str is itself a Sequence.
        if isinstance(raw, str):
            raise ValueError(f"Pipelines spec must be list or None, got {raw}")
        if not isinstance(raw, collections.abc.Sequence):
            raise ValueError(f"Pipelines spec must be list or None, got {raw}")
        if any(not isinstance(x, str) for x in raw):
            raise ValueError(f"Pipeline list {raw} has invalid paths.")
        return raw

    @staticmethod
    def _check_pipelines(pipelines: collections.abc.Sequence[str]):
        """Test the correctness of a list of pipelines.

        At present, the only test is that no two pipelines have the same
        filename, which is used as a pipeline ID elsewhere in the Prompt
        Processing service.

        Parameters
        ----------
        pipelines : sequence [`str`]
            The pipeline paths to check.

        Raises
        ------
        ValueError
            Raised if the pipeline list is invalid.
        """
        counts = collections.Counter()
        for path in pipelines:
            counts[os.path.splitext(os.path.basename(path))[0]] += 1
        duplicates = [filename for filename, num in counts.items() if num > 1]
        if duplicates:
            raise ValueError(f"Pipeline names must be unique, found multiple copies of {duplicates}.")

    def matches(self, visit: FannedOutVisit) -> bool:
        """Test whether a visit matches the conditions for this spec.

        Parameters
        ----------
        visit : `activator.visit.FannedOutVisit`
            The visit to test against this spec.

        Returns
        -------
        matches : `bool`
            `True` if the visit meets all conditions, `False` otherwise.
        """
        # A spec with no survey constraint matches any visit.
        return self._survey is None or self._survey == visit.survey

    @property
    def pipeline_files(self) -> collections.abc.Sequence[str]:
        """An ordered list of pipelines to run in this spec (sequence [`str`])."""
        return self._filenames

def __init__(self, config: collections.abc.Sequence):
    """Initialize from a parsed sequence of config nodes.

    Parameters
    ----------
    config : sequence [mapping]
        A sequence of mapping nodes; see the class docstring for the
        supported keys.

    Raises
    ------
    ValueError
        Raised if ``config`` is empty or any node is invalid.
    """
    if not config:
        raise ValueError("Must configure at least one pipeline.")
    # _expand_config validates each node and wraps it in a _Spec; the
    # old _parse_config/_mapping machinery from the string-based config
    # format no longer exists.
    self._specs = self._expand_config(config)

@staticmethod
def _expand_config(config: collections.abc.Sequence) -> collections.abc.Sequence[_Spec]:
    """Turn a config spec into structured config information.

    Parameters
    ----------
    config : sequence [mapping]
        A sequence of mappings, see class docs for details.

    Returns
    -------
    config : sequence [`PipelinesConfig._Spec`]
        A sequence of node objects specifying the pipeline(s) to run and the
        conditions in which to run them, in the same (precedence) order as
        the input nodes.

    Raises
    ------
    ValueError
        Raised if the input config is invalid (propagated from
        `PipelinesConfig._Spec`).
    """
    # _Spec.__init__ performs all per-node validation; order is preserved
    # because the first matching node wins in get_pipeline_files.
    return [PipelinesConfig._Spec(node) for node in config]

@staticmethod
def _check_pipelines(pipelines: collections.abc.Sequence[str]):
"""Test the correctness of a list of pipelines.

At present, the only test is that no two pipelines have the same
filename, which is used as a pipeline ID elsewhere in the Prompt
Processing service.

Parameters
----------
pipelines : sequence [`str`]

Raises
------
ValueError
Raised if the pipeline list is invalid.
"""
filenames = collections.Counter(os.path.splitext(os.path.basename(path))[0] for path in pipelines)
duplicates = [filename for filename, num in filenames.items() if num > 1]
if duplicates:
raise ValueError(f"Pipeline names must be unique, found multiple copies of {duplicates}.")

def get_pipeline_files(self, visit: FannedOutVisit) -> str:
def get_pipeline_files(self, visit: FannedOutVisit) -> list[str]:
"""Identify the pipeline to be run, based on the provided visit.

The first node that matches the visit is returned, and no other nodes
are considered even if they would provide a "tighter" match.

Parameters
----------
visit : `activator.visit.FannedOutVisit`
Expand All @@ -184,8 +217,7 @@ def get_pipeline_files(self, visit: FannedOutVisit) -> str:
Path(s) to the configured pipeline file(s). An empty list means
that *no* pipeline should be run on this visit.
"""
try:
values = self._mapping[visit.survey]
except KeyError as e:
raise RuntimeError(f"Unsupported survey: {visit.survey}") from e
return [os.path.expandvars(path) for path in values]
for node in self._specs:
if node.matches(visit):
return [os.path.expandvars(path) for path in node.pipeline_files]
raise RuntimeError(f"Unsupported survey: {visit.survey}")
Loading
Loading