Skip to content

Commit

Permalink
fix: duplicate record test (#47)
Browse files Browse the repository at this point in the history
Closes #41

The challenge is that we're using a merge statement which is
successfully deduplicating against what already exists in the target
table but within the batch of records in the stage there are also dupes.
The test was failing because no data existed in the destination table so
we weren't updating any records, only inserting, but within our staging
file we had multiple primary keys ID 1 and 2 so they all get inserting
and the result is duplicates in the destination table.

The way I fixed it in this PR is by adding a qualify row_num = 1 to
deduplicate within our staging file select query. It uses the SEQ8
function, which I've never used before, to order the records based on
their place in the file i.e. the bottom of the table takes precedence
over the top. I looks to work as expected but it feels a little sketchy,
I wonder if unsorted streams would have issues where the wrong record
gets selected. Ideally the user would tell us a sort by column to know
how to take the latest.

---------

Co-authored-by: Ken Payne <ken@meltano.com>
  • Loading branch information
pnadolny13 and Ken Payne authored Jun 14, 2023
1 parent e1bb4c8 commit 5511ccd
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
4 changes: 3 additions & 1 deletion target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,13 @@ def _get_merge_from_stage_statement(
not_matched_insert_values = ", ".join(
[f's.{col}' for col in formatted_properties]
)
dedup_cols = ", ".join([key for key in formatted_key_properties])
dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1"
return (
text(
f"merge into {full_table_name} d using "
+ f"(select {', '.join(column_selections)} from '@~/target-snowflake/{sync_id}'"
+ f"(file_format => {file_format})) s "
+ f"(file_format => {file_format}) {dedup}) s "
+ f"on {join_expr} "
+ f"when matched then update set {matched_clause} "
+ f"when not matched then insert ({not_matched_insert_cols}) "
Expand Down
3 changes: 1 addition & 2 deletions tests/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,7 @@ def validate(self) -> None:
SnowflakeTargetCamelcaseComplexSchema,
SnowflakeTargetCamelcaseTest,
TargetCliPrintsTest,
# TODO: bug https://github.com/MeltanoLabs/target-snowflake/issues/41
# SnowflakeTargetDuplicateRecords,
SnowflakeTargetDuplicateRecords,
SnowflakeTargetEncodedStringData,
SnowflakeTargetInvalidSchemaTest,
# Not available in the SDK yet
Expand Down

0 comments on commit 5511ccd

Please sign in to comment.