We recently had a crash of Pipelinewise due to OOM that resulted in missing data in Snowflake. Going through the source code I found what I believe to be the likely cause of the problem:
pipelinewise-target-snowflake/target_snowflake/__init__.py, lines 316 to 322 in f18c9e8:

```python
# set flushed state if it's not defined or there are no records so far
if not flushed_state or sum(row_count.values()) == 0:
    flushed_state = copy.deepcopy(state)
```
Line 322 assumes that the first state message received is also the flushed state. For the Postgres tap with log-based replication, that assumption does not hold: by default, the first state message is only emitted after 10,000 changes have been processed. Consider the following scenario:
- 2 tables are synced via logical replication
- batch size is 15,000
- `flush_all_streams` is false
- the LSN in the persisted state for all tables is 10 when beginning the run
1. The tap processes 9,000 inserts for table A and 1,000 for table B.
2. The tap emits the first state message, with the LSN now at 20 for both tables. The target sets `flushed_state` to the content of that state message.
3. The tap emits 6,000 more inserts for table A.
4. The batch size for table A is reached and the target flushes that table to Snowflake. Afterwards, it emits the state using `flushed_state` as the baseline.
5. Immediately after emitting the state, the process crashes.

The state file now contains LSN 20 for both tables, although table B has never been flushed to Snowflake and is still at LSN 10. As a result, the next sync will disregard all changes before LSN 20, and that data is lost.
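To make the mechanics concrete, here is a minimal sketch of the failure mode. It is not the actual target code; the bookmark layout and stream names are illustrative:

```python
import copy

# Persisted state at the start of the run (illustrative bookmark layout)
persisted_state = {"bookmarks": {"table_a": {"lsn": 10}, "table_b": {"lsn": 10}}}

flushed_state = None

# 9,000 inserts for table A and 1,000 for table B have been buffered...
row_count = {"table_a": 9000, "table_b": 1000}

# ...when the tap emits its first state message, already at LSN 20 for both tables
first_state_message = {"bookmarks": {"table_a": {"lsn": 20}, "table_b": {"lsn": 20}}}

# The logic from line 322: the first state message becomes the flushed baseline,
# even though nothing has been written to Snowflake yet
if not flushed_state or sum(row_count.values()) == 0:
    flushed_state = copy.deepcopy(first_state_message)

# Table A later reaches its batch size, gets flushed, and flushed_state is emitted.
# If the process crashes now, the persisted state claims LSN 20 for table B as well.
print(flushed_state["bookmarks"]["table_b"])  # {'lsn': 20}, although table B is still at LSN 10
```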
As far as I understand the issue so far, a crash of the pipeline when using CDC with Postgres will always lead to data loss, and the larger the batch size, the more data is lost. The only remedy would be setting `flush_all_streams` to true.
I can think of two solutions (rough sketches of both follow below):

1. Before emitting any records, send a state message that contains the persisted state as passed to the tap. Of course, that would only fix it for the Postgres tap and not for other taps that might also have this issue.
2. Perform a merge when saving the state instead of just overwriting the file. This probably has many negative side effects, as the current implementation gives the target complete authority over the content of the persisted state.
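For the first option, the tap-side change could look roughly like this. It is a hypothetical sketch, not existing pipelinewise-tap-postgres code; it only assumes the standard Singer STATE message format:

```python
import json
import sys


def emit_state(state: dict) -> None:
    """Write a Singer STATE message to stdout, as taps normally do."""
    sys.stdout.write(json.dumps({"type": "STATE", "value": state}) + "\n")
    sys.stdout.flush()


def sync(persisted_state: dict) -> None:
    # Echo the state the tap was started with before any RECORD messages,
    # so the target initialises flushed_state from bookmarks that are known
    # to be flushed already (LSN 10 for both tables in the example above).
    emit_state(persisted_state)

    # ... the logical replication loop would follow, emitting RECORD messages
    # and further STATE messages every N changes ...
```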
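For the second option, the merge on save could look roughly like the sketch below (the state-file layout is assumed, not taken from the pipelinewise code). Note that this only helps if the target emits bookmarks solely for streams it has actually flushed; otherwise the stale LSN 20 for table B would still win the merge:

```python
import json
from pathlib import Path


def merge_and_save_state(state_file: Path, new_state: dict) -> None:
    """Merge newly emitted bookmarks into the persisted state file.

    Sketch only: assumes a {"bookmarks": {stream: {...}}} layout and replaces
    bookmarks per stream instead of overwriting the whole file.
    """
    current = json.loads(state_file.read_text()) if state_file.exists() else {}
    merged = dict(current)
    merged.setdefault("bookmarks", {})
    merged["bookmarks"].update(new_state.get("bookmarks", {}))
    state_file.write_text(json.dumps(merged, indent=2))
```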