From 992f6090f3f3c393e5b7cbb8fbeb9f540491e006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 14:32:31 -0600 Subject: [PATCH] feat(targets): Added a new built-in setting `activate_version` for targets to optionally disable processing of `ACTIVATE_VERSION` messages --- singer_sdk/helpers/capabilities.py | 8 ++++++++ singer_sdk/sinks/core.py | 9 +++++++++ singer_sdk/target_base.py | 13 +++++++++++++ 3 files changed, 30 insertions(+) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 0f5c42918..b23ae0f00 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -144,6 +144,14 @@ description="The default target database schema name to use for all streams.", ), ).to_dict() +ACTIVATE_VERSION_CONFIG = PropertiesList( + Property( + "activate_version", + BooleanType, + title="Process `ACTIVATE_VERSION` messages", + description="Whether to process `ACTIVATE_VERSION` messages.", + ), +).to_dict() ADD_RECORD_METADATA_CONFIG = PropertiesList( Property( "add_record_metadata", diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 5070655d4..7d9d24a3c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool: """ return self.config.get("add_record_metadata", False) + @property + def process_activate_version_messages(self) -> bool: + """Check if activate version messages should be processed. + + Returns: + True if activate version messages should be processed. + """ + return self.config.get("activate_version", True) + @property def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum: """Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL. diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 8907b6201..b361698ce 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -454,6 +454,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None: for stream_map in self.mapper.stream_maps[stream_name]: sink = self.get_sink(stream_map.stream_alias) + if not sink.process_activate_version_messages: + self.logger.warning( + "Activate version messages are not enabled for '%s'. Ignoring.", + stream_map.stream_alias, + ) + continue + if not sink.include_sdc_metadata_properties: + self.logger.warning( + "ACTIVATE_VERSION requires _sdc_* metadata properties to be " + "included. Set `add_record_metadata` to `True` if you wanna use " + "this feature." + ) + continue sink.activate_version(message_dict["version"]) def _process_batch_message(self, message_dict: dict) -> None: