From af2c00e2d6f1fccb7788053b05de741296beb972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Wed, 21 Aug 2024 20:22:07 -0600 Subject: [PATCH] feat: Interruption and termination signals in taps and targets --- .../sample_tap_countries/countries_streams.py | 11 ++++++++ samples/sample_target_csv/csv_target.py | 5 +++- singer_sdk/plugin_base.py | 25 ++++++++++++++++++- singer_sdk/tap_base.py | 15 +++++++++++ singer_sdk/target_base.py | 19 ++++++++++++++ 5 files changed, 73 insertions(+), 2 deletions(-) diff --git a/samples/sample_tap_countries/countries_streams.py b/samples/sample_tap_countries/countries_streams.py index d0106ff06..13223bc7d 100644 --- a/samples/sample_tap_countries/countries_streams.py +++ b/samples/sample_tap_countries/countries_streams.py @@ -17,6 +17,9 @@ from singer_sdk import typing as th from singer_sdk.streams.graphql import GraphQLStream +if t.TYPE_CHECKING: + from collections.abc import Iterable + SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas" @@ -83,6 +86,14 @@ class CountriesStream(CountriesAPIStream): ), ).to_dict() + # FIXME: revert these changes before merging + def request_records(self, context) -> Iterable[dict]: + import time # noqa: PLC0415 + + time.sleep(60) # Simulate a slow stream + + return super().request_records(context) + class ContinentsStream(CountriesAPIStream): """Continents stream from the Countries API.""" diff --git a/samples/sample_target_csv/csv_target.py b/samples/sample_target_csv/csv_target.py index 6e800970c..635bd8c34 100644 --- a/samples/sample_target_csv/csv_target.py +++ b/samples/sample_target_csv/csv_target.py @@ -13,7 +13,10 @@ class SampleTargetCSV(Target): name = "target-csv" config_jsonschema = th.PropertiesList( th.Property( - "target_folder", th.StringType, required=True, title="Target Folder" + "target_folder", + th.StringType, + default="output", + title="Target Folder", ), th.Property("file_naming_scheme", th.StringType, title="File Naming Scheme"), ).to_dict() diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index db16560d4..31204b737 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -5,13 +5,14 @@ import abc import logging import os +import signal import sys import time import typing as t import warnings from importlib import metadata from pathlib import Path, PurePath -from types import MappingProxyType +from types import FrameType, MappingProxyType import click @@ -190,6 +191,10 @@ def __init__( # Initialization timestamp self.__initialized_at = int(time.time() * 1000) + # Signal handling + signal.signal(signal.SIGINT, self._handle_termination) + signal.signal(signal.SIGTERM, self._handle_termination) + def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" self._mapper = PluginMapper( @@ -416,6 +421,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]: return errors + def _handle_termination( # pragma: no cover + self, + signum: int, # noqa: ARG002 + frame: FrameType | None, # noqa: ARG002 + ) -> None: + """Handle termination signal. + + Args: + signum: Signal number. + frame: Frame. + + Raises: + click.Abort: If the termination signal is received. + """ + self.logger.info("Gracefully shutting down...") + errmsg = "Received termination signal" + raise click.Abort(errmsg) + @classmethod def print_version( cls: type[PluginBase], diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 82900049c..251a9f797 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -33,6 +33,7 @@ if t.TYPE_CHECKING: from pathlib import PurePath + from types import FrameType from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper @@ -487,6 +488,20 @@ def sync_all(self) -> None: # Command Line Execution + def _handle_termination( # pragma: no cover + self, + signum: int, + frame: FrameType | None, + ) -> None: + """Handle termination signal. + + Args: + signum: Signal number. + frame: Frame. + """ + self.write_message(StateMessage(value=self.state)) + super()._handle_termination(signum, frame) + @classmethod def invoke( # type: ignore[override] cls: type[Tap], diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 8907b6201..5e08ab7b9 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -32,6 +32,7 @@ if t.TYPE_CHECKING: from pathlib import PurePath + from types import FrameType from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper @@ -542,6 +543,24 @@ def _write_state_message(self, state: dict) -> None: # CLI handler + def _handle_termination( # pragma: no cover + self, + signum: int, + frame: FrameType | None, + ) -> None: + """Handle termination signals. + + Args: + signum: Signal number. + frame: Frame object. + """ + self.logger.info( + "Received termination signal %d, draining all sinks...", + signum, + ) + self.drain_all(is_endofpipe=True) + super()._handle_termination(signum, frame) + @classmethod def invoke( # type: ignore[override] cls: type[Target],