Skip to content

Commit

Permalink
feat(targets): Added a new built-in setting activate_version for ta…
Browse files Browse the repository at this point in the history
…rgets to optionally disable processing of `ACTIVATE_VERSION` messages
  • Loading branch information
edgarrmondragon committed Nov 29, 2024
1 parent 3ad4615 commit 992f609
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
8 changes: 8 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@
description="The default target database schema name to use for all streams.",
),
).to_dict()
ACTIVATE_VERSION_CONFIG = PropertiesList(
Property(
"activate_version",
BooleanType,
title="Process `ACTIVATE_VERSION` messages",
description="Whether to process `ACTIVATE_VERSION` messages.",
),
).to_dict()
ADD_RECORD_METADATA_CONFIG = PropertiesList(
Property(
"add_record_metadata",
Expand Down
9 changes: 9 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool:
"""
return self.config.get("add_record_metadata", False)

@property
def process_activate_version_messages(self) -> bool:
"""Check if activate version messages should be processed.
Returns:
True if activate version messages should be processed.
"""
return self.config.get("activate_version", True)

@property
def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum:
"""Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL.
Expand Down
13 changes: 13 additions & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None:

for stream_map in self.mapper.stream_maps[stream_name]:
sink = self.get_sink(stream_map.stream_alias)
if not sink.process_activate_version_messages:
self.logger.warning(
"Activate version messages are not enabled for '%s'. Ignoring.",
stream_map.stream_alias,
)
continue
if not sink.include_sdc_metadata_properties:
self.logger.warning(
"ACTIVATE_VERSION requires _sdc_* metadata properties to be "
"included. Set `add_record_metadata` to `True` if you wanna use "
"this feature."
)
continue
sink.activate_version(message_dict["version"])

def _process_batch_message(self, message_dict: dict) -> None:
Expand Down

0 comments on commit 992f609

Please sign in to comment.