From a79cff8c94d8780353b620060eb5931f1ba91bab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Wed, 10 Jan 2024 13:36:36 -0600 Subject: [PATCH] fix(targets)!: Default handling of `ACTIVATE_VERSION` messages to soft deletes and add new `SQLConnector.delete_old_versions` method (#2105) * fix(targets): Default handling of `ACTIVATE_VERSION` messages to soft deletes * Test SQL target capabilities info --- singer_sdk/connectors/sql.py | 25 ++++++++++++++ singer_sdk/helpers/capabilities.py | 8 +++++ singer_sdk/sinks/sql.py | 14 ++++---- singer_sdk/target_base.py | 11 +++++- tests/core/targets/__init__.py | 0 tests/core/targets/test_target_sql.py | 50 +++++++++++++++++++++++++++ tests/samples/test_target_sqlite.py | 8 ++--- 7 files changed, 103 insertions(+), 13 deletions(-) create mode 100644 tests/core/targets/__init__.py create mode 100644 tests/core/targets/test_target_sql.py diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index ae5ff78db..2263516bc 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -1189,3 +1189,28 @@ def deserialize_json(self, json_str: str) -> object: .. versionadded:: 0.31.0 """ return json.loads(json_str, parse_float=decimal.Decimal) + + def delete_old_versions( + self, + *, + full_table_name: str, + version_column_name: str, + current_version: int, + ) -> None: + """Hard-deletes any old version rows from the table. + + This is used to clean up old versions when an ACTIVATE_VERSION message is + received. + + Args: + full_table_name: The fully qualified table name. + version_column_name: The name of the version column. + current_version: The current ACTIVATE version of the table. + """ + with self._connect() as conn, conn.begin(): + conn.execute( + sa.text( + f"DELETE FROM {full_table_name} " # noqa: S608 + f"WHERE {version_column_name} <= {current_version}", + ), + ) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index c3d124093..b18d53748 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -106,6 +106,14 @@ description="Add metadata to records.", ), ).to_dict() +TARGET_HARD_DELETE_CONFIG = PropertiesList( + Property( + "hard_delete", + BooleanType(), + description="Hard delete records.", + default=False, + ), +).to_dict() class TargetLoadMethods(str, Enum): diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index eb6dcfef6..3a37570d6 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -378,14 +378,12 @@ def activate_version(self, new_version: int) -> None: sql_type=sa.types.Integer(), ) - if self.config.get("hard_delete", True): - with self.connector._connect() as conn, conn.begin(): # noqa: SLF001 - conn.execute( - sa.text( - f"DELETE FROM {self.full_table_name} " # noqa: S608 - f"WHERE {self.version_column_name} <= {new_version}", - ), - ) + if self.config.get("hard_delete", False): + self.connector.delete_old_versions( + full_table_name=self.full_table_name, + version_column_name=self.version_column_name, + current_version=new_version, + ) return if not self.connector.column_exists( diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 01aed9891..7a80d71e0 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -19,6 +19,7 @@ from singer_sdk.helpers.capabilities import ( ADD_RECORD_METADATA_CONFIG, BATCH_CONFIG, + TARGET_HARD_DELETE_CONFIG, TARGET_LOAD_METHOD_CONFIG, TARGET_SCHEMA_CONFIG, CapabilitiesEnum, @@ -636,7 +637,12 @@ def capabilities(self) -> list[CapabilitiesEnum]: A list of capabilities supported by this target. """ sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities - sql_target_capabilities.extend([TargetCapabilities.TARGET_SCHEMA]) + sql_target_capabilities.extend( + [ + TargetCapabilities.TARGET_SCHEMA, + TargetCapabilities.HARD_DELETE, + ] + ) return sql_target_capabilities @@ -668,6 +674,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if TargetCapabilities.TARGET_SCHEMA in capabilities: _merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema) + if TargetCapabilities.HARD_DELETE in capabilities: + _merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema) + super().append_builtin_config(config_jsonschema) @final diff --git a/tests/core/targets/__init__.py b/tests/core/targets/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/targets/test_target_sql.py b/tests/core/targets/test_target_sql.py new file mode 100644 index 000000000..fd71c0aeb --- /dev/null +++ b/tests/core/targets/test_target_sql.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import typing as t + +import pytest + +from singer_sdk.helpers.capabilities import CapabilitiesEnum, TargetCapabilities +from singer_sdk.target_base import SQLTarget + + +class SQLTargetMock(SQLTarget): + name = "sql-target-mock" + + def __init_subclass__( + cls, + *, + capabilities: t.Iterable[CapabilitiesEnum], + **kwargs: t.Any, + ): + super().__init_subclass__(**kwargs) + cls.capabilities = [*capabilities] + cls.config_jsonschema = {"properties": {}} + + +@pytest.mark.parametrize( + "capabilities,expected_settings", + [ + pytest.param([], set(), id="no capabilities"), + pytest.param( + [TargetCapabilities.TARGET_SCHEMA], + {"default_target_schema"}, + id="default schema", + ), + pytest.param( + [TargetCapabilities.HARD_DELETE], + {"hard_delete"}, + id="hard delete", + ), + ], +) +def test_target_about_info( + capabilities: list[CapabilitiesEnum], + expected_settings: set[str], +): + class MyTarget(SQLTargetMock, capabilities=capabilities): + pass + + about = MyTarget._get_about_info() + default_settings = {"add_record_metadata", "load_method"} + assert set(about.settings["properties"]) == expected_settings | default_settings diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index bddcbcf07..0d34c60a1 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -48,9 +48,9 @@ def sqlite_sample_target(sqlite_target_test_config): @pytest.fixture -def sqlite_sample_target_soft_delete(sqlite_target_test_config): +def sqlite_sample_target_hard_delete(sqlite_target_test_config): """Get a sample target object with hard_delete disabled.""" - return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": False}) + return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True}) @pytest.fixture @@ -217,7 +217,7 @@ def test_sqlite_column_addition(sqlite_sample_target: SQLTarget): def test_sqlite_activate_version( sqlite_sample_target: SQLTarget, - sqlite_sample_target_soft_delete: SQLTarget, + sqlite_sample_target_hard_delete: SQLTarget, ): """Test handling the activate_version message for the SQLite target. @@ -249,7 +249,7 @@ def test_sqlite_activate_version( target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True) target_sync_test( - sqlite_sample_target_soft_delete, + sqlite_sample_target_hard_delete, input=StringIO(tap_output), finalize=True, )