From c99f492718bfab89134a708f9b5d0235f3fc9d90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Wed, 10 Jan 2024 17:52:07 -0600 Subject: [PATCH] feat: Support fanning out parent record into multiple child syncs --- singer_sdk/streams/core.py | 29 ++++++++-- tests/core/test_parent_child.py | 99 +++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 6 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 459b9e7615..567d7284d1 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -1026,17 +1026,18 @@ def _process_record( partition_context: The partition context. """ partition_context = partition_context or {} - child_context = copy.copy( - self.get_child_context(record=record, context=child_context), - ) for key, val in partition_context.items(): # Add state context to records if not already present if key not in record: record[key] = val - # Sync children, except when primary mapper filters out the record - if self.stream_maps[0].get_filter_result(record): - self._sync_children(child_context) + for context in self.generate_child_contexts( + record=record, + context=child_context, + ): + # Sync children, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + self._sync_children(copy.copy(context)) def _sync_records( # noqa: C901 self, @@ -1289,6 +1290,22 @@ def get_child_context(self, record: dict, context: dict | None) -> dict | None: return context or record + def generate_child_contexts( + self, + record: dict, + context: dict | None, + ) -> t.Iterable[dict | None]: + """Generate child contexts. + + Args: + record: Individual record in the stream. + context: Stream partition or context dictionary. + + Yields: + A child context for each child stream. + """ + yield self.get_child_context(record=record, context=context) + # Abstract Methods @abc.abstractmethod diff --git a/tests/core/test_parent_child.py b/tests/core/test_parent_child.py index 7fd01a153a..8cbb971bcf 100644 --- a/tests/core/test_parent_child.py +++ b/tests/core/test_parent_child.py @@ -167,3 +167,102 @@ def test_child_deselected_parent(tap_with_deselected_parent: MyTap): assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages) assert all(msg["stream"] == child_stream.name for msg in child_record_messages) assert all("pid" in msg["record"] for msg in child_record_messages) + + +def test_one_parent_many_children(tap: MyTap): + """Test tap output with parent stream deselected.""" + + class ParentMany(Stream): + """A parent stream.""" + + name = "parent_many" + schema: t.ClassVar[dict] = { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "children": {"type": "array", "items": {"type": "integer"}}, + }, + } + + def get_records( + self, + context: dict | None, # noqa: ARG002 + ) -> t.Iterable[dict | tuple[dict, dict | None]]: + yield {"id": "1", "children": [1, 2, 3]} + + def generate_child_contexts( + self, + record: dict, + context: dict | None, # noqa: ARG002 + ) -> t.Iterable[dict | None]: + for child_id in record["children"]: + yield {"child_id": child_id, "pid": record["id"]} + + class ChildMany(Stream): + """A child stream.""" + + name = "child_many" + schema: t.ClassVar[dict] = { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "pid": {"type": "integer"}, + }, + } + parent_stream_type = ParentMany + + def get_records(self, context: dict | None): + """Get dummy records.""" + yield { + "id": context["child_id"], + "composite_id": f"{context['pid']}-{context['child_id']}", + } + + class MyTapMany(Tap): + """A tap with streams having a parent-child relationship.""" + + name = "my-tap-many" + + def discover_streams(self): + """Discover streams.""" + return [ + ParentMany(self), + ChildMany(self), + ] + + tap = MyTapMany() + parent_stream = tap.streams["parent_many"] + child_stream = tap.streams["child_many"] + + messages = _get_messages(tap) + + # Parent schema is emitted + assert messages[1] + assert messages[1]["type"] == SingerMessageType.SCHEMA + assert messages[1]["stream"] == parent_stream.name + assert messages[1]["schema"] == parent_stream.schema + + # Child schemas are emitted + schema_messages = messages[2:9:3] + assert schema_messages + assert all(msg["type"] == SingerMessageType.SCHEMA for msg in schema_messages) + assert all(msg["stream"] == child_stream.name for msg in schema_messages) + assert all(msg["schema"] == child_stream.schema for msg in schema_messages) + + # Child records are emitted + child_record_messages = messages[3:10:3] + assert child_record_messages + assert all(msg["type"] == SingerMessageType.RECORD for msg in child_record_messages) + assert all(msg["stream"] == child_stream.name for msg in child_record_messages) + assert all("pid" in msg["record"] for msg in child_record_messages) + + # State messages are emitted + state_messages = messages[4:11:3] + assert state_messages + assert all(msg["type"] == SingerMessageType.STATE for msg in state_messages) + + # Parent record is emitted + assert messages[11] + assert messages[11]["type"] == SingerMessageType.RECORD + + raise AssertionError