From 5511ccdb5ce34548fe0fc8c6a8dbfbea71fc57f1 Mon Sep 17 00:00:00 2001 From: Pat Nadolny Date: Wed, 14 Jun 2023 18:09:11 -0400 Subject: [PATCH] fix: duplicate record test (#47) Closes https://github.com/MeltanoLabs/target-snowflake/issues/41 The challenge is that we're using a merge statement, which successfully deduplicates against what already exists in the target table, but within the batch of records in the stage there are also dupes. The test was failing because no data existed in the destination table, so we weren't updating any records, only inserting, but within our staging file we had multiple records for primary key IDs 1 and 2, so they all got inserted and the result was duplicates in the destination table. The way I fixed it in this PR is by adding a qualify row_num = 1 to deduplicate within our staging file select query. It uses the SEQ8 function, which I've never used before, to order the records based on their place in the file, i.e. the bottom of the table takes precedence over the top. It looks to work as expected, but it feels a little sketchy; I wonder if unsorted streams would have issues where the wrong record gets selected. Ideally the user would tell us a sort-by column so we know how to take the latest. 
--------- Co-authored-by: Ken Payne --- target_snowflake/connector.py | 4 +++- tests/test_impl.py | 3 +-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index c8ec2ea..bbffa40 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -270,11 +270,13 @@ def _get_merge_from_stage_statement( not_matched_insert_values = ", ".join( [f's.{col}' for col in formatted_properties] ) + dedup_cols = ", ".join([key for key in formatted_key_properties]) + dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1" return ( text( f"merge into {full_table_name} d using " + f"(select {', '.join(column_selections)} from '@~/target-snowflake/{sync_id}'" - + f"(file_format => {file_format})) s " + + f"(file_format => {file_format}) {dedup}) s " + f"on {join_expr} " + f"when matched then update set {matched_clause} " + f"when not matched then insert ({not_matched_insert_cols}) " diff --git a/tests/test_impl.py b/tests/test_impl.py index 770058b..7ccdf78 100644 --- a/tests/test_impl.py +++ b/tests/test_impl.py @@ -357,8 +357,7 @@ def validate(self) -> None: SnowflakeTargetCamelcaseComplexSchema, SnowflakeTargetCamelcaseTest, TargetCliPrintsTest, - # TODO: bug https://github.com/MeltanoLabs/target-snowflake/issues/41 - # SnowflakeTargetDuplicateRecords, + SnowflakeTargetDuplicateRecords, SnowflakeTargetEncodedStringData, SnowflakeTargetInvalidSchemaTest, # Not available in the SDK yet