Skip to content

Commit

Permalink
fix: Respect standard Singer stream metadata table-key-properties, …
Browse files Browse the repository at this point in the history
…`replication-key` and `forced-replication-method` (#2770)

* fix: Respect standard Singer stream metadata for key properties, replication key and replication method

- https://hub.meltano.com/singer/spec/#metadata
- https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#metadata

* Add test

* Fix typo in docs

* Test non-null replication key override
  • Loading branch information
edgarrmondragon authored Nov 21, 2024
1 parent 8dcf496 commit a030c80
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/implementation/catalog_metadata.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Catalog Metadata

The SDK automatically generates catalog metadata during catalog discovery. Selection rules overrided by a user will be respected.
The SDK automatically generates catalog metadata during catalog discovery. Selection rules overridden by a user will be respected.

Primary key properties may not be deselected, as these are required for `key_properties` to be declared in stream messages.

Expand Down
1 change: 1 addition & 0 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class StreamMetadata(Metadata):
"""Stream metadata."""

table_key_properties: t.Sequence[str] | None = None
replication_key: str | None = None
forced_replication_method: str | None = None
valid_replication_keys: list[str] | None = None
schema_name: str | None = None
Expand Down
25 changes: 21 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
if t.TYPE_CHECKING:
import logging

from singer_sdk._singerlib.catalog import StreamMetadata
from singer_sdk.helpers import types
from singer_sdk.helpers._compat import Traversable
from singer_sdk.tap_base import Tap
Expand Down Expand Up @@ -1277,10 +1278,26 @@ def apply_catalog(self, catalog: singer.Catalog) -> None:

catalog_entry = catalog.get_stream(self.name)
if catalog_entry:
self.primary_keys = catalog_entry.key_properties
self.replication_key = catalog_entry.replication_key
if catalog_entry.replication_method:
self.forced_replication_method = catalog_entry.replication_method
stream_metadata: StreamMetadata | None
if stream_metadata := catalog_entry.metadata.get(()): # type: ignore[assignment]
table_key_properties = stream_metadata.table_key_properties
table_replication_key = stream_metadata.replication_key
table_replication_method = stream_metadata.forced_replication_method
else:
table_key_properties = None
table_replication_key = None
table_replication_method = None

self.primary_keys = catalog_entry.key_properties or table_key_properties
self.replication_key = (
catalog_entry.replication_key or table_replication_key
)

replication_method = (
catalog_entry.replication_method or table_replication_method
)
if replication_method:
self.forced_replication_method = replication_method

def _get_state_partition_context(
self,
Expand Down
37 changes: 37 additions & 0 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,43 @@ def test_stream_apply_catalog(stream: Stream):
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_apply_catalog__singer_standard(stream: Stream):
    """Stream-level Singer metadata in a catalog should override stream settings.

    The catalog entry built here carries no top-level key properties or
    replication settings; the values are supplied only through the metadata
    entry with an empty breadcrumb, using the keys ``table-key-properties``,
    ``replication-key`` and ``forced-replication-method``.
    """
    # Baseline state of the ``stream`` argument before any catalog is applied
    # (presumably a pytest fixture defined elsewhere in this test module).
    assert stream.primary_keys == []
    assert stream.replication_key == "updatedAt"
    assert stream.replication_method == REPLICATION_INCREMENTAL
    assert stream.forced_replication_method is None

    # An empty breadcrumb addresses the stream itself rather than a property.
    stream_level_metadata = {
        "breadcrumb": [],
        "metadata": {
            "table-key-properties": ["id"],
            "replication-key": "newReplicationKey",
            "forced-replication-method": REPLICATION_FULL_TABLE,
        },
    }
    catalog_entry = {
        "tap_stream_id": stream.name,
        "stream": stream.name,
        "schema": stream.schema,
        "metadata": [stream_level_metadata],
    }
    singer_catalog = Catalog.from_dict({"streams": [catalog_entry]})

    stream.apply_catalog(catalog=singer_catalog)

    # Every value supplied through the metadata entry must now be in effect.
    assert stream.primary_keys == ["id"]
    assert stream.replication_key == "newReplicationKey"
    assert stream.replication_method == REPLICATION_FULL_TABLE
    assert stream.forced_replication_method == REPLICATION_FULL_TABLE


@pytest.mark.parametrize(
"stream_name,forced_replication_method,bookmark_value,expected_starting_value",
[
Expand Down

0 comments on commit a030c80

Please sign in to comment.