Adds support for KEDA by selecting the platform based on an environment variable. A timeout is defined for the Kafka message polling to obtain a fanned-out next-visit message. Adds a conditional to obtain timing information from Knative; for KEDA, the timing information comes from the Kafka message header.
dspeck1 committed Nov 26, 2024
1 parent dd18caa commit a2d4dee
Showing 1 changed file with 151 additions and 4 deletions.
155 changes: 151 additions & 4 deletions python/activator/activator.py
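For context, the KEDA path introduced here is configured entirely through environment variables. Below is a minimal sketch, not part of the commit, for validating a deployment; the checker function is hypothetical, but the variable names are copied from the diff that follows.

```python
import os

# Environment variables the KEDA code path reads at import time
# (names taken from the diff below; this helper itself is hypothetical).
REQUIRED_KEDA_ENV = [
    "PLATFORM",                        # "knative" or "keda"
    "FAN_OUT_SCHEMA_REGISTRY_URL",
    "FAN_OUT_KAFKA_CLUSTER",
    "FAN_OUT_KAFKA_GROUP_ID",
    "FAN_OUT_KAFKA_TOPIC",
    "FAN_OUT_KAFKA_TOPIC_OFFSET",
    "FAN_OUT_KAFKA_SASL_MECHANISM",
    "FAN_OUT_KAFKA_SECURITY_PROTOCOL",
    "FAN_OUT_KAFKA_SASL_USERNAME",
    "FAN_OUT_KAFKA_SASL_PASSWORD",
    "BUCKET_NOTIFICATION_KAFKA_OFFSET_RESET",
]
# FANNED_OUT_MSG_LISTEN_TIMEOUT is optional and defaults to 300 seconds.


def missing_keda_env() -> list[str]:
    """Return the names of required variables the deployment left unset."""
    return [name for name in REQUIRED_KEDA_ENV if name not in os.environ]
```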
@@ -32,12 +32,14 @@
import signal
import yaml
import uuid
import socket

import boto3
from botocore.handlers import validate_bucket_name
import cloudevents.http
import confluent_kafka as kafka
import flask
import activator.repo_tracker

from .config import PipelinesConfig
from .exception import GracefulShutdownInterrupt, IgnorableVisit, InvalidVisitError, \
@@ -52,8 +54,13 @@
)
from .repo_tracker import LocalRepoTracker
from .visit import FannedOutVisit
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka import KafkaException, KafkaError


# Platform that prompt processing will run on
platform = os.environ["PLATFORM"]
# The short name for the instrument.
instrument_name = os.environ["RUBIN_INSTRUMENT"]
# The skymap to use in the central repo
@@ -76,6 +83,28 @@
kafka_group_id = str(uuid.uuid4())
# The topic on which to listen to updates to image_bucket
bucket_topic = os.environ.get("BUCKET_TOPIC", "rubin-prompt-processing")
# Kafka Schema Registry URL for next visit fan out messages
fan_out_schema_registry_url = os.environ["FAN_OUT_SCHEMA_REGISTRY_URL"]
# Kafka cluster with next visit fanned out messages.
fan_out_kafka_cluster = os.environ["FAN_OUT_KAFKA_CLUSTER"]
# Kafka group for next visit fan out messages.
fan_out_kafka_group_id = os.environ["FAN_OUT_KAFKA_GROUP_ID"]
# Kafka topic for next visit fan out messages.
fan_out_kafka_topic = os.environ["FAN_OUT_KAFKA_TOPIC"]
# Kafka auto offset reset policy for next visit fan out messages.
fan_out_kafka_topic_offset = os.environ["FAN_OUT_KAFKA_TOPIC_OFFSET"]
# Kafka Fan Out SASL Mechanism.
fan_out_kafka_sasl_mechanism = os.environ["FAN_OUT_KAFKA_SASL_MECHANISM"]
# Kafka Fan Out Security Protocol.
fan_out_kafka_security_protocol = os.environ["FAN_OUT_KAFKA_SECURITY_PROTOCOL"]
# Kafka Fan Out Consumer Username.
fan_out_kafka_sasl_username = os.environ["FAN_OUT_KAFKA_SASL_USERNAME"]
# Kafka Fan Out Consumer Password.
fan_out_kafka_sasl_password = os.environ["FAN_OUT_KAFKA_SASL_PASSWORD"]
# Auto offset reset policy for the Kafka bucket notification consumer.
bucket_notification_kafka_offset_reset = os.environ["BUCKET_NOTIFICATION_KAFKA_OFFSET_RESET"]
# Time to wait for fanned out messages before spawning new pod.
fanned_out_msg_listen_timeout = int(os.environ.get("FANNED_OUT_MSG_LISTEN_TIMEOUT", 300))

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)
@@ -127,7 +156,7 @@ def _get_consumer():
return kafka.Consumer({
"bootstrap.servers": kafka_cluster,
"group.id": kafka_group_id,
"auto.offset.reset": "latest", # default, but make explicit
"auto.offset.reset": bucket_notification_kafka_offset_reset,
})


@@ -195,6 +224,110 @@ def create_app():
sys.exit(3)


def dict_to_fanned_out_visit(obj, ctx):
"""
Converts object literal(dict) to a Fanned Out instance.
Args:
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
obj (dict): Object literal(dict)
"""

if obj is None:
return None

return FannedOutVisit(salIndex=obj['salIndex'],
scriptSalIndex=obj['scriptSalIndex'],
groupId=obj['groupId'],
coordinateSystem=obj['coordinateSystem'],
position=obj['position'],
startTime=obj['startTime'],
rotationSystem=obj['rotationSystem'],
cameraAngle=obj['cameraAngle'],
filters=obj['filters'],
dome=obj['dome'],
duration=obj['duration'],
nimages=obj['nimages'],
instrument=obj['instrument'],
survey=obj['survey'],
totalCheckpoints=obj['totalCheckpoints'],
detector=obj['detector'],
private_sndStamp=obj['private_sndStamp'])


def keda_start():

setup_usdf_logger(
labels={"instrument": instrument_name},
)

# Initialize local registry
registry = activator.repo_tracker.LocalRepoTracker.get()
registry.init_tracker()

# Check initialization and abort early
_get_consumer()
_get_storage_client()
_get_central_butler()
_get_local_repo()

# Initialize schema registry for fan out
fan_out_schema_registry_conf = {'url': fan_out_schema_registry_url}
fan_out_schema_registry_client = SchemaRegistryClient(fan_out_schema_registry_conf)

fan_out_avro_deserializer = AvroDeserializer(schema_registry_client=fan_out_schema_registry_client,
from_dict=dict_to_fanned_out_visit)

fan_out_consumer_conf = {
"bootstrap.servers": fan_out_kafka_cluster,
"group.id": fan_out_kafka_group_id,
"auto.offset.reset": fan_out_kafka_topic_offset,
"sasl.mechanism": fan_out_kafka_sasl_mechanism,
"security.protocol": fan_out_kafka_security_protocol,
"sasl.username": fan_out_kafka_sasl_username,
"sasl.password": fan_out_kafka_sasl_password,
'enable.auto.commit': False
}

fan_out_consumer = kafka.Consumer(fan_out_consumer_conf, logger=_log)

_log.info("starting fan out consumer")

try:
fan_out_consumer.subscribe([fan_out_kafka_topic])

fan_out_listen_start_time = time.time()

while time.time() - fan_out_listen_start_time < fanned_out_msg_listen_timeout:
fan_out_message = fan_out_consumer.poll(timeout=5)
if fan_out_message is None:
continue
if fan_out_message.error():
if fan_out_message.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
_log.warning("Reached end of partition")
elif fan_out_message.error():
raise KafkaException(fan_out_message.error())
else:
deserialized_fan_out_visit = fan_out_avro_deserializer(fan_out_message.value(),
SerializationContext(
fan_out_message.topic(),
MessageField.VALUE))
_log.info("Unpacked message as %r.", deserialized_fan_out_visit)
_log.info("Message timestamp %r", fan_out_message.timestamp())
fan_out_kafka_msg_timestamp = fan_out_message.timestamp()
fan_out_to_prompt_time = int(time.time() * 1000) - fan_out_kafka_msg_timestamp[1]
_log.info("Seconds since fan out message delivered %r", fan_out_to_prompt_time/1000)
fan_out_consumer.commit(message=fan_out_message, asynchronous=False)
fan_out_consumer.close()
process_visit(deserialized_fan_out_visit)
_log.info("Processing completed for %s", socket.gethostname())
break
finally:
_log.info("Finished listening for fanned out messages")


def _graceful_shutdown(signum: int, stack_frame):
"""Signal handler for cases where the service should gracefully shut down.
@@ -268,6 +401,12 @@ def parse_next_visit(http_request):
raise ValueError("no CloudEvent received")
if not event.data:
raise ValueError("empty CloudEvent received")

# Calculate the time to start Knative and receive the message, based on the time header from the Knative request
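# Note: float(event['time']) assumes the fan-out sender populates the CloudEvent
# 'time' attribute with epoch seconds; a standard RFC 3339 'time' string would
# need to be parsed with datetime instead.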
_log.info("Fan out send event at %s", event['time'])
fan_out_knative_msg_timestamp = float(event['time'])
fan_out_to_prompt_time = time.time() - fan_out_knative_msg_timestamp
_log.info("Seconds since fan out message delivered %r", fan_out_to_prompt_time)

# Message format is determined by the nextvisit-start deployment.
data = json.loads(event.data)
@@ -589,8 +728,16 @@ def server_error(e: Exception) -> tuple[str, int]:
def main():
# This function is only called in test environments. Container
# deployments call `create_app()()` through Gunicorn.
app = create_app()
app.run(host="127.0.0.1", port=8080, debug=True)
if platform == "knative":
_log.info("starting knative instance")
app = create_app()
app.run(host="127.0.0.1", port=8080, debug=True)
# starts keda instance of the application
elif platform == "keda":
_log.info("starting keda instance")
keda_start()
else:
_log.info("no platform defined")


if __name__ == "__main__":
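A note on the KEDA timing arithmetic: confluent_kafka's Message.timestamp() returns a (timestamp_type, milliseconds-since-epoch) tuple, which is why keda_start indexes [1] and divides by 1000. A standalone sketch of the same computation:

```python
import time


def seconds_since_fan_out(kafka_timestamp: tuple[int, int]) -> float:
    """Seconds elapsed since a fanned out message's Kafka timestamp.

    ``kafka_timestamp`` is the (timestamp_type, timestamp_ms) tuple that
    confluent_kafka's ``Message.timestamp()`` returns.
    """
    _timestamp_type, timestamp_ms = kafka_timestamp
    return (int(time.time() * 1000) - timestamp_ms) / 1000


# Example with a made-up timestamp tuple (type 1 == broker create time):
print(seconds_since_fan_out((1, int(time.time() * 1000) - 2500)))  # ~2.5
```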
