Skip to content

Commit

Permalink
Solve-Smell: just pull-out the method from JdbcSinkConfig to JdbcConf…
Browse files Browse the repository at this point in the history
…ig, the methods are validateDeleteEnabled, validatePKModeAgainstPKFields
  • Loading branch information
JASH-PATEL-6566 committed Nov 30, 2024
1 parent 0a308a7 commit f7ed7ac
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 53 deletions.
72 changes: 68 additions & 4 deletions src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package io.aiven.connect.jdbc.config;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.TimeZone;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.types.Password;

import io.aiven.connect.jdbc.util.TimeZoneValidator;
Expand Down Expand Up @@ -186,4 +186,68 @@ protected static void defineSqlQuoteIdentifiers(final ConfigDef configDef, final
JdbcConfig.SQL_QUOTE_IDENTIFIERS_DISPLAY
);
}

protected static void validatePKModeAgainstPKFields(final Config config, final String pkMode, final String pkFields) {
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue pkModeConfigValue = configValues.get(pkMode);
final ConfigValue pkFieldsConfigValue = configValues.get(pkFields);

if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
return;
}

final String mode = (String) pkModeConfigValue.value();
final List<String> fields = (List<String>) pkFieldsConfigValue.value();

if (mode == null) {
return;
}

switch (mode.toLowerCase()) {
case "none":
if (fields != null && !fields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields should not be set when pkMode is 'none'."
);
}
break;
case "kafka":
if (fields == null || fields.size() != 3) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set with three fields "
+ "(topic, partition, offset) when pkMode is 'kafka'."
);
}
break;
case "record_key":
case "record_value":
if (fields == null || fields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'."
);
}
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + mode);
break;
}
}

protected static void validateDeleteEnabled(final Config config, final String deleteEnabledKey, final String pkModeKey) {
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue deleteEnabledConfigValue = configValues.get(deleteEnabledKey);
final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value();

final ConfigValue pkModeConfigValue = configValues.get(pkModeKey);
final String pkMode = (String) pkModeConfigValue.value();

if (deleteEnabled && !"record_key".equalsIgnoreCase(pkMode)) {
deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key");
}
}

}
53 changes: 4 additions & 49 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,63 +460,17 @@ public String toString() {
public static void main(final String... args) {
System.out.println("=========================================");
System.out.println("JDBC Sink connector Configuration Options");
System.out.println("=========================================");
System.out.println("============================ =============");
System.out.println();
System.out.println(CONFIG_DEF.toEnrichedRst());
}

public static void validateDeleteEnabled(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

// Check if DELETE_ENABLED is true
final ConfigValue deleteEnabledConfigValue = configValues.get(JdbcSinkConfig.DELETE_ENABLED);
final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value();

// Check if PK_MODE is RECORD_KEY
final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
final String pkMode = (String) pkModeConfigValue.value();

if (deleteEnabled && !JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY.name().equalsIgnoreCase(pkMode)) {
deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key");
}
JdbcConfig.validateDeleteEnabled(config, DELETE_ENABLED, PK_MODE);
}

public static void validatePKModeAgainstPKFields(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
final ConfigValue pkFieldsConfigValue = configValues.get(JdbcSinkConfig.PK_FIELDS);

if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
return; // If either pkMode or pkFields are not configured, there's nothing to validate
}

final String pkMode = (String) pkModeConfigValue.value();
final List<String> pkFields = (List<String>) pkFieldsConfigValue.value();

if (pkMode == null) {
return; // If pkMode is null, skip validation
}

switch (pkMode.toLowerCase()) {
case "none":
validateNoPKFields(pkFieldsConfigValue, pkFields);
break;
case "kafka":
validateKafkaPKFields(pkFieldsConfigValue, pkFields);
break;
case "record_key":
case "record_value":
validatePKFieldsRequired(pkFieldsConfigValue, pkFields);
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
break;
}
JdbcConfig.validatePKModeAgainstPKFields(config, PK_MODE, PK_FIELDS);
}

private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
Expand All @@ -527,6 +481,7 @@ private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, fi
}
}


private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields == null || pkFields.size() != 3) {
pkFieldsConfigValue.addErrorMessage(
Expand Down

0 comments on commit f7ed7ac

Please sign in to comment.