diff --git a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
index 89202e1d..8bd63fb2 100644
--- a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
+++ b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
@@ -17,13 +17,13 @@ package io.aiven.connect.jdbc.config;
 
 import java.time.ZoneId;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.types.Password;
 
 import io.aiven.connect.jdbc.util.TimeZoneValidator;
@@ -186,4 +186,68 @@ protected static void defineSqlQuoteIdentifiers(final ConfigDef configDef, final
             JdbcConfig.SQL_QUOTE_IDENTIFIERS_DISPLAY
         );
     }
+
+    protected static void validatePKModeAgainstPKFields(final Config config, final String pkMode, final String pkFields) {
+        final Map<String, ConfigValue> configValues = config.configValues().stream()
+            .collect(Collectors.toMap(ConfigValue::name, v -> v));
+
+        final ConfigValue pkModeConfigValue = configValues.get(pkMode);
+        final ConfigValue pkFieldsConfigValue = configValues.get(pkFields);
+
+        if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
+            return;
+        }
+
+        final String mode = (String) pkModeConfigValue.value();
+        final List<String> fields = (List<String>) pkFieldsConfigValue.value();
+
+        if (mode == null) {
+            return;
+        }
+
+        switch (mode.toLowerCase()) {
+            case "none":
+                if (fields != null && !fields.isEmpty()) {
+                    pkFieldsConfigValue.addErrorMessage(
+                        "Primary key fields should not be set when pkMode is 'none'."
+                    );
+                }
+                break;
+            case "kafka":
+                if (fields == null || fields.size() != 3) {
+                    pkFieldsConfigValue.addErrorMessage(
+                        "Primary key fields must be set with three fields "
+                            + "(topic, partition, offset) when pkMode is 'kafka'."
+                    );
+                }
+                break;
+            case "record_key":
+            case "record_value":
+                if (fields == null || fields.isEmpty()) {
+                    pkFieldsConfigValue.addErrorMessage(
+                        "Primary key fields must be set when pkMode is 'record_key' or 'record_value'."
+                    );
+                }
+                break;
+            default:
+                pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + mode);
+                break;
+        }
+    }
+
+    protected static void validateDeleteEnabled(final Config config, final String deleteEnabledKey, final String pkModeKey) {
+        final Map<String, ConfigValue> configValues = config.configValues().stream()
+            .collect(Collectors.toMap(ConfigValue::name, v -> v));
+
+        final ConfigValue deleteEnabledConfigValue = configValues.get(deleteEnabledKey);
+        final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value();
+
+        final ConfigValue pkModeConfigValue = configValues.get(pkModeKey);
+        final String pkMode = (String) pkModeConfigValue.value();
+
+        if (deleteEnabled && !"record_key".equalsIgnoreCase(pkMode)) {
+            deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key");
+        }
+    }
+
 }
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
index 03371b1c..435c5b8f 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
@@ -460,63 +460,17 @@ public String toString() {
     public static void main(final String... args) {
         System.out.println("=========================================");
         System.out.println("JDBC Sink connector Configuration Options");
         System.out.println("=========================================");
         System.out.println();
         System.out.println(CONFIG_DEF.toEnrichedRst());
     }
 
     public static void validateDeleteEnabled(final Config config) {
-        // Collect all configuration values
-        final Map<String, ConfigValue> configValues = config.configValues().stream()
-            .collect(Collectors.toMap(ConfigValue::name, v -> v));
-
-        // Check if DELETE_ENABLED is true
-        final ConfigValue deleteEnabledConfigValue = configValues.get(JdbcSinkConfig.DELETE_ENABLED);
-        final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value();
-
-        // Check if PK_MODE is RECORD_KEY
-        final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
-        final String pkMode = (String) pkModeConfigValue.value();
-
-        if (deleteEnabled && !JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY.name().equalsIgnoreCase(pkMode)) {
-            deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key");
-        }
+        JdbcConfig.validateDeleteEnabled(config, DELETE_ENABLED, PK_MODE);
     }
 
     public static void validatePKModeAgainstPKFields(final Config config) {
-        // Collect all configuration values
-        final Map<String, ConfigValue> configValues = config.configValues().stream()
-            .collect(Collectors.toMap(ConfigValue::name, v -> v));
-
-        final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
-        final ConfigValue pkFieldsConfigValue = configValues.get(JdbcSinkConfig.PK_FIELDS);
-
-        if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
-            return; // If either pkMode or pkFields are not configured, there's nothing to validate
-        }
-
-        final String pkMode = (String) pkModeConfigValue.value();
-        final List<String> pkFields = (List<String>) pkFieldsConfigValue.value();
-
-        if (pkMode == null) {
-            return; // If pkMode is null, skip validation
-        }
-
-        switch (pkMode.toLowerCase()) {
-            case "none":
-                validateNoPKFields(pkFieldsConfigValue, pkFields);
-                break;
-            case "kafka":
-                validateKafkaPKFields(pkFieldsConfigValue, pkFields);
-                break;
-            case "record_key":
-            case "record_value":
-                validatePKFieldsRequired(pkFieldsConfigValue, pkFields);
-                break;
-            default:
-                pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
-                break;
-        }
+        JdbcConfig.validatePKModeAgainstPKFields(config, PK_MODE, PK_FIELDS);
     }
 
     private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
@@ -527,6 +481,7 @@ private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, fi
         }
     }
 
+
     private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
         if (pkFields == null || pkFields.size() != 3) {
             pkFieldsConfigValue.addErrorMessage(