Skip to content

Commit

Permalink
Test with activate version disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 30, 2024
1 parent 72565cd commit e8adcad
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ def sqlite_sample_target_hard_delete(sqlite_target_test_config):
)


@pytest.fixture
def sqlite_sample_target_no_activate_version(sqlite_target_test_config):
"""Get a sample target object with hard_delete disabled."""
return SQLiteTarget(config={**sqlite_target_test_config, "activate_version": False})


@pytest.fixture
def sqlite_target_add_record_metadata(sqlite_target_test_config):
"""Get a sample target object with add_record_metadata enabled."""
Expand Down Expand Up @@ -282,6 +288,68 @@ def test_sqlite_activate_version(
finalize=True,
)

# Check that the record metadata was added
db_path = sqlite_sample_target_hard_delete.config["path_to_db"]
engine = sa.create_engine(f"sqlite:///{db_path}")
meta = sa.MetaData()
meta.reflect(bind=engine)
table = meta.tables[test_tbl]

assert "_sdc_table_version" in table.columns
assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER

assert "_sdc_deleted_at" in table.columns
assert type(table.columns["_sdc_deleted_at"].type) is sa.DATETIME


def test_sqlite_no_activate_version(
sqlite_sample_target_no_activate_version: SQLTarget,
):
"""Test handling the activate_version message for the SQLite target.
Test performs the following actions:
- Sends an activate_version message for a table that doesn't exist (which should
have no effect)
"""
test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}"
schema_msg = {
"type": "SCHEMA",
"stream": test_tbl,
"schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(),
}

tap_output = "\n".join(
json.dumps(msg)
for msg in [
schema_msg,
{"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345},
{
"type": "RECORD",
"stream": test_tbl,
"record": {"col_a": "samplerow1"},
"version": 12345,
},
]
)

target_sync_test(
sqlite_sample_target_no_activate_version,
input=StringIO(tap_output),
finalize=True,
)

# Check that the record metadata was added
db_path = sqlite_sample_target_no_activate_version.config["path_to_db"]
engine = sa.create_engine(f"sqlite:///{db_path}")
meta = sa.MetaData()
meta.reflect(bind=engine)
table = meta.tables[test_tbl]

assert "col_a" in table.columns
assert "_sdc_table_version" not in table.columns
assert "_sdc_deleted_at" not in table.columns


def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget):
"""Test handling the activate_version message for the SQLite target.
Expand Down

0 comments on commit e8adcad

Please sign in to comment.