diff --git a/python/activator/activator.py b/python/activator/activator.py
index 47ec126e..d4c35e3a 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -32,12 +32,14 @@
 import signal
 import yaml
 import uuid
+import socket
 
 import boto3
 from botocore.handlers import validate_bucket_name
 import cloudevents.http
 import confluent_kafka as kafka
 import flask
+import activator.repo_tracker
 
 from .config import PipelinesConfig
 from .exception import GracefulShutdownInterrupt, IgnorableVisit, InvalidVisitError, \
@@ -52,8 +54,13 @@
 )
 from .repo_tracker import LocalRepoTracker
 from .visit import FannedOutVisit
+from confluent_kafka.serialization import SerializationContext, MessageField
+from confluent_kafka.schema_registry import SchemaRegistryClient
+from confluent_kafka.schema_registry.avro import AvroDeserializer
+from confluent_kafka import KafkaException, KafkaError
-
+# Platform that prompt processing will run on.
+platform = os.environ["PLATFORM"]
 
 # The short name for the instrument.
 instrument_name = os.environ["RUBIN_INSTRUMENT"]
 # The skymap to use in the central repo
@@ -76,6 +83,28 @@
 kafka_group_id = str(uuid.uuid4())
 # The topic on which to listen to updates to image_bucket
 bucket_topic = os.environ.get("BUCKET_TOPIC", "rubin-prompt-processing")
+# Kafka Schema Registry URL for next visit fan out messages.
+fan_out_schema_registry_url = os.environ["FAN_OUT_SCHEMA_REGISTRY_URL"]
+# Kafka cluster with next visit fanned out messages.
+fan_out_kafka_cluster = os.environ["FAN_OUT_KAFKA_CLUSTER"]
+# Kafka group for next visit fan out messages.
+fan_out_kafka_group_id = os.environ["FAN_OUT_KAFKA_GROUP_ID"]
+# Kafka topic for next visit fan out messages.
+fan_out_kafka_topic = os.environ["FAN_OUT_KAFKA_TOPIC"]
+# Kafka topic offset for next visit fan out messages.
+fan_out_kafka_topic_offset = os.environ["FAN_OUT_KAFKA_TOPIC_OFFSET"]
+# Kafka fan out SASL mechanism.
+fan_out_kafka_sasl_mechanism = os.environ["FAN_OUT_KAFKA_SASL_MECHANISM"]
+# Kafka fan out security protocol.
+fan_out_kafka_security_protocol = os.environ["FAN_OUT_KAFKA_SECURITY_PROTOCOL"]
+# Kafka fan out consumer username.
+fan_out_kafka_sasl_username = os.environ["FAN_OUT_KAFKA_SASL_USERNAME"]
+# Kafka fan out consumer password.
+fan_out_kafka_sasl_password = os.environ["FAN_OUT_KAFKA_SASL_PASSWORD"]
+# Offset reset policy for Kafka bucket notifications.
+bucket_notification_kafka_offset_reset = os.environ["BUCKET_NOTIFICATION_KAFKA_OFFSET_RESET"]
+# Time to wait for fanned out messages before spawning a new pod.
+fanned_out_msg_listen_timeout = int(os.environ.get("FANNED_OUT_MSG_LISTEN_TIMEOUT", 300))
 
 _log = logging.getLogger("lsst." + __name__)
 _log.setLevel(logging.DEBUG)
@@ -127,7 +156,7 @@ def _get_consumer():
     return kafka.Consumer({
         "bootstrap.servers": kafka_cluster,
         "group.id": kafka_group_id,
-        "auto.offset.reset": "latest",  # default, but make explicit
+        "auto.offset.reset": bucket_notification_kafka_offset_reset,
     })
 
 
@@ -195,6 +224,110 @@ def create_app():
         sys.exit(3)
 
 
+def dict_to_fanned_out_visit(obj, ctx):
+    """Convert an object literal (dict) to a FannedOutVisit instance.
+
+    Args:
+        obj (dict): Object literal (dict) to convert.
+        ctx (SerializationContext): Metadata pertaining to the serialization
+            operation.
+    """
+    if obj is None:
+        return None
+
+    return FannedOutVisit(salIndex=obj['salIndex'],
+                          scriptSalIndex=obj['scriptSalIndex'],
+                          groupId=obj['groupId'],
+                          coordinateSystem=obj['coordinateSystem'],
+                          position=obj['position'],
+                          startTime=obj['startTime'],
+                          rotationSystem=obj['rotationSystem'],
+                          cameraAngle=obj['cameraAngle'],
+                          filters=obj['filters'],
+                          dome=obj['dome'],
+                          duration=obj['duration'],
+                          nimages=obj['nimages'],
+                          instrument=obj['instrument'],
+                          survey=obj['survey'],
+                          totalCheckpoints=obj['totalCheckpoints'],
+                          detector=obj['detector'],
+                          private_sndStamp=obj['private_sndStamp'])
+
+
+def keda_start():
+    setup_usdf_logger(
+        labels={"instrument": instrument_name},
+    )
+
+    # Initialize the local repo registry.
+    registry = activator.repo_tracker.LocalRepoTracker.get()
+    registry.init_tracker()
+
+    # Check initialization and abort early.
+    _get_consumer()
+    _get_storage_client()
+    _get_central_butler()
+    _get_local_repo()
+
+    # Initialize the schema registry client for fanned out messages.
+    fan_out_schema_registry_conf = {'url': fan_out_schema_registry_url}
+    fan_out_schema_registry_client = SchemaRegistryClient(fan_out_schema_registry_conf)
+
+    fan_out_avro_deserializer = AvroDeserializer(schema_registry_client=fan_out_schema_registry_client,
+                                                 from_dict=dict_to_fanned_out_visit)
+
+    fan_out_consumer_conf = {
+        "bootstrap.servers": fan_out_kafka_cluster,
+        "group.id": fan_out_kafka_group_id,
+        "auto.offset.reset": fan_out_kafka_topic_offset,
+        "sasl.mechanism": fan_out_kafka_sasl_mechanism,
+        "security.protocol": fan_out_kafka_security_protocol,
+        "sasl.username": fan_out_kafka_sasl_username,
+        "sasl.password": fan_out_kafka_sasl_password,
+        # Commit offsets manually, only after a message has been handled.
+        "enable.auto.commit": False,
+    }
+
+    fan_out_consumer = kafka.Consumer(fan_out_consumer_conf, logger=_log)
+
+    _log.info("starting fan out consumer")
+
+    try:
+        fan_out_consumer.subscribe([fan_out_kafka_topic])
+
+        fan_out_listen_start_time = time.time()
+
+        while time.time() - fan_out_listen_start_time < fanned_out_msg_listen_timeout:
+            fan_out_message = fan_out_consumer.poll(timeout=5)
+            if fan_out_message is None:
+                continue
+            if fan_out_message.error():
+                if fan_out_message.error().code() == KafkaError._PARTITION_EOF:
+                    # End of partition event; keep polling.
+                    _log.warning("Reached end of partition for %s", fan_out_kafka_topic)
+                else:
+                    raise KafkaException(fan_out_message.error())
+            else:
+                deserialized_fan_out_visit = fan_out_avro_deserializer(fan_out_message.value(),
+                                                                       SerializationContext(
+                                                                           fan_out_message.topic(),
+                                                                           MessageField.VALUE))
+                _log.info("Unpacked message as %r.", deserialized_fan_out_visit)
+                _log.info("Message timestamp %r", fan_out_message.timestamp())
+                # Message.timestamp() returns (timestamp type, value in ms since the epoch).
+                fan_out_kafka_msg_timestamp = fan_out_message.timestamp()
+                fan_out_to_prompt_time = int(time.time() * 1000) - fan_out_kafka_msg_timestamp[1]
+                _log.info("Seconds since fan out message delivered %r", fan_out_to_prompt_time/1000)
+                fan_out_consumer.commit(message=fan_out_message, asynchronous=False)
+                # Handle exactly one visit per pod, then stop listening.
+                fan_out_consumer.close()
+                process_visit(deserialized_fan_out_visit)
+                _log.info("Processing completed for %s", socket.gethostname())
+                break
+    finally:
+        _log.info("Finished listening for fanned out messages")
+
+
 def _graceful_shutdown(signum: int, stack_frame):
     """Signal handler for cases where the service should gracefully shut down.
 
@@ -268,6 +401,12 @@ def parse_next_visit(http_request):
         raise ValueError("no CloudEvent received")
     if not event.data:
         raise ValueError("empty CloudEvent received")
+
+    # Calculate the time to start Knative and deliver the message, from the time header of the Knative request.
+    _log.info("Fan out send event at %s", event['time'])
+    fan_out_knative_msg_timestamp = float(event['time'])
+    fan_out_to_prompt_time = time.time() - fan_out_knative_msg_timestamp
+    _log.info("Seconds since fan out message delivered %r", fan_out_to_prompt_time)
 
     # Message format is determined by the nextvisit-start deployment.
     data = json.loads(event.data)
@@ -589,8 +728,16 @@ def server_error(e: Exception) -> tuple[str, int]:
 def main():
     # This function is only called in test environments. Container
     # deployments call `create_app()()` through Gunicorn.
-    app = create_app()
-    app.run(host="127.0.0.1", port=8080, debug=True)
+    if platform == "knative":
+        _log.info("starting knative instance")
+        app = create_app()
+        app.run(host="127.0.0.1", port=8080, debug=True)
+    # Start a KEDA instance of the application.
+    elif platform == "keda":
+        _log.info("starting keda instance")
+        keda_start()
+    else:
+        _log.error("unsupported platform %r", platform)
 
 
 if __name__ == "__main__":
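The new KEDA code path is driven entirely by environment variables read at module import time. Below is a minimal smoke-test sketch of how it could be exercised locally; it is not part of the patch, every endpoint and credential value is a placeholder, and it assumes a reachable Kafka cluster, a schema registry, and that the module's other required variables (`RUBIN_INSTRUMENT`, bucket settings, etc.) are also set.

```python
# Hypothetical local smoke test for the KEDA path; all values are placeholders.
import os

os.environ.update({
    "PLATFORM": "keda",  # selects keda_start() in main()
    "FAN_OUT_SCHEMA_REGISTRY_URL": "https://schema-registry.example.org",
    "FAN_OUT_KAFKA_CLUSTER": "kafka.example.org:9092",
    "FAN_OUT_KAFKA_GROUP_ID": "prompt-processing-test",
    "FAN_OUT_KAFKA_TOPIC": "next-visit-fanned-out",
    "FAN_OUT_KAFKA_TOPIC_OFFSET": "latest",
    "FAN_OUT_KAFKA_SASL_MECHANISM": "SCRAM-SHA-512",
    "FAN_OUT_KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
    "FAN_OUT_KAFKA_SASL_USERNAME": "test-user",
    "FAN_OUT_KAFKA_SASL_PASSWORD": "test-password",
    "BUCKET_NOTIFICATION_KAFKA_OFFSET_RESET": "latest",
    "FANNED_OUT_MSG_LISTEN_TIMEOUT": "60",  # shorter wait for local testing
})

# Import only after the environment is populated: activator.py reads these
# variables at module import time and raises KeyError if any are missing.
from activator.activator import main

main()  # dispatches to keda_start() because PLATFORM == "keda"
```

Since `keda_start()` closes the consumer and returns after committing a single message, each invocation processes at most one fanned-out visit, which matches the one-visit-per-pod scaling model the timeout comment describes.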