-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Debezium streaming integration #170
Open
smallgod
wants to merge
14
commits into
openmrs:main
Choose a base branch
from
smallgod:debezium-streaming-integration
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 4 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
83f9287
testing issue with dependencies conflict
smallgod d118ae2
Resolve Debezium-mysql conflict issue
smallgod 78e2015
Setup Debezium with offsets
smallgod bb981dc
Setup Debezium with offsets
smallgod 237c710
Adds externally tested files
smallgod ec33ef0
more cleaning up
smallgod 38d84fd
Remove old implementation of debezium
smallgod fb15c89
modified scripts to read from events table
smallgod 6b5f269
implement call to mamba database to execute script
smallgod b8b8c26
Make offsetbackingstore singleton and other modifications
smallgod 984b57a
Resolve circular dependency issue
smallgod 917506d
Resolve missing db event table issue
smallgod cb76736
Resolve issues with incremental failing also remove START TRANSACTION…
smallgod f3ee6b0
adds encounter table to table list
smallgod File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,11 @@ | ||
package org.openmrs.module.mambacore.api.dao.impl; | ||
|
||
import org.openmrs.module.dbevent.DbEventSource; | ||
import org.openmrs.module.dbevent.DbEventSourceConfig; | ||
import org.openmrs.module.dbevent.EventContext; | ||
import org.openmrs.module.mambacore.api.dao.FlattenDatabaseDao; | ||
import org.openmrs.module.mambacore.db.ConnectionPoolManager; | ||
import org.openmrs.module.mambacore.db.debezium.MyEventConsumer; | ||
import org.openmrs.module.mambacore.util.MambaETLProperties; | ||
import org.openmrs.module.mambacore.util.StringReplacerUtil; | ||
import org.slf4j.Logger; | ||
|
@@ -17,6 +21,7 @@ | |
import java.sql.Connection; | ||
import java.sql.PreparedStatement; | ||
import java.sql.SQLException; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
@@ -29,13 +34,33 @@ public class JdbcFlattenDatabaseDao implements FlattenDatabaseDao { | |
private static final String MYSQL_COMMENT_REGEX = "--.*(?=\\n)"; | ||
private static final String DELIMITER = "~-~-"; | ||
|
||
//private DebeziumListener debeziumListener = new DebeziumListener(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delete commented code |
||
|
||
/** | ||
* Deploy MambaETL stored procedures | ||
*/ | ||
@Override | ||
public void deployMambaEtl() { | ||
|
||
log.info("Deploying MambaETL..."); | ||
MambaETLProperties props = MambaETLProperties.getInstance(); | ||
log.info("Deploying MambaETL, scheduled @interval: " + props.getInterval() + " seconds..."); | ||
executeSqlScript(props); | ||
log.info("Done deploying MambaETL..."); | ||
log.info("MambaETL deployed (with interval: " + props.getInterval() + "s )..."); | ||
} | ||
|
||
/** | ||
* Stream in database changes using Debezium | ||
*/ | ||
@Override | ||
public void streamInDatabaseChanges() { | ||
//debeziumListener.startListening(); | ||
EventContext ctx = new EventContext(); | ||
DbEventSourceConfig config = new DbEventSourceConfig(100002, "mamba-debezium", ctx); | ||
config.configureTablesToInclude(Arrays.asList("obs", "patient", "encounter", "encounter_type", "location")); | ||
DbEventSource eventSource = new DbEventSource(config); | ||
MyEventConsumer consumer = new MyEventConsumer(); | ||
eventSource.setEventConsumer(consumer); | ||
eventSource.start(); | ||
} | ||
|
||
private void executeSqlScript(MambaETLProperties props) { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ | |
import org.slf4j.LoggerFactory; | ||
import org.springframework.transaction.annotation.Transactional; | ||
|
||
import javax.annotation.PreDestroy; | ||
//import javax.annotation.PreDestroy; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delete commented code |
||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.TimeUnit; | ||
|
@@ -38,6 +38,7 @@ public void setupEtl() { | |
executorService.submit(() -> { | ||
try { | ||
dao.deployMambaEtl(); | ||
dao.streamInDatabaseChanges(); | ||
} catch (Exception e) { | ||
log.error("Error deploying Mamba ETL", e); | ||
} | ||
|
@@ -46,7 +47,7 @@ public void setupEtl() { | |
|
||
|
||
@Override | ||
@PreDestroy | ||
//@PreDestroy | ||
public void shutdownEtlThread() { | ||
executorService.shutdown(); | ||
try { | ||
|
91 changes: 91 additions & 0 deletions
91
api/src/main/java/org/openmrs/module/mambacore/db/debezium/CustomFileOffsetBackingStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/** | ||
* This Source Code Form is subject to the terms of the Mozilla Public License, | ||
* v. 2.0. If a copy of the MPL was not distributed with this file, You can | ||
* obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under | ||
* the terms of the Healthcare Disclaimer located at http://openmrs.org/license. | ||
* <p> | ||
* Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS | ||
* graphic logo is a trademark of OpenMRS Inc. | ||
*/ | ||
package org.openmrs.module.mambacore.db.debezium; | ||
|
||
import org.apache.kafka.connect.json.JsonConverter; | ||
import org.apache.kafka.connect.json.JsonConverterConfig; | ||
import org.apache.kafka.connect.storage.FileOffsetBackingStore; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Collections; | ||
|
||
/** | ||
* Custom {@link FileOffsetBackingStore} that only saves the offset if no exception was encountered | ||
* while processing a source record read by debezium from the MySQL binlog to ensure no binlog entry | ||
* goes unprocessed. | ||
*/ | ||
public class CustomFileOffsetBackingStore extends FileOffsetBackingStore { | ||
|
||
protected static final Logger log = LoggerFactory.getLogger(CustomFileOffsetBackingStore.class); | ||
private static final JsonConverter KEY_CONVERTER = new JsonConverter(); | ||
private static boolean disabled = false; | ||
|
||
private OffsetUtils offsetUtils; | ||
|
||
public CustomFileOffsetBackingStore() { | ||
super(KEY_CONVERTER); | ||
KEY_CONVERTER.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), true); | ||
} | ||
|
||
/** | ||
* Disables offset storage | ||
*/ | ||
public static void disable() { | ||
disabled = true; | ||
log.debug("Disabled saving of offsets"); | ||
} | ||
|
||
/** | ||
* Re-enables offset storage | ||
*/ | ||
public static void reset() { | ||
disabled = false; | ||
} | ||
|
||
/** | ||
* @see FileOffsetBackingStore#save() | ||
*/ | ||
@Override | ||
protected void save() { | ||
|
||
synchronized (CustomFileOffsetBackingStore.class) { | ||
|
||
if (disabled) { | ||
log.warn("Skipping saving of offset because an error was encountered while processing a change event"); | ||
return; | ||
} | ||
log.debug("Saving binlog offset"); | ||
super.save(); | ||
} | ||
} | ||
|
||
/** | ||
* @see FileOffsetBackingStore#start() | ||
*/ | ||
@Override | ||
public synchronized void start() { | ||
|
||
doStart(); | ||
|
||
try { | ||
//The offset file structure changed from that generated by previous versions therefore, we need to | ||
//transform any existing offset file to match the new structure otherwise remote sites will lose any | ||
//events that are recorded between pre-upgrade and post-upgrade application runs of the sender. | ||
offsetUtils.transformOffsetIfNecessary(data); | ||
} catch (Exception e) { | ||
throw new RuntimeException("An error occurred while verifying the existing debezium offset file data", e); | ||
} | ||
} | ||
|
||
protected void doStart() { | ||
super.start(); | ||
} | ||
} |
112 changes: 112 additions & 0 deletions
112
api/src/main/java/org/openmrs/module/mambacore/db/debezium/DbChangeToEvent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/** | ||
* This Source Code Form is subject to the terms of the Mozilla Public License, | ||
* v. 2.0. If a copy of the MPL was not distributed with this file, You can | ||
* obtain one at http://mozilla.org/MPL/2.0/. OpenMRS is also distributed under | ||
* the terms of the Healthcare Disclaimer located at http://openmrs.org/license. | ||
* <p> | ||
* Copyright (C) OpenMRS Inc. OpenMRS is a registered trademark and the OpenMRS | ||
* graphic logo is a trademark of OpenMRS Inc. | ||
*/ | ||
package org.openmrs.module.mambacore.db.debezium; | ||
|
||
import io.debezium.DebeziumException; | ||
import io.debezium.engine.ChangeEvent; | ||
import org.apache.kafka.connect.data.Struct; | ||
import org.apache.kafka.connect.source.SourceRecord; | ||
|
||
import java.util.Optional; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* Utility function that converts a Debezium {@link ChangeEvent} to a {@link DbEvent}. | ||
*/ | ||
public class DbChangeToEvent | ||
implements Function<ChangeEvent<SourceRecord, SourceRecord>, DbEvent> { | ||
|
||
private static final String BEFORE_FIELD = "before"; | ||
private static final String AFTER_FIELD = "after"; | ||
private static final String SOURCE_FIELD = "source"; | ||
private static final String OPERATION_FIELD = "op"; | ||
private static final String TIMESTAMP_FIELD = "ts_ms"; | ||
private static final String SNAPSHOT_FIELD = "snapshot"; | ||
private static final String TABLE_FIELD = "table"; | ||
private static final String NAME_FIELD = "name"; | ||
|
||
@Override | ||
public DbEvent apply(ChangeEvent<SourceRecord, SourceRecord> changeEvent) { | ||
|
||
SourceRecord record = Optional.ofNullable(changeEvent) | ||
.map(ChangeEvent::value) | ||
.orElseThrow(() -> new DebeziumException("ChangeEvent value is null")); | ||
|
||
Struct keyStruct = getStruct(record.key(), "key"); | ||
Struct valueStruct = getStruct(record.value(), "value"); | ||
Struct sourceStruct = getStruct(valueStruct, SOURCE_FIELD); | ||
|
||
validateKeyStruct(keyStruct); | ||
|
||
ObjectMap primaryKey = new ObjectMapImpl(keyStruct); | ||
ObjectMap previousState = new ObjectMapImpl(valueStruct.getStruct(BEFORE_FIELD)); | ||
ObjectMap newState = new ObjectMapImpl(valueStruct.getStruct(AFTER_FIELD)); | ||
ObjectMap source = new ObjectMapImpl(sourceStruct); | ||
|
||
DbOperation operation = DbOperation.convertToEnum(getString(valueStruct, OPERATION_FIELD)); | ||
ObjectMap values = (operation == DbOperation.DELETE) ? previousState : newState; | ||
|
||
Long timestamp = getLong(valueStruct, TIMESTAMP_FIELD); | ||
String tableName = getString(sourceStruct, TABLE_FIELD); | ||
String sourceName = getString(sourceStruct, NAME_FIELD); | ||
DbSnapshot snapshot = DbSnapshot.convertToEnum(getString(sourceStruct, SNAPSHOT_FIELD)); | ||
|
||
return new DbEvent(primaryKey, | ||
previousState, | ||
newState, | ||
source, | ||
values, | ||
tableName, | ||
sourceName, | ||
operation, | ||
timestamp, | ||
snapshot); | ||
} | ||
|
||
/** | ||
* Helper method to retrieve a {@link Struct} and throw an appropriate exception if it is null. | ||
*/ | ||
private Struct getStruct(Object object, String structName) { | ||
return Optional.ofNullable((Struct) object) | ||
.orElseThrow(() -> new DebeziumException(structName + " struct is null")); | ||
} | ||
|
||
/** | ||
* Helper method to retrieve a String field value from a {@link Struct}. | ||
*/ | ||
private String getString(Struct struct, String fieldName) { | ||
return Optional.ofNullable(struct.getString(fieldName)) | ||
.orElseThrow(() -> new DebeziumException(fieldName + " field is missing or null")); | ||
} | ||
|
||
/** | ||
* Helper method to retrieve a Long field value from a {@link Struct}. | ||
*/ | ||
private Long getLong(Struct struct, String fieldName) { | ||
return Optional.ofNullable(struct.getInt64(fieldName)) | ||
.orElseThrow(() -> new DebeziumException(fieldName + " field is missing or null")); | ||
} | ||
|
||
/** | ||
* Validates the {@link Struct} for the key, ensuring it contains a single primary key. | ||
*/ | ||
private void validateKeyStruct(Struct keyStruct) { | ||
|
||
int keyFieldsSize = keyStruct.schema().fields().size(); | ||
|
||
if (keyFieldsSize == 0) { | ||
throw new DebeziumException("Tables with no primary key column are not supported"); | ||
} | ||
|
||
if (keyFieldsSize > 1) { | ||
throw new DebeziumException("Tables with composite primary keys are not supported"); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete the comments