
Get duplicate records in kafka #105

Open

njalan opened this issue Feb 20, 2023 · 1 comment

Comments

njalan commented Feb 20, 2023

Below is my connect-file-source.properties:

name=local-file-source1
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data/test_file
topic=connect-test1
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=50000
policy.recursive=true
poll.interval.ms=0
policy.regexp=.*
policy.batch_size=0
#policy.cleanup=none
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.batch_size=0
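
For reference, I run this in standalone mode. A sketch of the launch command, assuming the stock worker config that ships with Kafka (config/connect-standalone.properties is the Kafka default, not a file from my setup):

bin/connect-standalone.sh config/connect-standalone.properties connect-file-source.properties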

When I run:
echo "bbb" >> /data/test_file/1.txt
echo "ddd" >> /data/test_file/1.txt

I got the following from Kafka:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"bbb"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"bbb"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"ddd"}}
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"value"}],"optional":false},"payload":{"value":"ddd"}}

Below is the log:
[2023-02-20 13:21:01,087] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/=] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:01,088] INFO [local-file-source1|task-0] SleepyPolicy Seeking to offset [2] for file [file:/data/test_file/7.txt]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:242)
[2023-02-20 13:21:01,110] INFO [local-file-source1|task-0] FsSourceTask Processing records for file [path = file:/data/test_file/7.txt, length = 16, blocks = [[offset = 0, length = 16, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
[2023-02-20 13:21:01,131] INFO [local-file-source1|task-0] [Producer clientId=connector-producer-local-file-source1-0] Resetting the last seen epoch of partition connect-test1-0 to 0 since the associated topicId changed from null to pOAivNaKReC7FstKHYOn_A (org.apache.kafka.clients.Metadata:402)

[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/1.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/2.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,218] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/3.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/6.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/5.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Skipping file [file:/data/test_file/=] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:285)
[2023-02-20 13:21:11,219] INFO [local-file-source1|task-0] SleepyPolicy Seeking to offset [2] for file [file:/data/test_file/7.txt]. (com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy:242)
[2023-02-20 13:21:11,220] INFO [local-file-source1|task-0] FsSourceTask Processing records for file [path = file:/data/test_file/7.txt, length = 24, blocks = [[offset = 0, length
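
To see which offset the task has committed per file, the stored source offsets can be inspected. In standalone mode they are kept in the file named by offset.storage.file.filename in the worker config; in distributed mode they can be dumped from the worker's offsets topic. A sketch for the distributed case, assuming the common topic name connect-offsets and a local broker (both are assumptions, adjust for your worker config):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning --property print.key=true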

pracilkk commented Apr 26, 2023

@mmolimar, do you have any further updates or plans to fix this issue? After upgrading to the master branch, we still see duplicate data.
