From fe4d24949795d53a2937628b8a65e21b81586b2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 24 Sep 2024 10:54:06 -0600 Subject: [PATCH] feat: Taps can now opt-in to emitting `ACTIVATE_VERSION` messages for streams with `FULL_TABLE` replication --- singer_sdk/helpers/_batch.py | 3 +++ singer_sdk/sinks/sql.py | 2 +- singer_sdk/streams/core.py | 22 ++++++++++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py index 2e4ae4615..2f7d8da6b 100644 --- a/singer_sdk/helpers/_batch.py +++ b/singer_sdk/helpers/_batch.py @@ -92,6 +92,9 @@ class SDKBatchMessage(Message): manifest: list[str] = field(default_factory=list) """The manifest of files in the batch.""" + version: int | None = None + """If syncing in FULL_TABLE mode, the start time as an epoch timestamp int.""" + def __post_init__(self) -> None: if isinstance(self.encoding, dict): self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 0f7695ef0..6472ed710 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -91,7 +91,7 @@ def schema_name(self) -> str | None: Returns: The target schema name. """ - # Look for a default_target_scheme in the configuraion fle + # Look for a default_target_scheme in the configuration fle default_target_schema: str = self.config.get("default_target_schema", None) parts = self.stream_name.split("-") diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index c65bc87dc..653fe960b 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -6,6 +6,7 @@ import copy import datetime import json +import time import typing as t from os import PathLike from pathlib import Path @@ -136,6 +137,8 @@ def __init__( self._config: dict = dict(tap.config) self._tap = tap self._tap_state = tap.state + self._stream_version: int | None = None + self._initialized_at = tap.initialized_at self._tap_input_catalog: singer.Catalog | None = None self._stream_maps: list[StreamMap] | None = None self.forced_replication_method: str | None = None @@ -452,7 +455,14 @@ def get_replication_key_signpost( Returns: Max allowable bookmark value for this stream's replication key. """ - return utc_now() if self.is_timestamp_replication_key else None + return ( + datetime.datetime.fromtimestamp( + self._initialized_at, + tz=datetime.timezone.utc, + ) + if self.is_timestamp_replication_key + else None + ) @property def schema_filepath(self) -> Path | Traversable | None: @@ -811,6 +821,10 @@ def _write_state_message(self) -> None: self._last_emitted_state = copy.deepcopy(self.tap_state) self._is_state_flushed = True + def _write_activate_version_message(self, full_table_version: int) -> None: # noqa: PLR6301 + """Write out an ACTIVATE_VERSION message.""" + singer.write_message(singer.ActivateVersionMessage(full_table_version)) + def _generate_schema_messages( self, ) -> t.Generator[singer.SchemaMessage, None, None]: @@ -876,7 +890,7 @@ def _generate_record_messages( yield singer.RecordMessage( stream=stream_map.stream_alias, record=mapped_record, - version=None, + version=self._stream_version, time_extracted=utc_now(), ) @@ -1222,6 +1236,10 @@ def sync(self, context: types.Context | None = None) -> None: if self.selected: self._write_schema_message() + if self.replication_method == REPLICATION_FULL_TABLE: + self._stream_version = int(time.time()) + self._write_activate_version_message(self._stream_version) + try: batch_config = self.get_batch_config(self.config) if batch_config: