Skip to content

Commit

Permalink
feat: SQLTap connector instance shared with streams (#1861)
Browse files Browse the repository at this point in the history
Co-authored-by: Edgar R. M <edgar@meltano.com>
  • Loading branch information
BuzzCutNorman and edgarrmondragon authored Aug 17, 2023
1 parent efca24f commit ce4b9a7
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
if t.TYPE_CHECKING:
from pathlib import PurePath

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
from singer_sdk.streams import SQLStream, Stream

Expand Down Expand Up @@ -612,6 +613,8 @@ class SQLTap(Tap):
# Stream class used to initialize new SQL streams from their catalog declarations.
default_stream_class: type[SQLStream]

_tap_connector: SQLConnector | None = None

def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
"""Initialize the SQL tap.
Expand All @@ -624,6 +627,19 @@ def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
self._catalog_dict: dict | None = None
super().__init__(*args, **kwargs)

@property
def tap_connector(self) -> SQLConnector:
"""The connector object.
Returns:
The connector object.
"""
if self._tap_connector is None:
self._tap_connector = self.default_stream_class.connector_class(
dict(self.config),
)
return self._tap_connector

@property
def catalog_dict(self) -> dict:
"""Get catalog dictionary.
Expand All @@ -637,7 +653,7 @@ def catalog_dict(self) -> dict:
if self.input_catalog:
return self.input_catalog.to_dict()

connector = self.default_stream_class.connector_class(dict(self.config))
connector = self.tap_connector

result: dict[str, list[dict]] = {"streams": []}
result["streams"].extend(connector.discover_catalog_entries())
Expand All @@ -653,6 +669,12 @@ def discover_streams(self) -> list[Stream]:
"""
result: list[Stream] = []
for catalog_entry in self.catalog_dict["streams"]:
result.append(self.default_stream_class(self, catalog_entry))
result.append(
self.default_stream_class(
tap=self,
catalog_entry=catalog_entry,
connector=self.tap_connector,
),
)

return result

0 comments on commit ce4b9a7

Please sign in to comment.