From be464fa4069dc9373e343d83f84dc0917923aa1b Mon Sep 17 00:00:00 2001
From: Wink <32723967+aiwenmo@users.noreply.github.com>
Date: Fri, 22 Nov 2024 00:59:19 +0800
Subject: [PATCH] [Optimization-3932][cdc] Optimize CDCSOURCE and support sink print and mock (#3933)

---
 .../org/dinky/service/task/FlinkSqlTask.java  |   2 +
 .../org/dinky/cdc/AbstractSinkBuilder.java    | 453 +++++----------
 .../main/java/org/dinky/cdc/SinkBuilder.java  |   3 +-
 .../dinky/cdc/convert/DataTypeConverter.java  | 520 ++++++++++++++++++
 .../org/dinky/cdc/kafka/KafkaSinkBuilder.java | 175 +++---
 .../dinky/cdc/kafka/KafkaSinkJsonBuilder.java |  97 ++--
 .../org/dinky/cdc/print/PrintSinkBuilder.java |  89 +++
 .../dinky/cdc/sql/AbstractSqlSinkBuilder.java | 220 +++-----
 .../org/dinky/cdc/sql/SQLSinkBuilder.java     | 107 +---
 .../sql/catalog/SQLCatalogSinkBuilder.java    |  68 +--
 .../services/org.dinky.cdc.SinkBuilder        |   3 +
 .../java/org/dinky/cdc/SinkBuilderTest.java   |  48 --
 .../cdc/convert/DataTypeConverterTest.java    |  58 ++
 .../DorisSchemaEvolutionSinkBuilder.java      |  80 ++-
 .../org/dinky/cdc/doris/DorisSinkBuilder.java |   2 -
 .../org/dinky/cdc/kafka/KafkaSinkBuilder.java | 183 ------
 .../services/org.dinky.cdc.SinkBuilder        |   2 -
 .../org/dinky/data/model/FlinkCDCConfig.java  |  13 +
 .../connector/mock/sink/MockSinkFunction.java |  15 +-
 .../org/dinky/constant/FlinkConstant.java     |   2 +-
 .../data/result/AbstractResultBuilder.java    |   6 +
 .../dinky/data/result/MockResultBuilder.java  | 171 +-----
 .../dinky/data/result/MockResultRunnable.java | 289 ++++++++++
 .../org/dinky/data/result/ResultBuilder.java  |   3 +-
 .../org/dinky/data/result/SelectResult.java   |   7 +
 .../data/result/SelectResultBuilder.java      |   6 +-
 .../java/org/dinky/executor/Executor.java     |  25 +-
 .../java/org/dinky/explainer/Explainer.java   |   3 +-
 .../mock/MockStatementExplainer.java          |   3 +
 .../dinky/job/builder/JobExecuteBuilder.java  |   2 +-
 .../dinky/job/builder/JobTransBuilder.java    |   2 +-
 .../dinky/job/runner/JobPipelineRunner.java   |  29 +-
 .../org/dinky/job/runner/JobSqlRunner.java    |   2 +-
 .../trans/ddl/CreateCDCSourceOperation.java   |  28 +-
 dinky-flink/dinky-flink-1.15/pom.xml          |   3 +-
 dinky-flink/dinky-flink-1.16/pom.xml          |   3 +-
 dinky-flink/dinky-flink-1.17/pom.xml          |   3 +-
 dinky-flink/dinky-flink-1.18/pom.xml          |   3 +-
 dinky-flink/dinky-flink-1.19/pom.xml          |   3 +-
 dinky-flink/dinky-flink-1.20/pom.xml          |   3 +-
 .../JobDetailInfoModel/ErrorMsgInfo.tsx       |   2 +-
 .../JobDetailInfoModel/PreViewData.tsx        |   8 +-
 .../components/JobDetailInfoModel/index.tsx   |   5 +-
 .../Service/ExecutionHistory/index.tsx        |  17 +-
 .../Toolbar/Service/Result/index.tsx          |  31 +-
 .../DataStudio/Toolbar/Service/index.tsx      |   2 +-
 46 files changed, 1547 insertions(+), 1252 deletions(-)
 create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java
 rename dinky-cdc/{dinky-cdc-plus => dinky-cdc-core}/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java (80%)
 create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java
 create mode 100644 dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder
 delete mode 100644 dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java
 create mode 100644 dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/convert/DataTypeConverterTest.java
 delete mode 100644 dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java
 create mode 100644 dinky-core/src/main/java/org/dinky/data/result/MockResultRunnable.java
diff --git 
a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java index 481f25fe41..e1eacd045c 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/FlinkSqlTask.java @@ -54,10 +54,12 @@ public FlinkSqlTask(TaskDTO task) { @Override public List explain() { + jobManager.setPlanMode(true); return jobManager.explainSql(task.getStatement()).getSqlExplainResults(); } public ObjectNode getJobPlan() { + jobManager.setPlanMode(true); String planJson = jobManager.getJobPlanJson(task.getStatement()); return JsonUtils.parseObject(planJson); } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index f9c82b3846..0b0f36a747 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -20,48 +20,31 @@ package org.dinky.cdc; import org.dinky.assertion.Asserts; -import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.cdc.convert.DataTypeConverter; import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; import org.dinky.executor.CustomTableEnvironment; import org.dinky.utils.JsonUtils; +import org.dinky.utils.SplitUtil; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimeType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarBinaryType; -import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; -import java.math.BigDecimal; -import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; @@ -70,57 +53,48 @@ import java.util.LinkedHashMap; import 
java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import javax.xml.bind.DatatypeConverter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - public abstract class AbstractSinkBuilder implements SinkBuilder { - protected ObjectMapper objectMapper = new ObjectMapper(); protected static final Logger logger = LoggerFactory.getLogger(AbstractSinkBuilder.class); + protected ObjectMapper objectMapper = new ObjectMapper(); protected FlinkCDCConfig config; + protected StreamExecutionEnvironment env; + protected CustomTableEnvironment customTableEnvironment; protected List modifyOperations = new ArrayList<>(); - private ZoneId sinkTimeZone = ZoneId.of("UTC"); + protected ZoneId sinkTimeZone = ZoneId.systemDefault(); - protected List typeConverterList = null; - - protected AbstractSinkBuilder() { - initTypeConverterList(); - } + protected AbstractSinkBuilder() {} protected AbstractSinkBuilder(FlinkCDCConfig config) { this.config = config; - initTypeConverterList(); } - protected void initTypeConverterList() { - typeConverterList = Lists.newArrayList( - this::convertVarCharType, - this::convertDateType, - this::convertTimeType, - this::convertVarBinaryType, - this::convertBigIntType, - this::convertFloatType, - this::convertDecimalType, - this::convertTimestampType); + protected ZoneId getSinkTimeZone() { + return this.sinkTimeZone; } - public FlinkCDCConfig getConfig() { - return config; + protected void init(StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment) { + this.env = env; + this.customTableEnvironment = customTableEnvironment; + initSinkTimeZone(); } - public void setConfig(FlinkCDCConfig config) { - this.config = config; + private void initSinkTimeZone() { + final String timeZone = config.getSink().get("timezone"); + config.getSink().remove("timezone"); + if (Asserts.isNotNullString(timeZone)) { + sinkTimeZone = ZoneId.of(timeZone); + logger.info("Sink timezone is {}", sinkTimeZone); + } } protected Properties getProperties() { @@ -139,41 +113,81 @@ protected Properties getProperties() { return properties; } - protected SingleOutputStreamOperator deserialize(DataStreamSource dataStreamSource) { + @SuppressWarnings("rawtypes") + protected SingleOutputStreamOperator deserialize(SingleOutputStreamOperator dataStreamSource) { return dataStreamSource .map((MapFunction) value -> objectMapper.readValue(value, Map.class)) - .returns(Map.class); + .returns(Map.class) + .name("Deserializer"); } - protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, Table table, String schemaFieldName) { - final String tableName = table.getName(); - final String schemaName = table.getSchema(); - return mapOperator.filter((FilterFunction) value -> { - LinkedHashMap source = (LinkedHashMap) value.get("source"); - return tableName.equals(source.get("table").toString()) - && schemaName.equals(source.get(schemaFieldName).toString()); - }); + @SuppressWarnings("rawtypes") + protected SingleOutputStreamOperator partitionByTableAndPrimarykey( + SingleOutputStreamOperator mapOperator, Map tableMap) { + final String schemaFieldName = config.getSchemaFieldName(); + final Map configSplit = config.getSplit(); + mapOperator.partitionCustom( + new Partitioner() { + @Override + public int partition(String key, int numPartitions) { + return Math.abs(key.hashCode()) % numPartitions; 
+ } + }, + map -> { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + String tableName = getMergedTableName(source, schemaFieldName, configSplit); + Table table = tableMap.get(tableName); + List primaryKeys = table.getColumns().stream() + .map(column -> { + if (column.isKeyFlag()) { + return column.getName(); + } + return ""; + }) + .collect(Collectors.toList()); + + return tableName + String.join("_", primaryKeys); + }); + return mapOperator.name("PartitionByPrimaryKey"); + } + + protected String getMergedTableName(LinkedHashMap source, String schemaFieldName, Map split) { + if (Asserts.isNullMap(split)) { + return source.get(schemaFieldName).toString() + "." + + source.get("table").toString(); + } + return SplitUtil.getReValue(source.get(schemaFieldName).toString(), split) + + "." + + SplitUtil.getReValue(source.get("table").toString(), split); } - protected DataStream shunt(SingleOutputStreamOperator processOperator, Table table, OutputTag tag) { - processOperator.forward(); - return processOperator.getSideOutput(tag).forward(); + @SuppressWarnings("rawtypes") + protected SingleOutputStreamOperator shunt( + SingleOutputStreamOperator mapOperator, String schemaName, String tableName) { + final String schemaFieldName = config.getSchemaFieldName(); + return mapOperator + .filter((FilterFunction) value -> { + LinkedHashMap source = (LinkedHashMap) value.get("source"); + return tableName.equals(source.get("table").toString()) + && schemaName.equals(source.get(schemaFieldName).toString()); + }) + .name("Shunt"); } @SuppressWarnings("rawtypes") - protected DataStream buildRowData( + private DataStream buildRowData( SingleOutputStreamOperator filterOperator, List columnNameList, List columnTypeList, String schemaTableName) { return filterOperator .flatMap(sinkRowDataFunction(columnNameList, columnTypeList, schemaTableName)) - .returns(RowData.class); + .returns(RowData.class) + .name("FlatMapRowData"); } @SuppressWarnings("rawtypes") - protected FlatMapFunction sinkRowDataFunction( + private FlatMapFunction sinkRowDataFunction( List columnNameList, List columnTypeList, String schemaTableName) { return (value, out) -> { try { @@ -203,12 +217,15 @@ protected FlatMapFunction sinkRowDataFunction( } @SuppressWarnings("rawtypes") - protected void rowDataCollect( + private void rowDataCollect( List columnNameList, List columnTypeList, Collector out, RowKind rowKind, Map value) { + if (Asserts.isNull(value)) { + return; + } GenericRowData genericRowData = new GenericRowData(rowKind, columnNameList.size()); for (int i = 0; i < columnNameList.size(); i++) { genericRowData.setField( @@ -218,13 +235,16 @@ protected void rowDataCollect( } @SuppressWarnings("rawtypes") - protected Object buildRowDataValues(Map value, RowKind rowKind, String columnName, LogicalType columnType) { - Map data = getOriginRowData(rowKind, value); - return convertValue(data.get(columnName), columnType); + private Object buildRowDataValues(Map value, RowKind rowKind, String columnName, LogicalType columnType) { + Map data = getOriginData(rowKind, value); + return DataTypeConverter.convertToRowData(data.get(columnName), columnType, sinkTimeZone); } @SuppressWarnings("rawtypes") - protected Map getOriginRowData(RowKind rowKind, Map value) { + private Map getOriginData(RowKind rowKind, Map value) { + if (Asserts.isNullMap(value)) { + return Collections.emptyMap(); + } switch (rowKind) { case INSERT: case UPDATE_AFTER: @@ -238,266 +258,74 @@ protected Map getOriginRowData(RowKind rowKind, Map value) { return 
Collections.emptyMap(); } - public void addSink( - StreamExecutionEnvironment env, + protected void addSink( DataStream rowDataDataStream, Table table, List columnNameList, List columnTypeList) {} - protected List createInsertOperations( - CustomTableEnvironment customTableEnvironment, Table table, String viewName, String tableName) { - String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config); - logger.info(cdcSqlInsert); - - List operations = customTableEnvironment.getParser().parse(cdcSqlInsert); - logger.info("Create {} FlinkSQL insert into successful...", tableName); - if (operations.isEmpty()) { - return operations; + protected List getSortedSchemaList() { + final List schemaList = config.getSchemaList(); + if (Asserts.isNullCollection(schemaList)) { + throw new IllegalArgumentException("Schema list is empty, please check your configuration and try again."); } - - try { - Operation operation = operations.get(0); - if (operation instanceof ModifyOperation) { - modifyOperations.add((ModifyOperation) operation); + for (Schema schema : schemaList) { + if (Asserts.isNullCollection(schema.getTables())) { + // if schema tables is empty, throw exception + throw new IllegalArgumentException( + "Schema tables is empty, please check your configuration or check your database permission and try again."); } - - } catch (Exception e) { - logger.error("Translate to plan occur exception: {}", e.toString()); - throw e; + // if schema tables is not empty, sort by table name to keep node sort + schema.setTables(schema.getTables().stream() + .sorted(Comparator.comparing(Table::getName)) + .collect(Collectors.toList())); } - return operations; + return schemaList; } + @SuppressWarnings("rawtypes") @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { + init(env, customTableEnvironment); + buildPipeline(dataStreamSource, getSortedSchemaList()); + } - final String timeZone = config.getSink().get("timezone"); - config.getSink().remove("timezone"); - if (Asserts.isNotNullString(timeZone)) { - sinkTimeZone = ZoneId.of(timeZone); - logger.info("Sink timezone is {}", sinkTimeZone); - } - - final List schemaList = config.getSchemaList(); - if (Asserts.isNullCollection(schemaList)) { - logger.warn("Schema list is empty, please check your configuration and try again."); - return dataStreamSource; + protected void buildPipeline(DataStreamSource dataStreamSource, List schemaList) { + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); + final Map tableMap = new LinkedHashMap<>(); + for (Schema schema : schemaList) { + for (Table table : schema.getTables()) { + tableMap.put(table.getSchemaTableName(), table); + } } - - final String schemaFieldName = config.getSchemaFieldName(); - - if (Asserts.isNotNullCollection(schemaList)) { - SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); 
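/*
 * A sketch, outside the patch itself, of the record keying introduced above: the refactored
 * AbstractSinkBuilder deserializes the source stream once, keys each record by the merged
 * "<schema>.<table>" name plus the primary-key column names (partitionByTableAndPrimarykey /
 * getMergedTableName), and only then shunts per table, so changes to one table stay ordered on
 * one partition. The "db" schema field name and the helper names below are illustrative
 * assumptions, not Dinky APIs.
 */
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeySketch {

    // Mirrors getMergedTableName(...) when no split configuration is set: "<schema>.<table>".
    static String mergedTableName(Map<String, Object> source, String schemaFieldName) {
        return source.get(schemaFieldName) + "." + source.get("table");
    }

    // Mirrors the key built in partitionByTableAndPrimarykey: merged table name + primary-key names.
    static String partitionKey(Map<String, Object> source, String schemaFieldName, List<String> primaryKeys) {
        return mergedTableName(source, schemaFieldName) + String.join("_", primaryKeys);
    }

    // Mirrors the anonymous Partitioner<String>: stable hash modulo the downstream parallelism.
    static int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("db", "demo");       // assumed schema field name for a MySQL-style source
        source.put("table", "orders");
        String key = partitionKey(source, "db", Arrays.asList("order_id"));
        System.out.println(key + " -> partition " + partition(key, 4));
    }
}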
- for (Table table : tableList) { - SingleOutputStreamOperator filterOperator = shunt(mapOperator, table, schemaFieldName); - - List columnNameList = new ArrayList<>(); - List columnTypeList = new ArrayList<>(); - - buildColumn(columnNameList, columnTypeList, table.getColumns()); - - DataStream rowDataDataStream = - buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); - - addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); - } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + for (Schema schema : schemaList) { + for (Table table : schema.getTables()) { + SingleOutputStreamOperator singleOutputStreamOperator = + shunt(mapOperator, table.getSchema(), table.getName()); + logger.info("Build shunt successful..."); + final List columnNameList = new ArrayList<>(); + final List columnTypeList = new ArrayList<>(); + buildColumn(columnNameList, columnTypeList, table.getColumns()); + + DataStream rowDataDataStream = buildRowData( + singleOutputStreamOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + logger.info("Build flatRowData successful..."); + addSink(rowDataDataStream, table, columnNameList, columnTypeList); } } - return dataStreamSource; } protected void buildColumn(List columnNameList, List columnTypeList, List columns) { for (Column column : columns) { columnNameList.add(column.getName()); - columnTypeList.add(getLogicalType(column)); - } - } - - public LogicalType getLogicalType(Column column) { - switch (column.getJavaType()) { - case BOOLEAN: - case JAVA_LANG_BOOLEAN: - return new BooleanType(); - case BYTE: - case JAVA_LANG_BYTE: - return new TinyIntType(); - case SHORT: - case JAVA_LANG_SHORT: - return new SmallIntType(); - case LONG: - case JAVA_LANG_LONG: - return new BigIntType(); - case FLOAT: - case JAVA_LANG_FLOAT: - return new FloatType(); - case DOUBLE: - case JAVA_LANG_DOUBLE: - return new DoubleType(); - case DECIMAL: - if (column.getPrecision() == null || column.getPrecision() == 0) { - return new DecimalType(38, column.getScale()); - } else { - return new DecimalType(column.getPrecision(), column.getScale()); - } - case INT: - case INTEGER: - return new IntType(); - case TIME: - case LOCALTIME: - return new TimeType(column.isNullable(), column.getPrecision() == null ? 0 : column.getPrecision()); - case DATE: - case LOCAL_DATE: - return new DateType(); - case LOCAL_DATETIME: - case TIMESTAMP: - if (column.getLength() != null) { - return new TimestampType(column.getLength()); - } else { - return new TimestampType(3); - } - case BYTES: - return new VarBinaryType(Integer.MAX_VALUE); - case STRING: - default: - return new VarCharType(); - } - } - - protected Object convertValue(Object value, LogicalType logicalType) { - if (value == null) { - return null; - } - - for (ConvertType convertType : typeConverterList) { - Optional result = convertType.convert(value, logicalType); - if (result.isPresent()) { - return result.get(); - } - } - return value; - } - - protected Optional convertVarBinaryType(Object value, LogicalType logicalType) { - if (logicalType instanceof VarBinaryType) { - // VARBINARY AND BINARY is converted to String with encoding base64 in FlinkCDC. 
- if (value instanceof String) { - return Optional.of(DatatypeConverter.parseBase64Binary(value.toString())); - } - - return Optional.of(value); - } - return Optional.empty(); - } - - protected Optional convertBigIntType(Object value, LogicalType logicalType) { - if (logicalType instanceof BigIntType) { - if (value instanceof Integer) { - return Optional.of(((Integer) value).longValue()); - } - - return Optional.of(value); - } - return Optional.empty(); - } - - protected Optional convertFloatType(Object value, LogicalType logicalType) { - if (logicalType instanceof FloatType) { - if (value instanceof Float) { - return Optional.of(value); - } - - if (value instanceof Double) { - return Optional.of(((Double) value).floatValue()); - } - - return Optional.of(Float.parseFloat(value.toString())); + columnTypeList.add(DataTypeConverter.getLogicalType(column)); } - return Optional.empty(); - } - - protected Optional convertDecimalType(Object value, LogicalType logicalType) { - if (logicalType instanceof DecimalType) { - final DecimalType decimalType = (DecimalType) logicalType; - return Optional.ofNullable(DecimalData.fromBigDecimal( - new BigDecimal((String) value), decimalType.getPrecision(), decimalType.getScale())); - } - return Optional.empty(); - } - - protected Optional convertTimestampType(Object value, LogicalType logicalType) { - if (logicalType instanceof TimestampType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (value instanceof String) { - return Optional.of( - Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); - } else { - TimestampType logicalType1 = (TimestampType) logicalType; - if (logicalType1.getPrecision() == 3) { - return Optional.of(Instant.ofEpochMilli((long) value) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (logicalType1.getPrecision() > 3) { - return Optional.of( - Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - return Optional.of(Instant.ofEpochSecond(((long) value)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - } - return Optional.empty(); - } - - protected Optional convertDateType(Object target, LogicalType logicalType) { - if (logicalType instanceof DateType) { - return Optional.of(StringData.fromString(Instant.ofEpochMilli((long) target) - .atZone(ZoneId.systemDefault()) - .toLocalDate() - .toString())); - } - return Optional.empty(); - } - - protected Optional convertTimeType(Object target, LogicalType logicalType) { - if (logicalType instanceof TimeType) { - return Optional.of(StringData.fromString(Instant.ofEpochMilli((long) target) - .atZone(ZoneId.systemDefault()) - .toLocalTime() - .toString())); - } - return Optional.empty(); - } - - protected Optional convertVarCharType(Object target, LogicalType logicalType) { - if (logicalType instanceof VarCharType) { - return Optional.of(StringData.fromString((String) target)); - } - return Optional.empty(); - } - - @FunctionalInterface - public interface ConvertType { - Optional convert(Object target, LogicalType logicalType); } @Override @@ -576,21 +404,6 @@ private Map parseMappingRoute(String mappingRoute) { return mappingRules; } - protected List getPKList(Table table) { - if (Asserts.isNullCollection(table.getColumns())) { - return new ArrayList<>(); - } - - return table.getColumns().stream() - .filter(Column::isKeyFlag) - .map(Column::getName) - 
.collect(Collectors.toList()); - } - - protected ZoneId getSinkTimeZone() { - return this.sinkTimeZone; - } - protected Map getTableTopicMap() { String topicMapStr = this.config.getSink().get("table.topic.map"); Map tableTopicMap = new HashMap<>(); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java index 6bcd11a5a3..ecd521f05d 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/SinkBuilder.java @@ -32,8 +32,7 @@ public interface SinkBuilder { SinkBuilder create(FlinkCDCConfig config); - DataStreamSource build( - CDCBuilder cdcBuilder, + void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java new file mode 100644 index 0000000000..e12c0d6d8a --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/convert/DataTypeConverter.java @@ -0,0 +1,520 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.cdc.convert; + +import org.dinky.assertion.Asserts; +import org.dinky.data.model.Column; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.DatatypeConverter; + +public class DataTypeConverter { + + static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1); + static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); + static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1); + public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000 + + public static LogicalType getLogicalType(Column column) { + switch (column.getJavaType()) { + case BOOLEAN: + case JAVA_LANG_BOOLEAN: + return new BooleanType(); + case BYTE: + case JAVA_LANG_BYTE: + return new TinyIntType(); + case SHORT: + case JAVA_LANG_SHORT: + return new SmallIntType(); + case LONG: + case JAVA_LANG_LONG: + return new BigIntType(); + case FLOAT: + case JAVA_LANG_FLOAT: + return new FloatType(); + case DOUBLE: + case JAVA_LANG_DOUBLE: + return new DoubleType(); + case DECIMAL: + if (column.getPrecision() == null || column.getPrecision() == 0) { + return new DecimalType(38, column.getScale()); + } else { + return new DecimalType(column.getPrecision(), column.getScale()); + } + case INT: + case INTEGER: + return new IntType(); + case TIME: + case LOCALTIME: + return new TimeType( + column.isNullable(), + column.getLength() == 0 + ? (column.getPrecision() == null ? 0 : column.getPrecision()) + : column.getLength()); + case DATE: + case LOCAL_DATE: + return new DateType(); + case LOCAL_DATETIME: + case TIMESTAMP: + if (Asserts.isNotNull(column.getLength())) { + return new TimestampType(column.getLength()); + } else { + return new TimestampType(3); + } + case BYTES: + return new VarBinaryType(Integer.MAX_VALUE); + case STRING: + default: + return new VarCharType(Asserts.isNull(column.getLength()) ? 
Integer.MAX_VALUE : column.getLength()); + } + } + + public static Object convertToRow(Object value, LogicalType logicalType, ZoneId timeZone) { + if (Asserts.isNull(value)) { + return Optional.empty(); + } + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return convertToBoolean(value); + case TINYINT: + return convertToByte(value); + case SMALLINT: + return convertToShort(value); + case INTEGER: + return convertToInt(value); + case BIGINT: + return convertToLong(value); + case DATE: + return convertToDate(value); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(value, logicalType, timeZone); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestamp(value, logicalType); + case TIMESTAMP_WITH_TIME_ZONE: + return convertToTimestampWithTimeZone(value, logicalType, timeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToTimestampWithLocalTimeZone(value, logicalType); + case FLOAT: + return convertToFloat(value); + case DOUBLE: + return convertToDouble(value); + case CHAR: + case VARCHAR: + return convertToString(value); + case BINARY: + case VARBINARY: + return convertToBinary(value); + case DECIMAL: + return convertToDecimal(value, logicalType); + case ROW: + return value; + case ARRAY: + case MAP: + default: + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } + + public static Object convertToRowData(Object value, LogicalType logicalType, ZoneId timeZone) { + if (Asserts.isNull(value)) { + return Optional.empty(); + } + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return convertToBoolean(value); + case TINYINT: + return convertToByte(value); + case SMALLINT: + return convertToShort(value); + case INTEGER: + return convertToInt(value); + case BIGINT: + return convertToLong(value); + case DATE: + return convertToDate(value); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(value, logicalType, timeZone); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestampData(value, logicalType); + case TIMESTAMP_WITH_TIME_ZONE: + return convertToTimestampDataWithTimeZone(value, logicalType, timeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToTimestampDataWithLocalTimeZone(value, logicalType); + case FLOAT: + return convertToFloat(value); + case DOUBLE: + return convertToDouble(value); + case CHAR: + case VARCHAR: + return convertToStringData(value); + case BINARY: + case VARBINARY: + return convertToBinary(value); + case DECIMAL: + return convertToDecimalData(value, logicalType); + case ROW: + return value; + case ARRAY: + case MAP: + default: + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } + + private static Object convertToBoolean(Object obj) { + if (obj instanceof Boolean) { + return obj; + } else if (obj instanceof Byte) { + return (byte) obj == 1; + } else if (obj instanceof Short) { + return (short) obj == 1; + } else if (obj instanceof Number) { + return obj.equals(1); + } else { + return Boolean.parseBoolean(obj.toString()); + } + } + + private static Object convertToByte(Object obj) { + return Byte.parseByte(obj.toString()); + } + + private static Object convertToShort(Object obj) { + return Short.parseShort(obj.toString()); + } + + private static Object convertToInt(Object obj) { + if (obj instanceof Integer) { + return obj; + } else if (obj instanceof Long) { + return ((Long) obj).intValue(); + } else { + return Integer.parseInt(obj.toString()); + } + } + + private static Object convertToLong(Object obj) { + if (obj instanceof Integer) { + return 
((Integer) obj).longValue(); + } else if (obj instanceof Long) { + return obj; + } else { + return Long.parseLong(obj.toString()); + } + } + + private static Object convertToFloat(Object obj) { + if (obj instanceof Float) { + return obj; + } else if (obj instanceof Double) { + return ((Double) obj).floatValue(); + } else { + return Float.parseFloat(obj.toString()); + } + } + + private static Object convertToDouble(Object obj) { + if (obj instanceof Float) { + return ((Float) obj).doubleValue(); + } else if (obj instanceof Double) { + return obj; + } else { + return Double.parseDouble(obj.toString()); + } + } + + private static Object convertToDate(Object obj) { + return toLocalDate(obj); + } + + private static LocalDate toLocalDate(Object obj) { + if (obj instanceof Long) { + // Assume the value is the epoch day number + return LocalDate.ofEpochDay((Long) obj); + } + if (obj instanceof Integer) { + // Assume the value is the epoch day number + return LocalDate.ofEpochDay((Integer) obj); + } + throw new IllegalArgumentException("Unable to convert to LocalDate from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTime(Object obj, LogicalType logicalType, ZoneId timeZone) { + TimeType timeType = (TimeType) logicalType; + if (obj instanceof Number) { + Number number = (Number) obj; + long value = number.longValue(); + if (value > MILLIS_PER_DAY) { + value = value / 1000; + if (value > MILLIS_PER_DAY) { + value = value / 1000; + } + } + if (timeType.getPrecision() == 0) { + return Instant.ofEpochSecond(value).atZone(timeZone).toLocalTime(); + } + if (timeType.getPrecision() == 3) { + return Instant.ofEpochMilli(value).atZone(timeZone).toLocalTime(); + } + if (timeType.getPrecision() == 6) { + return Instant.ofEpochMilli(value % 1000).atZone(timeZone).toLocalTime(); + } + return Instant.ofEpochMilli(value % 1000000).atZone(timeZone).toLocalTime(); + } + throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static LocalTime toLocalTime(Object obj, ZoneId timeZone) { + if (obj == null) { + return null; + } + if (obj instanceof Long) { + return Instant.ofEpochMilli((long) obj).atZone(timeZone).toLocalTime(); + } + if (obj instanceof LocalTime) { + return (LocalTime) obj; + } + if (obj instanceof LocalDateTime) { + return ((LocalDateTime) obj).toLocalTime(); + } + if (obj instanceof java.sql.Date) { + throw new IllegalArgumentException( + "Unable to convert to LocalDate from a java.sql.Date value '" + obj + "'"); + } + if (obj instanceof java.sql.Time) { + java.sql.Time time = (java.sql.Time) obj; + long millis = (int) (time.getTime() % MILLISECONDS_PER_SECOND); + int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND); + return LocalTime.of(time.getHours(), time.getMinutes(), time.getSeconds(), nanosOfSecond); + } + if (obj instanceof java.sql.Timestamp) { + java.sql.Timestamp timestamp = (java.sql.Timestamp) obj; + return LocalTime.of( + timestamp.getHours(), timestamp.getMinutes(), timestamp.getSeconds(), timestamp.getNanos()); + } + if (obj instanceof java.util.Date) { + java.util.Date date = (java.util.Date) obj; + long millis = (int) (date.getTime() % MILLISECONDS_PER_SECOND); + int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND); + return LocalTime.of(date.getHours(), date.getMinutes(), date.getSeconds(), nanosOfSecond); + } + if (obj instanceof Duration) { + Long value = ((Duration) 
obj).toNanos(); + if (value >= 0 && value <= NANOSECONDS_PER_DAY) { + return LocalTime.ofNanoOfDay(value); + } else { + throw new IllegalArgumentException( + "Time values must use number of milliseconds greater than 0 and less than 86400000000000"); + } + } + throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestamp(Object obj, LogicalType logicalType) { + if (obj instanceof Integer) { + return Instant.ofEpochMilli(((Integer) obj).longValue()) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (obj instanceof String) { + return Instant.parse((String) obj).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else if (obj instanceof Long) { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) obj) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) obj) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) obj)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestampData(Object obj, LogicalType logicalType) { + return TimestampData.fromLocalDateTime((LocalDateTime) convertToTimestamp(obj, logicalType)); + } + + private static Object convertToTimestampWithTimeZone(Object obj, LogicalType logicalType, ZoneId timeZone) { + if (obj instanceof Integer) { + return Instant.ofEpochMilli(((Integer) obj).longValue()) + .atZone(timeZone) + .toLocalDateTime(); + } else if (obj instanceof String) { + return Instant.parse((String) obj).atZone(timeZone).toLocalDateTime(); + } else if (obj instanceof Long) { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) obj).atZone(timeZone).toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) obj) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(timeZone) + .toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) obj)).atZone(timeZone).toLocalDateTime(); + } + throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestampDataWithTimeZone(Object obj, LogicalType logicalType, ZoneId timeZone) { + return TimestampData.fromLocalDateTime( + (LocalDateTime) convertToTimestampWithTimeZone(obj, logicalType, timeZone)); + } + + private static Object convertToTimestampWithLocalTimeZone(Object obj, LogicalType logicalType) { + if (obj instanceof Integer) { + return Instant.ofEpochMilli(((Integer) obj).longValue()) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (obj instanceof String) { + return Instant.parse((String) obj).atZone(ZoneId.systemDefault()).toLocalDateTime(); + } else if (obj instanceof Long) { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) obj) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (logicalType1.getPrecision() 
> 3) { + return Instant.ofEpochMilli(((long) obj) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) obj)) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } + throw new IllegalArgumentException("Unable to convert to TIMESTAMP from unexpected value '" + + obj + + "' of type " + + obj.getClass().getName()); + } + + private static Object convertToTimestampDataWithLocalTimeZone(Object obj, LogicalType logicalType) { + return TimestampData.fromLocalDateTime((LocalDateTime) convertToTimestampWithLocalTimeZone(obj, logicalType)); + } + + private static Object convertToString(Object obj) { + return String.valueOf(obj); + } + + private static Object convertToStringData(Object obj) { + return StringData.fromString(String.valueOf(obj)); + } + + private static Object convertToBinary(Object obj) { + if (obj instanceof byte[]) { + return obj; + } else if (obj instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) obj; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else if (obj instanceof String) { + return DatatypeConverter.parseBase64Binary(String.valueOf(obj)); + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + obj.getClass().getSimpleName()); + } + } + + private static Object convertToDecimal(Object obj, LogicalType logicalType) { + DecimalType decimalType = (DecimalType) logicalType; + if (obj instanceof BigDecimal) { + return obj; + } else if (obj instanceof Number) { + Number number = (Number) obj; + return BigDecimal.valueOf(number.longValue(), decimalType.getScale()); + } else if (obj instanceof String) { + MathContext mathContext = new MathContext(((DecimalType) logicalType).getPrecision()); + return new BigDecimal(String.valueOf(obj), mathContext); + } else { + throw new UnsupportedOperationException( + "Unsupported Decimal value type: " + obj.getClass().getSimpleName()); + } + } + + private static Object convertToDecimalData(Object obj, LogicalType logicalType) { + DecimalType decimalType = (DecimalType) logicalType; + if (obj instanceof BigDecimal) { + return DecimalData.fromBigDecimal( + new BigDecimal(String.valueOf(obj)), decimalType.getPrecision(), decimalType.getScale()); + } else if (obj instanceof Number) { + Number number = (Number) obj; + return DecimalData.fromBigDecimal( + BigDecimal.valueOf(number.longValue(), decimalType.getScale()), + decimalType.getPrecision(), + decimalType.getScale()); + } else if (obj instanceof String) { + return DecimalData.fromBigDecimal( + new BigDecimal(String.valueOf(obj)), decimalType.getPrecision(), decimalType.getScale()); + } else { + throw new UnsupportedOperationException( + "Unsupported Decimal value type: " + obj.getClass().getSimpleName()); + } + } +} diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java index 6caf75a0d7..e975dbc265 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java @@ -40,7 +40,6 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; import org.dinky.cdc.SinkBuilder; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; @@ -63,12 +62,10 @@ import org.apache.flink.util.OutputTag; 
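/*
 * A sketch, outside the patch, of exercising the new DataTypeConverter shown above. It assumes
 * the dinky-cdc-core module and flink-table-common are on the classpath; the input encodings
 * follow the Debezium-style values the converter handles (epoch-day integers for DATE,
 * epoch-millisecond longs for TIMESTAMP(3)). Note that convertToRowData resolves
 * TIMESTAMP_WITHOUT_TIME_ZONE values against the system time zone, as in the implementation above.
 */
import org.dinky.cdc.convert.DataTypeConverter;

import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;

import java.time.ZoneId;

public class DataTypeConverterSketch {
    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("UTC");

        // VARCHAR: plain strings become StringData suitable for a RowData field.
        Object name = DataTypeConverter.convertToRowData("dinky", new VarCharType(255), zone);
        System.out.println(name instanceof StringData); // true

        // DATE: an epoch-day integer becomes a LocalDate (19683 -> 2023-11-22).
        Object date = DataTypeConverter.convertToRowData(19683, new DateType(), zone);
        System.out.println(date);

        // TIMESTAMP(3): epoch milliseconds become TimestampData.
        Object ts = DataTypeConverter.convertToRowData(1700000000000L, new TimestampType(3), zone);
        System.out.println(ts instanceof TimestampData); // true
    }
}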
import java.io.Serializable; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; /** * MysqlCDCBuilder @@ -76,6 +73,7 @@ public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable { public static final String KEY_WORD = "datastream-kafka"; + private Properties kafkaProducerConfig; public KafkaSinkBuilder() {} @@ -85,7 +83,6 @@ public KafkaSinkBuilder(FlinkCDCConfig config) { @Override public void addSink( - StreamExecutionEnvironment env, DataStream rowDataDataStream, Table table, List columnNameList, @@ -102,18 +99,93 @@ public SinkBuilder create(FlinkCDCConfig config) { } @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { - Properties kafkaProducerConfig = getProperties(); + init(env, customTableEnvironment); + + kafkaProducerConfig = getProperties(); if (Asserts.isNotNullString(config.getSink().get("topic"))) { + singleTopicSink(dataStreamSource); + } else { + multipleTopicSink(dataStreamSource); + } + } + + private void singleTopicSink(DataStreamSource dataStreamSource) { + org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder() + .setBootstrapServers(config.getSink().get("brokers")) + .setRecordSerializer(KafkaRecordSerializationSchema.builder() + .setTopic(config.getSink().get("topic")) + .setValueSerializationSchema(new SimpleStringSchema()) + .build()) + .setTransactionalIdPrefix( + config.getSink().get("transactional.id.prefix") == null + ? "" + : config.getSink().get("transactional.id.prefix")) + .setDeliverGuarantee(DeliveryGuarantee.valueOf( + config.getSink().get("delivery.guarantee") == null + ? "NONE" + : config.getSink().get("delivery.guarantee"))); + if (!kafkaProducerConfig.isEmpty()) { + kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); + } + KafkaSink kafkaSink = kafkaSinkBuilder.build(); + dataStreamSource.sinkTo(kafkaSink); + } + + private void multipleTopicSink(DataStreamSource dataStreamSource) { + final Map> tagMap = new LinkedHashMap<>(); + final Map tableMap = new LinkedHashMap<>(); + final Map tableTopicMap = this.getTableTopicMap(); + final ObjectMapper objectMapper = new ObjectMapper(); + + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); + + for (Schema schema : getSortedSchemaList()) { + for (Table table : schema.getTables()) { + String sinkTableName = getSinkTableName(table); + OutputTag outputTag = new OutputTag(sinkTableName) {}; + tagMap.put(table, outputTag); + tableMap.put(table.getSchemaTableName(), table); + } + } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + final String schemaFieldName = config.getSchemaFieldName(); + SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { + @Override + public void processElement(Map map, ProcessFunction.Context ctx, Collector out) + throws Exception { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + try { + String result = objectMapper.writeValueAsString(map); + Table table = tableMap.get(source.get(schemaFieldName).toString() + "." 
+ + source.get("table").toString()); + OutputTag outputTag = tagMap.get(table); + ctx.output(outputTag, result); + } catch (Exception e) { + out.collect(objectMapper.writeValueAsString(map)); + } + } + }); + logger.info("Build shunt successful..."); + tagMap.forEach((k, v) -> { + String topic = getSinkTableName(k); + if (tableTopicMap != null) { + String tableName = k.getName(); + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder() .setBootstrapServers(config.getSink().get("brokers")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(config.getSink().get("topic")) + .setTopic(topic) .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setTransactionalIdPrefix( @@ -127,90 +199,9 @@ public DataStreamSource build( if (!kafkaProducerConfig.isEmpty()) { kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); } + KafkaSink kafkaSink = kafkaSinkBuilder.build(); - dataStreamSource.sinkTo(kafkaSink); - } else { - Map> tagMap = new LinkedHashMap<>(); - Map tableMap = new LinkedHashMap<>(); - Map tableTopicMap = this.getTableTopicMap(); - ObjectMapper objectMapper = new ObjectMapper(); - SingleOutputStreamOperator mapOperator = dataStreamSource - .map(x -> objectMapper.readValue(x, Map.class)) - .returns(Map.class); - final List schemaList = config.getSchemaList(); - - final String schemaFieldName = config.getSchemaFieldName(); - if (Asserts.isNotNullCollection(schemaList)) { - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - logger.error("Schema:{} tables is empty", schema.getName()); - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - String sinkTableName = getSinkTableName(table); - OutputTag outputTag = new OutputTag(sinkTableName) {}; - tagMap.put(table, outputTag); - tableMap.put(table.getSchemaTableName(), table); - } - } - SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { - - @Override - public void processElement(Map map, ProcessFunction.Context ctx, Collector out) - throws Exception { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - try { - String result = objectMapper.writeValueAsString(map); - Table table = - tableMap.get(source.get(schemaFieldName).toString() + "." - + source.get("table").toString()); - OutputTag outputTag = tagMap.get(table); - ctx.output(outputTag, result); - } catch (Exception e) { - out.collect(objectMapper.writeValueAsString(map)); - } - } - }); - tagMap.forEach((k, v) -> { - String topic = getSinkTableName(k); - if (tableTopicMap != null) { - String tableName = k.getName(); - String newTopic = tableTopicMap.get(tableName); - if (Asserts.isNotNullString(newTopic)) { - topic = newTopic; - } - } - - org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = - KafkaSink.builder() - .setBootstrapServers(config.getSink().get("brokers")) - .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(topic) - .setValueSerializationSchema(new SimpleStringSchema()) - .build()) - .setTransactionalIdPrefix( - config.getSink().get("transactional.id.prefix") == null - ? "" - : config.getSink().get("transactional.id.prefix")) - .setDeliverGuarantee(DeliveryGuarantee.valueOf( - config.getSink().get("delivery.guarantee") == null - ? "NONE" - : config.getSink().get("delivery.guarantee"))); - if (!kafkaProducerConfig.isEmpty()) { - kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); - } - - KafkaSink kafkaSink = kafkaSinkBuilder.build(); - process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); - }); - } - } - return dataStreamSource; + process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); + }); } } diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java similarity index 80% rename from dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java rename to dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java index 83bfdbb81f..c4aac7f2c6 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java @@ -21,16 +21,13 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; import org.dinky.cdc.SinkBuilder; +import org.dinky.cdc.convert.DataTypeConverter; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; import org.dinky.executor.CustomTableEnvironment; -import org.dinky.utils.ObjectConvertUtil; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; 
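/*
 * A sketch, outside the patch, of the side-output routing used by multipleTopicSink() above:
 * one OutputTag per table, a ProcessFunction that routes each record to its table's tag, and one
 * sink attached per side output. print() stands in for the per-topic KafkaSink built in the real
 * code, and the "schema.table|payload" record format here is an illustrative assumption.
 */
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.LinkedHashMap;
import java.util.Map;

public class SideOutputRoutingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One OutputTag per destination table, keyed by "schema.table".
        Map<String, OutputTag<String>> tagMap = new LinkedHashMap<>();
        tagMap.put("demo.orders", new OutputTag<String>("demo.orders") {});
        tagMap.put("demo.products", new OutputTag<String>("demo.products") {});

        DataStream<String> source =
                env.fromElements("demo.orders|{...}", "demo.products|{...}", "demo.orders|{...}");

        // Route each record to the tag of its table; unmatched records fall through to the main output.
        SingleOutputStreamOperator<String> routed = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                OutputTag<String> tag = tagMap.get(value.split("\\|")[0]);
                if (tag != null) {
                    ctx.output(tag, value);
                } else {
                    out.collect(value);
                }
            }
        });

        // The real builder attaches a KafkaSink per tag; a named print sink stands in here.
        tagMap.forEach((table, tag) -> routed.getSideOutput(tag).print(table));

        env.execute("side-output-routing-sketch");
    }
}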
@@ -43,12 +40,10 @@ import java.io.Serializable; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -78,55 +73,38 @@ public SinkBuilder create(FlinkCDCConfig config) { @SuppressWarnings("rawtypes") @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { try { - SingleOutputStreamOperator mapOperator = - dataStreamSource.map((MapFunction) value -> objectMapper.readValue(value, Map.class)); - final List schemaList = config.getSchemaList(); - final String schemaFieldName = config.getSchemaFieldName(); - if (!Asserts.isNotNullCollection(schemaList)) { - return dataStreamSource; - } - - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); + init(env, customTableEnvironment); + + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); + List sortedSchemaList = getSortedSchemaList(); + final Map tableMap = new LinkedHashMap<>(); + for (Schema schema : sortedSchemaList) { + for (Table table : schema.getTables()) { + tableMap.put(table.getSchemaTableName(), table); } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { + } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + for (Schema schema : sortedSchemaList) { + for (Table table : schema.getTables()) { final String tableName = table.getName(); final String schemaName = table.getSchema(); - SingleOutputStreamOperator filterOperator = mapOperator.filter((FilterFunction) value -> { - LinkedHashMap source = (LinkedHashMap) value.get("source"); - return tableName.equals(source.get("table").toString()) - && schemaName.equals(source.get(schemaFieldName).toString()); - }); - String topic = getSinkTableName(table); - if (Asserts.isNotNullString(config.getSink().get("topic"))) { - topic = config.getSink().get("topic"); - } else { - Map tableTopicMap = this.getTableTopicMap(); - if (tableTopicMap != null) { - String newTopic = tableTopicMap.get(tableName); - if (Asserts.isNotNullString(newTopic)) { - topic = newTopic; - } - } - } + SingleOutputStreamOperator singleOutputStreamOperator = + shunt(mapOperator, schemaName, tableName); + logger.info("Build shunt successful..."); + List columnNameList = new LinkedList<>(); List columnTypeList = new LinkedList<>(); buildColumn(columnNameList, columnTypeList, table.getColumns()); - SingleOutputStreamOperator stringOperator = - filterOperator.process(new ProcessFunction() { + + SingleOutputStreamOperator stringOperator = singleOutputStreamOperator + .process(new ProcessFunction() { @Override public void processElement(Map value, Context context, Collector collector) @@ -200,14 +178,29 @@ public void processElement(Map value, Context context, Collector collect collector.collect(objectMapper.writeValueAsString(after)); } } - }); - stringOperator.addSink(new FlinkKafkaProducer(topic, new SimpleStringSchema(), getProperties())); + }) + .name("SerializerWithMetadata"); + + String topic = getSinkTableName(table); + if (Asserts.isNotNullString(config.getSink().get("topic"))) { + topic = config.getSink().get("topic"); + } else { + Map tableTopicMap = this.getTableTopicMap(); + if (tableTopicMap != null) { + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } + } + + stringOperator.addSink( + new FlinkKafkaProducer(topic, new SimpleStringSchema(), getProperties())); } } } catch (Exception ex) { logger.error("kafka sink error:", ex); } - return dataStreamSource; } private void initializeObjectMapper() { @@ -219,11 +212,6 @@ private void initializeObjectMapper() { objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); } - @Override - protected Object convertValue(Object value, LogicalType logicalType) { - return ObjectConvertUtil.convertValue(value, logicalType); - } - @SuppressWarnings("rawtypes") private void convertAttr( List columnNameList, @@ -237,7 +225,8 @@ private void convertAttr( for (int i = 0; i < columnNameList.size(); i++) { String columnName = columnNameList.get(i); Object columnNameValue = value.remove(columnName); - Object columnNameNewVal = convertValue(columnNameValue, columnTypeList.get(i)); + Object columnNameNewVal = + DataTypeConverter.convertToRowData(columnNameValue, columnTypeList.get(i), getSinkTimeZone()); value.put(columnName, columnNameNewVal); } value.put("__op", op); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java 
b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java new file mode 100644 index 0000000000..e7b8957678 --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/print/PrintSinkBuilder.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.cdc.print; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +import org.dinky.cdc.AbstractSinkBuilder; +import org.dinky.cdc.SinkBuilder; +import org.dinky.data.model.FlinkCDCConfig; +import org.dinky.data.model.Table; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +import java.io.Serializable; +import java.util.List; + +public class PrintSinkBuilder extends AbstractSinkBuilder implements Serializable { + + public static final String KEY_WORD = "datastream-print"; + + public PrintSinkBuilder() {} + + public PrintSinkBuilder(FlinkCDCConfig config) { + super(config); + } + + @Override + protected void addSink( + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList) { + rowDataDataStream + .addSink(new RichSinkFunction() { + @Override + public void invoke(RowData value, Context context) throws Exception { + System.out.println(value.toString()); + } + }) + .name(String.format("Print Sink(table=[%s.%s])", getSinkSchemaName(table), getSinkTableName(table))); + } + + @Override + public String getHandle() { + return KEY_WORD; + } + + @Override + public SinkBuilder create(FlinkCDCConfig config) { + return new PrintSinkBuilder(config); + } +} diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java index d95d4917f8..cbbd03b315 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java @@ -21,25 +21,25 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; +import org.dinky.cdc.convert.DataTypeConverter; +import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; import org.dinky.utils.JsonUtils; import org.dinky.utils.LogUtil; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; @@ -48,18 +48,13 @@ import org.apache.flink.util.OutputTag; import java.io.Serializable; -import java.math.BigDecimal; -import java.time.ZoneId; import java.util.ArrayList; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; public abstract 
class AbstractSqlSinkBuilder extends AbstractSinkBuilder implements Serializable { - protected ZoneId sinkTimeZone = ZoneId.of("UTC"); protected AbstractSqlSinkBuilder() {} @@ -68,7 +63,7 @@ protected AbstractSqlSinkBuilder(FlinkCDCConfig config) { } @SuppressWarnings("rawtypes") - protected FlatMapFunction sqlSinkRowFunction( + private FlatMapFunction sinkRowFunction( List columnNameList, List columnTypeList, String schemaTableName) { return (value, out) -> { try { @@ -105,15 +100,21 @@ private void rowCollect( Collector out, RowKind rowKind, Map value) { + if (Asserts.isNull(value)) { + return; + } Row row = Row.withPositions(rowKind, columnNameList.size()); for (int i = 0; i < columnNameList.size(); i++) { - row.setField(i, convertValue(value.get(columnNameList.get(i)), columnTypeList.get(i))); + row.setField( + i, + DataTypeConverter.convertToRow( + value.get(columnNameList.get(i)), columnTypeList.get(i), sinkTimeZone)); } out.collect(row); } @SuppressWarnings("rawtypes") - protected DataStream buildRow( + private DataStream buildRow( DataStream filterOperator, List columnNameList, List columnTypeList, @@ -121,31 +122,24 @@ protected DataStream buildRow( TypeInformation[] typeInformation = TypeConversions.fromDataTypeToLegacyInfo( TypeConversions.fromLogicalToDataType(columnTypeList.toArray(new LogicalType[0]))); - return filterOperator.flatMap( - sqlSinkRowFunction(columnNameList, columnTypeList, schemaTableName), - new RowTypeInfo(typeInformation, columnNameList.toArray(new String[0]))); - } - - @Override - protected Optional convertDecimalType(Object value, LogicalType logicalType) { - if (logicalType instanceof DecimalType) { - return Optional.of(new BigDecimal(String.valueOf(value))); - } - return Optional.empty(); + return filterOperator + .flatMap( + sinkRowFunction(columnNameList, columnTypeList, schemaTableName), + new RowTypeInfo(typeInformation, columnNameList.toArray(new String[0]))) + .name("FlatMapRow"); } @SuppressWarnings("rawtypes") - protected void addTableSinkForTags( - CustomTableEnvironment customTableEnvironment, - Map> tagMap, - SingleOutputStreamOperator processOperator) { + private void addTableSinkForTags( + Map> tagMap, SingleOutputStreamOperator processOperator) { tagMap.forEach((table, tag) -> { final String schemaTableName = table.getSchemaTableName(); try { - DataStream filterOperator = shunt(processOperator, table, tag); - logger.info("Build {} shunt successful...", schemaTableName); - List columnNameList = new ArrayList<>(); - List columnTypeList = new ArrayList<>(); + processOperator.forward(); + DataStream filterOperator = + processOperator.getSideOutput(tag).forward(); + final List columnNameList = new ArrayList<>(); + final List columnTypeList = new ArrayList<>(); buildColumn(columnNameList, columnTypeList, table.getColumns()); DataStream rowDataDataStream = buildRow( filterOperator, columnNameList, columnTypeList, schemaTableName) @@ -153,7 +147,7 @@ protected void addTableSinkForTags( logger.info("Build {} flatMap successful...", schemaTableName); logger.info("Start build {} sink...", schemaTableName); - addTableSink(customTableEnvironment, rowDataDataStream, table); + addTableSink(rowDataDataStream, table); } catch (Exception e) { logger.error("Build {} cdc sync failed...", schemaTableName); logger.error(LogUtil.getError(e)); @@ -162,123 +156,63 @@ protected void addTableSinkForTags( } @SuppressWarnings("rawtypes") - protected SingleOutputStreamOperator createMapSingleOutputStreamOperator( - DataStreamSource dataStreamSource, Map> tagMap, Map 
tableMap) { + private SingleOutputStreamOperator shunt( + SingleOutputStreamOperator mapOperator, + Map> tagMap, + Map tableMap) { final String schemaFieldName = config.getSchemaFieldName(); - SingleOutputStreamOperator mapOperator = - dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class); - Map split = config.getSplit(); - partitionByTableAndPrimarykey(mapOperator, tableMap); - return mapOperator.process(new ProcessFunction() { - @Override - public void processElement(Map map, ProcessFunction.Context ctx, Collector out) { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - try { - String tableName = createTableName(source, schemaFieldName, split); - OutputTag outputTag = tagMap.get(tableMap.get(tableName)); - ctx.output(outputTag, map); - } catch (Exception e) { - logger.error(e.getMessage(), e); - out.collect(map); - } - } - }); + final Map configSplit = config.getSplit(); + return mapOperator + .process(new ProcessFunction() { + @Override + public void processElement(Map map, ProcessFunction.Context ctx, Collector out) { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + try { + String tableName = getMergedTableName(source, schemaFieldName, configSplit); + OutputTag outputTag = tagMap.get(tableMap.get(tableName)); + ctx.output(outputTag, map); + } catch (Exception e) { + logger.error(e.getMessage(), e); + out.collect(map); + } + } + }) + .name("Shunt"); } - protected abstract void addTableSink( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table); - - /** - * @param source - * @param schemaFieldName - * @param split must keep for flink use. - * @return - */ - protected abstract String createTableName(LinkedHashMap source, String schemaFieldName, Map split); + protected abstract void addTableSink(DataStream rowDataDataStream, Table table); @SuppressWarnings("rawtypes") - @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { - final String timeZone = config.getSink().get("timezone"); - config.getSink().remove("timezone"); - if (Asserts.isNotNullString(timeZone)) { - sinkTimeZone = ZoneId.of(timeZone); - logger.info("Sink timezone is {}", sinkTimeZone); - } - - final List schemaList = config.getSchemaList(); - if (Asserts.isNullCollection(schemaList)) { - return dataStreamSource; - } - - executeCatalogStatement(customTableEnvironment); + protected void buildPipeline(DataStreamSource dataStreamSource, List schemaList) { + executeCatalogStatement(); + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); logger.info("Build deserialize successful..."); - Map> tagMap = new LinkedHashMap<>(); - Map tableMap = new LinkedHashMap<>(); + final Map> tagMap = new LinkedHashMap<>(); + final Map tableMap = new LinkedHashMap<>(); for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - String sinkTableName = getSinkTableName(table); - OutputTag outputTag = new OutputTag(sinkTableName) {}; + for (Table table : schema.getTables()) { + OutputTag outputTag = new OutputTag(getSinkTableName(table)) {}; tagMap.put(table, outputTag); tableMap.put(table.getSchemaTableName(), table); } } - SingleOutputStreamOperator processOperator = - createMapSingleOutputStreamOperator(dataStreamSource, tagMap, tableMap); - addTableSinkForTags(customTableEnvironment, tagMap, processOperator); - + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + SingleOutputStreamOperator singleOutputStreamOperator = shunt(mapOperator, tagMap, tableMap); + logger.info("Build shunt successful..."); + addTableSinkForTags(tagMap, singleOutputStreamOperator); + logger.info("Build sink successful..."); List> trans = customTableEnvironment.getPlanner().translate(modifyOperations); for (Transformation item : trans) { env.addOperator(item); } logger.info("A total of {} table cdc sync were build successful...", trans.size()); - return dataStreamSource; } - protected void partitionByTableAndPrimarykey( - SingleOutputStreamOperator mapOperator, Map tableMap) { - mapOperator.partitionCustom( - new Partitioner() { - @Override - public int partition(String key, int numPartitions) { - return Math.abs(key.hashCode()) % numPartitions; - } - }, - map -> { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - String tableName = createTableName(source, config.getSchemaFieldName(), config.getSplit()); - Table table = tableMap.get(tableName); - List primaryKeys = table.getColumns().stream() - .map(column -> { - if (column.isKeyFlag()) { - return column.getName(); - } - return ""; - }) - .collect(Collectors.toList()); - - return tableName + String.join("_", primaryKeys); - }); - mapOperator.name("PartitionByPrimarykey"); - } - - protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) {} + protected void executeCatalogStatement() {} /** * replace view name middle to under line for flink use view name @@ -292,4 +226,32 @@ public static String replaceViewNameMiddleLineToUnderLine(String viewName) { } return viewName; } + + protected List createInsertOperations(Table table, String viewName, String tableName) { + String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config); + logger.info(cdcSqlInsert); + + List operations = customTableEnvironment.getParser().parse(cdcSqlInsert); + logger.info("Create {} FlinkSQL insert into successful...", tableName); + if (operations.isEmpty()) { + return operations; + } + + Operation operation = operations.get(0); + if (operation instanceof ModifyOperation) { + modifyOperations.add((ModifyOperation) operation); + } + return operations; + } + + protected List getPKList(Table table) { + if (Asserts.isNullCollection(table.getColumns())) { + return new ArrayList<>(); + } + + return table.getColumns().stream() + .filter(Column::isKeyFlag) + .map(Column::getName) + .collect(Collectors.toList()); + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index b01ff11d06..b869276bd7 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ 
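Aside on the AbstractSqlSinkBuilder hunk above: the shunt/addTableSinkForTags flow is built on Flink side outputs, with one OutputTag per table, a ProcessFunction that routes each record to its table's tag, and getSideOutput to obtain each table's stream for its own sink. A self-contained sketch of that pattern under simplified assumptions (the class name, sample records, and the "orders" routing key are illustrative, not from this patch):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputShuntSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One anonymous OutputTag per routing key, as the builder keeps one per Table.
        final OutputTag<String> ordersTag = new OutputTag<String>("orders") {};
        SingleOutputStreamOperator<String> mainStream = env.fromElements("orders:1", "users:2")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("orders")) {
                            ctx.output(ordersTag, value); // routed to the table-specific side output
                        } else {
                            out.collect(value); // anything unroutable stays on the main stream
                        }
                    }
                })
                .name("Shunt");
        DataStream<String> ordersStream = mainStream.getSideOutput(ordersTag);
        ordersStream.print();
        env.execute("side-output-shunt-sketch");
    }
}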
b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -23,26 +23,15 @@ import org.dinky.cdc.utils.FlinkStatementUtil; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; -import org.dinky.utils.SplitUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.Row; import java.io.Serializable; -import java.time.Instant; -import java.time.LocalDate; -import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Optional; public class SQLSinkBuilder extends AbstractSqlSinkBuilder implements Serializable { @@ -55,19 +44,7 @@ private SQLSinkBuilder(FlinkCDCConfig config) { super(config); } - @Override - protected void initTypeConverterList() { - typeConverterList = Arrays.asList( - this::convertDateType, - this::convertTimestampType, - this::convertFloatType, - this::convertDecimalType, - this::convertBigIntType, - this::convertVarBinaryType); - } - - private String addSourceTableView( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { + private String addSourceTableView(DataStream rowDataDataStream, Table table) { // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_ String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline()); @@ -78,18 +55,14 @@ private String addSourceTableView( } @Override - protected void addTableSink( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { - String viewName = addSourceTableView(customTableEnvironment, rowDataDataStream, table); + protected void addTableSink(DataStream rowDataDataStream, Table table) { + final String viewName = addSourceTableView(rowDataDataStream, table); + final String sinkSchemaName = getSinkSchemaName(table); + final String sinkTableName = getSinkTableName(table); - // 下游库名称 - String sinkSchemaName = getSinkSchemaName(table); - // 下游表名称 - String sinkTableName = getSinkTableName(table); - - // 这个地方要根据下游表的数量进行生成 + // Multiple sinks and single sink if (CollectionUtils.isEmpty(config.getSinks())) { - addSinkInsert(customTableEnvironment, table, viewName, sinkTableName, sinkSchemaName, sinkTableName); + addSinkInsert(table, viewName, sinkTableName, sinkSchemaName, sinkTableName); } else { for (int index = 0; index < config.getSinks().size(); index++) { String tableName = sinkTableName; @@ -98,27 +71,20 @@ protected void addTableSink( } config.setSink(config.getSinks().get(index)); - addSinkInsert(customTableEnvironment, table, viewName, tableName, sinkSchemaName, sinkTableName); + addSinkInsert(table, viewName, tableName, sinkSchemaName, sinkTableName); } } } private List addSinkInsert( - CustomTableEnvironment customTableEnvironment, - Table table, - String viewName, - String tableName, - String sinkSchemaName, - String sinkTableName) { + Table table, String viewName, String tableName, String sinkSchemaName, String sinkTableName) { String pkList = StringUtils.join(getPKList(table), "."); - String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, 
tableName, config, sinkSchemaName, sinkTableName, pkList); logger.info(flinkDDL); customTableEnvironment.executeSql(flinkDDL); logger.info("Create {} FlinkSQL DDL successful...", tableName); - - return createInsertOperations(customTableEnvironment, table, viewName, tableName); + return createInsertOperations(table, viewName, tableName); } @Override @@ -130,57 +96,4 @@ public String getHandle() { public SinkBuilder create(FlinkCDCConfig config) { return new SQLSinkBuilder(config); } - - @Override - protected String createTableName(LinkedHashMap source, String schemaFieldName, Map split) { - return SplitUtil.getReValue(source.get(schemaFieldName).toString(), split) - + "." - + SplitUtil.getReValue(source.get("table").toString(), split); - } - - @Override - protected Optional convertDateType(Object value, LogicalType logicalType) { - if (logicalType instanceof DateType) { - if (value instanceof Integer) { - return Optional.of(LocalDate.ofEpochDay((Integer) value)); - } - if (value instanceof Long) { - return Optional.of( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate()); - } - return Optional.of( - Instant.parse(value.toString()).atZone(sinkTimeZone).toLocalDate()); - } - return Optional.empty(); - } - - @Override - protected Optional convertTimestampType(Object value, LogicalType logicalType) { - if (logicalType instanceof TimestampType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (value instanceof String) { - return Optional.of( - Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); - } else { - TimestampType logicalType1 = (TimestampType) logicalType; - if (logicalType1.getPrecision() == 3) { - return Optional.of(Instant.ofEpochMilli((long) value) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } else if (logicalType1.getPrecision() > 3) { - return Optional.of( - Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - return Optional.of(Instant.ofEpochSecond(((long) value)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - } - return Optional.empty(); - } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java index 3d4ce976ee..bc59a989a4 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java @@ -24,20 +24,11 @@ import org.dinky.cdc.utils.FlinkStatementUtil; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.types.Row; import java.io.Serializable; -import java.time.Instant; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; public class SQLCatalogSinkBuilder extends AbstractSqlSinkBuilder implements Serializable { @@ -50,18 +41,7 @@ private SQLCatalogSinkBuilder(FlinkCDCConfig config) { } @Override - protected void initTypeConverterList() { - typeConverterList 
= Arrays.asList( - this::convertDateType, - this::convertTimestampType, - this::convertDecimalType, - this::convertBigIntType, - this::convertVarBinaryType); - } - - @Override - public void addTableSink( - CustomTableEnvironment customTableEnvironment, DataStream rowDataDataStream, Table table) { + public void addTableSink(DataStream rowDataDataStream, Table table) { String catalogName = config.getSink().get("catalog.name"); String sinkSchemaName = getSinkSchemaName(table); @@ -74,7 +54,7 @@ public void addTableSink( viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream)); logger.info("Create {} temporaryView successful...", viewName); - createInsertOperations(customTableEnvironment, table, viewName, sinkTableName); + createInsertOperations(table, viewName, sinkTableName); } @Override @@ -87,48 +67,8 @@ public SinkBuilder create(FlinkCDCConfig config) { return new SQLCatalogSinkBuilder(config); } - protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) { - logger.info("Build catalog successful..."); + protected void executeCatalogStatement() { customTableEnvironment.executeSql(FlinkStatementUtil.getCreateCatalogStatement(config)); - } - - @Override - protected String createTableName(LinkedHashMap source, String schemaFieldName, Map split) { - return source.get(schemaFieldName).toString() + "." - + source.get("table").toString(); - } - - @Override - protected Optional convertDateType(Object value, LogicalType logicalType) { - if (logicalType instanceof DateType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDate()); - } - return Optional.of( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDate()); - } - return Optional.empty(); - } - - @Override - protected Optional convertTimestampType(Object value, LogicalType logicalType) { - if (logicalType instanceof TimestampType) { - if (value instanceof Integer) { - return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - - if (value instanceof String) { - return Optional.of( - Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); - } - - return Optional.of( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime()); - } - return Optional.empty(); + logger.info("Build catalog successful..."); } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder b/dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder new file mode 100644 index 0000000000..0a2728a1c7 --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder @@ -0,0 +1,3 @@ +org.dinky.cdc.print.PrintSinkBuilder +org.dinky.cdc.kafka.KafkaSinkBuilder +org.dinky.cdc.kafka.KafkaSinkJsonBuilder \ No newline at end of file diff --git a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java deleted file mode 100644 index b609a8f103..0000000000 --- a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
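Aside on the META-INF/services/org.dinky.cdc.SinkBuilder file added above: it registers PrintSinkBuilder and the two Kafka builders for the standard JDK service-provider mechanism. Assuming the usual java.util.ServiceLoader lookup (the resolution code itself is not part of this diff), discovery looks roughly like the following sketch; the class name is illustrative:

import java.util.ServiceLoader;

import org.dinky.cdc.SinkBuilder;

public class SinkBuilderDiscoverySketch {
    public static void main(String[] args) {
        // Iterates every implementation listed in META-INF/services/org.dinky.cdc.SinkBuilder.
        for (SinkBuilder builder : ServiceLoader.load(SinkBuilder.class)) {
            // e.g. "datastream-print" for the PrintSinkBuilder registered above
            System.out.println(builder.getHandle());
        }
    }
}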
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.cdc; - -import org.dinky.cdc.sql.SQLSinkBuilder; - -import org.apache.flink.table.types.logical.TimestampType; - -import org.junit.Assert; -import org.junit.Test; - -/** - * CDCSOURCETest - * - */ -public class SinkBuilderTest { - - @Test - public void convertValueTimestampTest() { - SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); - Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); - Object value3 = sqlSinkBuilder.convertValue(1688946316123L, new TimestampType(3)); - Object value6 = sqlSinkBuilder.convertValue(1688946316123456L, new TimestampType(6)); - String target0 = "2023-07-09T23:45:16"; - String target3 = "2023-07-09T23:45:16.123"; - String target6 = "2023-07-09T23:45:16.123"; - Assert.assertEquals(target0, value0.toString()); - Assert.assertEquals(target3, value3.toString()); - Assert.assertEquals(target6, value6.toString()); - } -} diff --git a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/convert/DataTypeConverterTest.java b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/convert/DataTypeConverterTest.java new file mode 100644 index 0000000000..6eac6d75e5 --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/convert/DataTypeConverterTest.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.dinky.cdc.convert; + +import org.apache.flink.table.types.logical.TimestampType; + +import java.time.ZoneId; + +import org.junit.Assert; + +public class DataTypeConverterTest { + + // todo: check the time zone of Timestamp + // @Test + public void testConvertToRowWithTimestamp() { + Object value0 = DataTypeConverter.convertToRow(1688946316L, new TimestampType(0), ZoneId.systemDefault()); + Object value3 = DataTypeConverter.convertToRow(1688946316123L, new TimestampType(3), ZoneId.systemDefault()); + Object value6 = DataTypeConverter.convertToRow(1688946316123456L, new TimestampType(6), ZoneId.systemDefault()); + String target0 = "2023-07-10T07:45:16"; + String target3 = "2023-07-10T07:45:16.123"; + String target6 = "2023-07-10T07:45:16.123"; + Assert.assertEquals(target0, value0.toString()); + Assert.assertEquals(target3, value3.toString()); + Assert.assertEquals(target6, value6.toString()); + } + + // @Test + public void testConvertToRowDataWithTimestamp() { + Object value0 = DataTypeConverter.convertToRowData(1688946316L, new TimestampType(0), ZoneId.systemDefault()); + Object value3 = + DataTypeConverter.convertToRowData(1688946316123L, new TimestampType(3), ZoneId.systemDefault()); + Object value6 = + DataTypeConverter.convertToRowData(1688946316123456L, new TimestampType(6), ZoneId.systemDefault()); + String target0 = "2023-07-10T07:45:16"; + String target3 = "2023-07-10T07:45:16.123"; + String target6 = "2023-07-10T07:45:16.123"; + Assert.assertEquals(target0, value0.toString()); + Assert.assertEquals(target3, value3.toString()); + Assert.assertEquals(target6, value6.toString()); + } +} diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java index 7623c26e4e..43d7b65626 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java @@ -21,7 +21,6 @@ import org.dinky.assertion.Asserts; import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; import org.dinky.cdc.SinkBuilder; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; @@ -41,12 +40,10 @@ import org.apache.flink.util.OutputTag; import java.io.Serializable; -import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; public class DorisSchemaEvolutionSinkBuilder extends AbstractSinkBuilder implements Serializable { @@ -70,12 +67,13 @@ public SinkBuilder create(FlinkCDCConfig config) { @SuppressWarnings("rawtypes") @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, + public void build( StreamExecutionEnvironment env, CustomTableEnvironment customTableEnvironment, DataStreamSource dataStreamSource) { + init(env, customTableEnvironment); + Map sink = config.getSink(); Properties properties = getProperties(); @@ -83,51 +81,46 @@ public DataStreamSource build( properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); - final List schemaList = config.getSchemaList(); - if (!Asserts.isNotNullCollection(schemaList)) { - return dataStreamSource; - } - - SingleOutputStreamOperator mapOperator = - dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class); - final String schemaFieldName = 
config.getSchemaFieldName(); + SingleOutputStreamOperator mapOperator = deserialize(dataStreamSource); + logger.info("Build deserialize successful..."); - Map> tagMap = new LinkedHashMap<>(); - Map tableMap = new LinkedHashMap<>(); - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); + List sortedSchemaList = getSortedSchemaList(); + final Map tableMap = new LinkedHashMap<>(); + for (Schema schema : sortedSchemaList) { + for (Table table : schema.getTables()) { + tableMap.put(table.getSchemaTableName(), table); } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - + } + partitionByTableAndPrimarykey(mapOperator, tableMap); + logger.info("Build partitionBy successful..."); + Map> tagMap = new LinkedHashMap<>(); + for (Schema schema : getSortedSchemaList()) { for (Table table : schema.getTables()) { OutputTag outputTag = new OutputTag(getSinkTableName(table)) {}; tagMap.put(table, outputTag); - tableMap.put(table.getSchemaTableName(), table); } } - - SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { - - @Override - public void processElement(Map map, Context ctx, Collector out) throws Exception { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - String result = objectMapper.writeValueAsString(map); - try { - Table table = tableMap.get(source.get(schemaFieldName).toString() - + "." - + source.get("table").toString()); - ctx.output(tagMap.get(table), result); - } catch (Exception e) { - out.collect(result); - } - } - }); + final String schemaFieldName = config.getSchemaFieldName(); + SingleOutputStreamOperator process = mapOperator + .process(new ProcessFunction() { + + @Override + public void processElement(Map map, Context ctx, Collector out) throws Exception { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + String result = objectMapper.writeValueAsString(map); + try { + Table table = + tableMap.get(source.get(schemaFieldName).toString() + + "." + + source.get("table").toString()); + ctx.output(tagMap.get(table), result); + } catch (Exception e) { + out.collect(result); + } + } + }) + .name("Shunt"); + logger.info("Build shunt successful..."); tagMap.forEach((table, v) -> { DorisOptions dorisOptions = DorisOptions.builder() @@ -195,7 +188,6 @@ public void processElement(Map map, Context ctx, Collector out) throws E "Doris Schema Evolution Sink(table=[%s.%s])", getSinkSchemaName(table), getSinkTableName(table))); }); - return dataStreamSource; } @Override diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java index d384e1f69d..c00d33e33e 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkBuilder.java @@ -31,7 +31,6 @@ import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -67,7 +66,6 @@ public SinkBuilder create(FlinkCDCConfig config) { @Override public void addSink( - StreamExecutionEnvironment env, DataStream rowDataDataStream, Table table, List columnNameList, diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java deleted file mode 100644 index 8cdd0df10a..0000000000 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
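Aside on the (currently disabled) DataTypeConverterTest above: its expected strings depend entirely on the zone passed to the converter. The deleted SinkBuilderTest asserted 2023-07-09T23:45:16 against the builder's former UTC default, while the new test, built around ZoneId.systemDefault(), expects 2023-07-10T07:45:16, the same instant shifted by UTC+8. A standalone sketch of that zone dependence (class name illustrative; assumes a UTC+8 default zone for the second value):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampZoneSketch {
    public static void main(String[] args) {
        long epochMillis = 1688946316123L; // the millisecond value used in the tests
        LocalDateTime utc = Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of("UTC"))
                .toLocalDateTime();
        LocalDateTime utcPlus8 = Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of("Asia/Shanghai"))
                .toLocalDateTime();
        System.out.println(utc);      // 2023-07-09T23:45:16.123
        System.out.println(utcPlus8); // 2023-07-10T07:45:16.123
    }
}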
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.cdc.kafka; - -import org.dinky.assertion.Asserts; -import org.dinky.cdc.AbstractSinkBuilder; -import org.dinky.cdc.CDCBuilder; -import org.dinky.cdc.SinkBuilder; -import org.dinky.data.model.FlinkCDCConfig; -import org.dinky.data.model.Schema; -import org.dinky.data.model.Table; -import org.dinky.executor.CustomTableEnvironment; - -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.connector.base.DeliveryGuarantee; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.connector.kafka.sink.KafkaSink; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -import java.io.Serializable; -import java.util.Comparator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - -public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable { - - public static final String KEY_WORD = "datastream-kafka"; - public static final String TRANSACTIONAL_ID = "transactional.id"; - - public KafkaSinkBuilder() {} - - public KafkaSinkBuilder(FlinkCDCConfig config) { - super(config); - } - - @Override - public String getHandle() { - return KEY_WORD; - } - - @Override - public SinkBuilder create(FlinkCDCConfig config) { - return new KafkaSinkBuilder(config); - } - - @SuppressWarnings("rawtypes") - @Override - public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { - Properties kafkaProducerConfig = getProperties(); - if (Asserts.isNotNullString(config.getSink().get("topic"))) { - org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = - KafkaSink.builder() - .setBootstrapServers(config.getSink().get("brokers")) - .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(config.getSink().get("topic")) - .setValueSerializationSchema(new SimpleStringSchema()) - .build()) - .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null - ? "" - : config.getSink().get("transactional.id.prefix")) - .setDeliverGuarantee( - DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null - ? 
"NONE" - : config.getSink().get("delivery.guarantee"))); - if (!kafkaProducerConfig.isEmpty()) { - kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); - } - KafkaSink kafkaSink = kafkaSinkBuilder.build(); - dataStreamSource.sinkTo(kafkaSink); - } else { - Map> tagMap = new LinkedHashMap<>(); - Map tableTopicMap = this.getTableTopicMap(); - Map tableMap = new LinkedHashMap<>(); - ObjectMapper objectMapper = new ObjectMapper(); - SingleOutputStreamOperator mapOperator = dataStreamSource - .map(x -> objectMapper.readValue(x, Map.class)) - .returns(Map.class); - final List schemaList = config.getSchemaList(); - - final String schemaFieldName = config.getSchemaFieldName(); - if (Asserts.isNotNullCollection(schemaList)) { - for (Schema schema : schemaList) { - if (Asserts.isNullCollection(schema.getTables())) { - // if schema tables is empty, throw exception - throw new IllegalArgumentException( - "Schema tables is empty, please check your configuration or check your database permission and try again."); - } - // if schema tables is not empty, sort by table name - List
tableList = schema.getTables().stream() - .sorted(Comparator.comparing(Table::getName)) - .collect(Collectors.toList()); - for (Table table : tableList) { - String sinkTableName = getSinkTableName(table); - OutputTag outputTag = new OutputTag(sinkTableName) {}; - tagMap.put(table, outputTag); - tableMap.put(table.getSchemaTableName(), table); - } - } - SingleOutputStreamOperator process = mapOperator.process(new ProcessFunction() { - - @Override - public void processElement(Map map, ProcessFunction.Context ctx, Collector out) - throws Exception { - LinkedHashMap source = (LinkedHashMap) map.get("source"); - try { - String result = objectMapper.writeValueAsString(map); - Table table = - tableMap.get(source.get(schemaFieldName).toString() - + "." - + source.get("table").toString()); - OutputTag outputTag = tagMap.get(table); - ctx.output(outputTag, result); - } catch (Exception e) { - out.collect(objectMapper.writeValueAsString(map)); - } - } - }); - - tagMap.forEach((k, v) -> { - String topic = getSinkTableName(k); - if (tableTopicMap != null) { - String tableName = k.getName(); - String newTopic = tableTopicMap.get(tableName); - if (Asserts.isNotNullString(newTopic)) { - topic = newTopic; - } - } - org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = - KafkaSink.builder() - .setBootstrapServers(config.getSink().get("brokers")) - .setRecordSerializer(KafkaRecordSerializationSchema.builder() - .setTopic(topic) - .setValueSerializationSchema(new SimpleStringSchema()) - .build()) - .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null - ? "" - : config.getSink().get("transactional.id.prefix")) - .setDeliverGuarantee( - DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null - ? "NONE" - : config.getSink().get("delivery.guarantee"))); - if (!kafkaProducerConfig.isEmpty()) { - kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); - } - - KafkaSink kafkaSink = kafkaSinkBuilder.build(); - process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); - }); - } - } - return dataStreamSource; - } -} diff --git a/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder b/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder index 763c78ccf0..6bc1916a78 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder +++ b/dinky-cdc/dinky-cdc-plus/src/main/resources/META-INF/services/org.dinky.cdc.SinkBuilder @@ -1,4 +1,2 @@ -org.dinky.cdc.kafka.KafkaSinkBuilder -org.dinky.cdc.kafka.KafkaSinkJsonBuilder org.dinky.cdc.doris.DorisSinkBuilder org.dinky.cdc.doris.DorisSchemaEvolutionSinkBuilder \ No newline at end of file diff --git a/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java b/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java index 29b2e755e1..406954b6bb 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java +++ b/dinky-common/src/main/java/org/dinky/data/model/FlinkCDCConfig.java @@ -65,6 +65,7 @@ public class FlinkCDCConfig { private List> sinks; private List schemaList; private String schemaFieldName; + private boolean isMockTest; public FlinkCDCConfig( String type, @@ -139,6 +140,7 @@ public void init( this.sink = sink; this.sinks = sinks; this.jdbc = jdbc; + this.isMockTest = false; } private boolean isSkip(String key) { @@ -165,6 +167,9 @@ private boolean isSkip(String key) { } public String getSinkConfigurationString() { + if (isMockTest) { + 
return "'connector' = 'dinky-mock'"; + } return sink.entrySet().stream() .filter(t -> !isSkip(t.getKey())) .map(t -> String.format("'%s' = '%s'", t.getKey(), t.getValue())) @@ -326,4 +331,12 @@ public Map getSplit() { public void setSplit(Map split) { this.split = split; } + + public boolean isMockTest() { + return isMockTest; + } + + public void setMockTest(boolean mockTest) { + isMockTest = mockTest; + } } diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java index 2bb1ecc33d..40b00ac886 100644 --- a/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java +++ b/dinky-core/src/main/java/org/dinky/connector/mock/sink/MockSinkFunction.java @@ -19,6 +19,8 @@ package org.dinky.connector.mock.sink; +import org.dinky.constant.FlinkConstant; + import org.apache.flink.api.common.accumulators.SerializedListAccumulator; import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; @@ -27,6 +29,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,9 +57,19 @@ public void open(Configuration parameters) throws Exception { public void invoke(RowData rowData, Context context) throws Exception { List fieldNames = rowType.getFieldNames(); Map rowDataMap = new HashMap<>(); + rowDataMap.put(FlinkConstant.OP, rowData.getRowKind().shortString()); for (int i = 0; i < fieldNames.size(); i++) { RowData.FieldGetter fieldGetter = RowData.createFieldGetter(rowType.getTypeAt(i), i); - rowDataMap.put(fieldNames.get(i), String.valueOf(fieldGetter.getFieldOrNull(rowData))); + switch (rowType.getTypeAt(i).getTypeRoot()) { + case BINARY: + case VARBINARY: + rowDataMap.put( + fieldNames.get(i), + new String((byte[]) fieldGetter.getFieldOrNull(rowData), StandardCharsets.UTF_8)); + break; + default: + rowDataMap.put(fieldNames.get(i), String.valueOf(fieldGetter.getFieldOrNull(rowData))); + } } rowDataList.add(rowDataMap, new MapSerializer<>(new StringSerializer(), new StringSerializer())); } diff --git a/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java b/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java index b280c7cac7..5eed7a0f86 100644 --- a/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java +++ b/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java @@ -28,7 +28,7 @@ public interface FlinkConstant { /** 本地模式host */ String LOCAL_HOST = "localhost"; /** changlog op */ - String OP = "op"; + String OP = "__op__"; /** flink 默认端口 **/ Integer FLINK_REST_DEFAULT_PORT = 8081; diff --git a/dinky-core/src/main/java/org/dinky/data/result/AbstractResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/AbstractResultBuilder.java index 4d940c0c9a..73045deaba 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/AbstractResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/AbstractResultBuilder.java @@ -19,6 +19,12 @@ package org.dinky.data.result; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public abstract class AbstractResultBuilder { protected String id; + protected final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + 5, 20, 10, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardOldestPolicy()); } diff --git a/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java index 0b5ec2885a..9372f46916 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/MockResultBuilder.java @@ -21,181 +21,50 @@ import org.dinky.assertion.Asserts; import org.dinky.job.JobHandler; -import org.dinky.utils.JsonUtils; -import org.apache.flink.api.common.typeutils.base.MapSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.catalog.ResolvedSchema; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - -import alluxio.shaded.client.com.google.common.collect.Lists; -import cn.hutool.core.collection.ListUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class MockResultBuilder extends AbstractResultBuilder implements ResultBuilder { private final Integer maxRowNum; + private final boolean isChangeLog; private final boolean isAutoCancel; - private final String MOCK_RESULT_TABLE_IDENTIFIER = "dinkySinkResultTableIdentifier"; - private final String MOCK_RESULT_COLUMN_IDENTIFIER = "dinkySinkResultColumnIdentifier"; - public MockResultBuilder(String id, Integer maxRowNum, boolean isAutoCancel) { + public MockResultBuilder(String id, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) { this.id = id; this.maxRowNum = maxRowNum; + this.isChangeLog = isChangeLog; this.isAutoCancel = isAutoCancel; } @Override public IResult getResult(TableResult tableResult) { - Optional Optional = tableResult.getJobClient(); - if (Optional.isPresent()) { - JobClient jobClient = Optional.get(); - // get table identifiers - ResolvedSchema resolvedSchema = tableResult.getResolvedSchema(); - List tableIdentifierList = resolvedSchema.getColumnNames(); - // update row data map - Map>> rowDataMap = new HashMap<>(); - if (tableResult.getJobClient().isPresent()) { - while (!isAllSinkFinished(maxRowNum, rowDataMap, tableIdentifierList)) { - try { - Map accumulatorMap = - jobClient.getAccumulators().get(); - for (String tableIdentifier : tableIdentifierList) { - Object accumulatorObject = accumulatorMap.get(tableIdentifier); - if (accumulatorObject instanceof List) { - List list = (List) accumulatorObject; - for (Object obj : list) { - // deserialize data from accumulator - Map deserialize = deserializeObjFromBytes((byte[]) obj); - // update row data map - List> rowDataList = - rowDataMap.getOrDefault(tableIdentifier, new ArrayList<>()); - rowDataList.add(deserialize); - rowDataMap.put(tableIdentifier, ListUtil.sub(rowDataList, 0, maxRowNum)); - } - } - } - } catch (Exception e) { - // do nothing - } - } - } - if (isAutoCancel) { - try { - jobClient.cancel(); - } catch (Exception e) { - log.error("Cancel job failed, jobId: {}", id); - } - } - return new MockSinkResult(id, rowDataMap); + if (tableResult.getJobClient().isPresent()) { + MockResultRunnable runnable = new MockResultRunnable(tableResult, id, maxRowNum, isChangeLog, isAutoCancel); + 
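The runnable built above is executed asynchronously on the shared thread pool; a short comment-only sketch of the accumulator polling it performs (names mirror the surrounding hunks, the snippet itself is illustrative rather than patch code):

// The caller receives a SelectResult keyed by the job id right away, while the runnable polls
// the job's accumulators, which the dinky-mock sink fills with one serialized row list per table:
//
//   Map<String, Object> accumulators = jobClient.getAccumulators().get();
//   Object rowsForTable = accumulators.get(tableIdentifier); // a List of serialized byte[] rows
//   Map<String, String> row = deserializeObjFromBytes((byte[]) ((List<?>) rowsForTable).get(0));
//
// JobClient#getAccumulators() returns a CompletableFuture, hence the .get().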
threadPoolExecutor.execute(runnable); + return SelectResult.buildSuccess( + tableResult.getJobClient().get().getJobID().toHexString()); } else { - return MockSinkResult.buildFailed(); + return SelectResult.buildFailed(); } } @Override public IResult getResultWithPersistence(TableResult tableResult, JobHandler jobHandler) { - if (Objects.isNull(tableResult)) { - return MockSinkResult.buildFailed(); - } - MockSinkResult mockSinkResult = (MockSinkResult) getResult(tableResult); - // MockSinkResult -> SelectResult - SelectResult selectResult = new SelectResult( - id, - convertSinkRowData2SelectRowData(mockSinkResult.getTableRowData()), - generateMockResultColumns(mockSinkResult.getTableRowData())); - selectResult.setMockSinkResult(true); - selectResult.setDestroyed(Boolean.TRUE); - try { - ResultPool.put(selectResult); - jobHandler.persistResultData(Lists.newArrayList(this.id)); - } finally { - ResultPool.remove(id); - } - return selectResult; - } - - /** - * convert row data of mocked sink result to the type of select result - * - * @param tableRowData row data of {@link MockSinkResult} - * @return row data of {@link SelectResult} - */ - private List> convertSinkRowData2SelectRowData( - Map>> tableRowData) { - List> resultRowData = new ArrayList<>(); - for (Map.Entry>> entry : tableRowData.entrySet()) { - String tableIdentifier = entry.getKey(); - List> rowDataList = entry.getValue(); - for (Map rowDataElement : rowDataList) { - Map selectRowDataElement = new HashMap<>(); - // table name identifier - selectRowDataElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); - // row data - selectRowDataElement.putAll(rowDataElement); - resultRowData.add(selectRowDataElement); - } - } - return resultRowData; - } - - /** - * check if all sink has finished - * - * @param maxRowNum maximum row num of each table - * @param rowData row data map, key: table name, value: row data - * @param tableIdentifierList table identifier - * @return true if all tables has caught enough rows - */ - private boolean isAllSinkFinished( - int maxRowNum, Map>> rowData, List tableIdentifierList) { - if (tableIdentifierList.size() > rowData.size()) { - return false; - } - for (List> rowDataList : rowData.values()) { - if (Asserts.isNotNull(rowDataList) && rowDataList.size() < maxRowNum) { - return false; - } + if (Asserts.isNull(tableResult)) { + return SelectResult.buildFailed(); } - return true; - } - - /** - * generate mock result column sets - * - * @param tableRowData row data of (@Link MockSinkResult} - * @return column sets {@link SelectResult} - */ - private LinkedHashSet generateMockResultColumns(Map>> tableRowData) { - LinkedHashSet resultColumn = new LinkedHashSet<>(); - for (Map.Entry>> entry : tableRowData.entrySet()) { - String tableIdentifier = entry.getKey(); - List> rowDataList = entry.getValue(); - Set columns = rowDataList.get(0).keySet(); - Map columnElement = new HashMap<>(); - columnElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); - columnElement.put(MOCK_RESULT_COLUMN_IDENTIFIER, columns); - resultColumn.add(JsonUtils.toJsonString(columnElement)); + if (tableResult.getJobClient().isPresent()) { + MockResultRunnable runnable = new MockResultRunnable(tableResult, id, maxRowNum, isChangeLog, isAutoCancel); + runnable.registerCallback((s, selectResult) -> { + jobHandler.persistResultData(com.google.common.collect.Lists.newArrayList(s)); + }); + threadPoolExecutor.execute(runnable); + return SelectResult.buildMockedResult(id); + } else { + return SelectResult.buildFailed(); } - return 
resultColumn; - } - - private static Map deserializeObjFromBytes(byte[] byteArr) throws IOException { - MapSerializer mapSerializer = - new MapSerializer<>(new StringSerializer(), new StringSerializer()); - return mapSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(byteArr))); } } diff --git a/dinky-core/src/main/java/org/dinky/data/result/MockResultRunnable.java b/dinky-core/src/main/java/org/dinky/data/result/MockResultRunnable.java new file mode 100644 index 0000000000..3f5cd041fd --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/data/result/MockResultRunnable.java @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.result; + +import org.dinky.assertion.Asserts; +import org.dinky.constant.FlinkConstant; +import org.dinky.utils.JsonUtils; + +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.RowKind; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; + +import lombok.extern.slf4j.Slf4j; + +/** + * ResultRunnable + * + * @since 2021/7/1 22:50 + */ +@Slf4j +public class MockResultRunnable implements Runnable { + + private static final String nullColumn = ""; + private final TableResult tableResult; + private final String id; + private final Integer maxRowNum; + private final boolean isChangeLog; + private final boolean isAutoCancel; + private BiConsumer callback; + private final String MOCK_RESULT_TABLE_IDENTIFIER = "dinkySinkResultTableIdentifier"; + private final String MOCK_RESULT_COLUMN_IDENTIFIER = "dinkySinkResultColumnIdentifier"; + private final MapSerializer mapSerializer = + new MapSerializer<>(new StringSerializer(), new StringSerializer()); + + public MockResultRunnable( + TableResult tableResult, String id, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel) { + this.tableResult = tableResult; + this.id = id; + this.maxRowNum = maxRowNum; + this.isChangeLog = isChangeLog; + this.isAutoCancel = isAutoCancel; + } + + public MockResultRunnable registerCallback(BiConsumer callback) { + this.callback = callback; + return this; + } + + @Override + public void run() { + log.info("MockResultRunnable start. 
Job id: {}", id); + try { + tableResult.getJobClient().ifPresent(jobClient -> { + if (!ResultPool.containsKey(id)) { + ResultPool.put(SelectResult.buildMockedResult(id)); + } + try { + if (isChangeLog) { + catchChangLog(ResultPool.get(id)); + } else { + catchData(ResultPool.get(id)); + } + if (Objects.nonNull(callback)) { + callback.accept(id, ResultPool.get(id)); + } + } catch (Exception e) { + log.error(String.format(e.toString())); + } finally { + ResultPool.remove(id); + } + }); + } catch (Exception e) { + // Nothing to do + } + } + + private void catchChangLog(SelectResult selectResult) { + List> rows = selectResult.getRowData(); + LinkedHashSet columns = selectResult.getColumns(); + // get table identifiers + ResolvedSchema resolvedSchema = tableResult.getResolvedSchema(); + List tableIdentifierList = resolvedSchema.getColumnNames(); + // update row data map + Map>> rowDataMap = new HashMap<>(); + Map tableIdentifierIndexMap = new HashMap<>(); + JobClient jobClient = tableResult.getJobClient().get(); + while (!isAllSinkFinished(maxRowNum, rowDataMap, tableIdentifierList)) { + try { + boolean allSinkFinished = false; + Map accumulatorMap = jobClient.getAccumulators().get(); + for (String tableIdentifier : tableIdentifierList) { + if (!tableIdentifierIndexMap.containsKey(tableIdentifier)) { + tableIdentifierIndexMap.put(tableIdentifier, 0); + } else if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum) { + allSinkFinished = true; + continue; + } + Object accumulatorObject = accumulatorMap.get(tableIdentifier); + if (accumulatorObject instanceof List) { + List serializerRowDataList = (List) accumulatorObject; + for (int i = tableIdentifierIndexMap.get(tableIdentifier); + i < serializerRowDataList.size(); + i++) { + Map rowDataWithTableIdentifier = new HashMap<>(); + rowDataWithTableIdentifier.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + // deserialize data from accumulator + Map deserializeRowData = + deserializeObjFromBytes((byte[]) serializerRowDataList.get(i)); + if (tableIdentifierIndexMap.get(tableIdentifier) == 0) { + columns.add(generateResultColumns(tableIdentifier, deserializeRowData)); + } + rowDataWithTableIdentifier.putAll(deserializeRowData); + // update row data map + rows.add(rowDataWithTableIdentifier); + tableIdentifierIndexMap.put( + tableIdentifier, tableIdentifierIndexMap.get(tableIdentifier) + 1); + if (tableIdentifierIndexMap.get(tableIdentifier) > maxRowNum) { + break; + } + } + log.info( + "Catch change log: table-{}: size-{},", + tableIdentifier, + tableIdentifierIndexMap.get(tableIdentifier)); + } + } + if (jobClient.getJobStatus().get().isTerminalState()) { + log.info( + "JobClient status:{}", + jobClient.getJobStatus().get().toString()); + break; + } + if (allSinkFinished && isAutoCancel) { + jobClient.cancel(); + } + Thread.sleep(1000); + } catch (Exception e) { + log.error("Deserialize change log from accumulator failed, jobId: {}: \nError: {}", id, e); + break; + } + } + log.info("Catch change log finish. 
Job id: {}", selectResult.getJobId()); + } + + private void catchData(SelectResult selectResult) { + List> rows = selectResult.getRowData(); + LinkedHashSet columns = selectResult.getColumns(); + // get table identifiers + ResolvedSchema resolvedSchema = tableResult.getResolvedSchema(); + List tableIdentifierList = resolvedSchema.getColumnNames(); + // update row data map + Map>> rowDataMap = new HashMap<>(); + Map tableIdentifierIndexMap = new HashMap<>(); + JobClient jobClient = tableResult.getJobClient().get(); + while (!isAllSinkFinished(maxRowNum, rowDataMap, tableIdentifierList)) { + try { + boolean allSinkFinished = false; + Map accumulatorMap = jobClient.getAccumulators().get(); + for (String tableIdentifier : tableIdentifierList) { + if (!tableIdentifierIndexMap.containsKey(tableIdentifier)) { + tableIdentifierIndexMap.put(tableIdentifier, 0); + } else if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum) { + allSinkFinished = true; + continue; + } + Object accumulatorObject = accumulatorMap.get(tableIdentifier); + if (accumulatorObject instanceof List) { + List serializerRowDataList = (List) accumulatorObject; + for (int i = tableIdentifierIndexMap.get(tableIdentifier); + i < serializerRowDataList.size(); + i++) { + Map rowDataWithTableIdentifier = new HashMap<>(); + rowDataWithTableIdentifier.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + // deserialize data from accumulator + Map deserializeRowData = + deserializeObjFromBytes((byte[]) serializerRowDataList.get(i)); + String op = deserializeRowData.get(FlinkConstant.OP); + deserializeRowData.remove(FlinkConstant.OP); + if (tableIdentifierIndexMap.get(tableIdentifier) == 0) { + columns.add(generateResultColumns(tableIdentifier, deserializeRowData)); + } + rowDataWithTableIdentifier.putAll(deserializeRowData); + if (RowKind.UPDATE_BEFORE.shortString().equals(op) + || RowKind.DELETE.shortString().equals(op)) { + rows.remove(rowDataWithTableIdentifier); + } else { + rows.add(rowDataWithTableIdentifier); + } + tableIdentifierIndexMap.put( + tableIdentifier, tableIdentifierIndexMap.get(tableIdentifier) + 1); + if (tableIdentifierIndexMap.get(tableIdentifier) > maxRowNum) { + break; + } + } + log.info( + "Catch Data: table-{}: size-{},", + tableIdentifier, + tableIdentifierIndexMap.get(tableIdentifier)); + } + } + if (jobClient.getJobStatus().get().isTerminalState()) { + log.info( + "JobClient status:{}", + jobClient.getJobStatus().get().toString()); + break; + } + if (allSinkFinished && isAutoCancel) { + jobClient.cancel(); + } + Thread.sleep(1000); + } catch (Exception e) { + log.error("Deserialize data from accumulator failed, jobId: {}: \nError: {}", id, e); + break; + } + } + log.info("Catch data finish. 
Job id: {}", selectResult.getJobId()); + } + + /** + * check if all sink has finished + * + * @param maxRowNum maximum row num of each table + * @param rowData row data map, key: table name, value: row data + * @param tableIdentifierList table identifier + * @return true if all tables has caught enough rows + */ + private boolean isAllSinkFinished( + int maxRowNum, Map>> rowData, List tableIdentifierList) { + if (tableIdentifierList.size() > rowData.size()) { + return false; + } + for (List> rowDataList : rowData.values()) { + if (Asserts.isNotNull(rowDataList) && rowDataList.size() < maxRowNum) { + return false; + } + } + return true; + } + + private Map deserializeObjFromBytes(byte[] byteArr) throws IOException { + return mapSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(byteArr))); + } + + private String generateResultColumns(String tableIdentifier, Map rowData) { + // __op__ is first column. + Set columns = new LinkedHashSet<>(); + if (rowData.containsKey(FlinkConstant.OP)) { + columns.add(FlinkConstant.OP); + } + Set columnsFromRowData = rowData.keySet(); + columnsFromRowData.remove(FlinkConstant.OP); + columns.addAll(columnsFromRowData); + Map columnElement = new HashMap<>(); + columnElement.put(MOCK_RESULT_TABLE_IDENTIFIER, tableIdentifier); + columnElement.put(MOCK_RESULT_COLUMN_IDENTIFIER, columns); + return JsonUtils.toJsonString(columnElement); + } +} diff --git a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java index 49973e1678..be2fbae047 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java @@ -58,8 +58,9 @@ static ResultBuilder build( case DESCRIBE: return new ShowResultBuilder(id); case INSERT: + case EXECUTE: return isMockSinkFunction - ? new MockResultBuilder(id, maxRowNum, isAutoCancel) + ? 
new MockResultBuilder(id, maxRowNum, isChangeLog, isAutoCancel) : new InsertResultBuilder(); default: return new DDLResultBuilder(); diff --git a/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java b/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java index d074eb68c3..1dd9d604d1 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java +++ b/dinky-core/src/main/java/org/dinky/data/result/SelectResult.java @@ -22,6 +22,7 @@ import org.dinky.utils.JsonUtils; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -145,4 +146,10 @@ public static SelectResult buildSuccess(String jobID) { public static SelectResult buildFailed() { return new SelectResult(null, false, false); } + + public static SelectResult buildMockedResult(String jobID) { + SelectResult selectResult = new SelectResult(jobID, new ArrayList<>(), new LinkedHashSet<>()); + selectResult.setMockSinkResult(true); + return selectResult; + } } diff --git a/dinky-core/src/main/java/org/dinky/data/result/SelectResultBuilder.java b/dinky-core/src/main/java/org/dinky/data/result/SelectResultBuilder.java index 672119a7c1..40c5e424b5 100644 --- a/dinky-core/src/main/java/org/dinky/data/result/SelectResultBuilder.java +++ b/dinky-core/src/main/java/org/dinky/data/result/SelectResultBuilder.java @@ -55,8 +55,7 @@ public IResult getResult(TableResult tableResult) { String jobId = tableResult.getJobClient().get().getJobID().toHexString(); ResultRunnable runnable = new ResultRunnable(tableResult, id, maxRowNum, isChangeLog, isAutoCancel, timeZone); - Thread thread = new Thread(runnable, jobId); - thread.start(); + threadPoolExecutor.execute(runnable); return SelectResult.buildSuccess(jobId); } else { return SelectResult.buildFailed(); @@ -82,8 +81,7 @@ public IResult getResultWithPersistence(TableResult tableResult, JobHandler jobH runnable.registerCallback((s, selectResult) -> { jobHandler.persistResultData(Lists.newArrayList(s)); }); - Thread thread = new Thread(runnable, jobId); - thread.start(); + threadPoolExecutor.execute(runnable); return SelectResult.buildSuccess(jobId); } else { return SelectResult.buildFailed(); diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java index d1c42d9959..e92dd31226 100644 --- a/dinky-core/src/main/java/org/dinky/executor/Executor.java +++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java @@ -92,6 +92,9 @@ public abstract class Executor { // Dinky variable manager protected VariableManager variableManager = new VariableManager(); + // mock test + protected boolean isMockTest = false; + // return dinkyClassLoader public DinkyClassLoader getDinkyClassLoader() { return dinkyClassLoader; @@ -137,6 +140,14 @@ public String getTimeZone() { return getTableConfig().getLocalTimeZone().getId(); } + public boolean isMockTest() { + return isMockTest; + } + + public void setMockTest(boolean mockTest) { + isMockTest = mockTest; + } + private void initClassloader(DinkyClassLoader classLoader) { if (classLoader != null) { try { @@ -175,6 +186,8 @@ protected void init(DinkyClassLoader classLoader) { if (executorConfig.isValidVariables()) { variableManager.registerVariable(executorConfig.getVariables()); } + + isMockTest = false; } abstract CustomTableEnvironment createCustomTableEnvironment(ClassLoader classLoader); @@ -270,7 +283,7 @@ public ObjectNode getStreamGraphJsonNode(StreamGraph streamGraph) { } 
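The mock result path above reads each sink row back from the job's accumulators as a serialized map of column name to value, which MockResultRunnable turns back into a Map via MapSerializer. A minimal, self-contained sketch of that round trip; the sink side is assumed to use the matching serializer, and the row values are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.flink.api.common.typeutils.base.MapSerializer;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    public class MockRowSerdeSketch {

        private static final MapSerializer<String, String> SERIALIZER =
                new MapSerializer<>(new StringSerializer(), new StringSerializer());

        // Sink side (assumed): turn one row into the byte[] that ends up in the accumulator.
        static byte[] serialize(Map<String, String> row) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            SERIALIZER.serialize(row, new DataOutputViewStreamWrapper(out));
            return out.toByteArray();
        }

        // Result side: what MockResultRunnable does with each accumulator element.
        static Map<String, String> deserialize(byte[] bytes) throws IOException {
            return SERIALIZER.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
        }

        public static void main(String[] args) throws IOException {
            Map<String, String> row = new LinkedHashMap<>();
            row.put("op", "+I");      // change-log flag keyed by FlinkConstant.OP (literal "op" assumed here)
            row.put("id", "1");       // illustrative column values
            row.put("name", "dinky");
            System.out.println(deserialize(serialize(row)));
        }
    }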
public StreamGraph getStreamGraph() { - return environment.getStreamGraph(); + return environment.getStreamGraph(false); } public StreamGraph getStreamGraphFromCustomStatements(List statements) { @@ -278,17 +291,11 @@ public StreamGraph getStreamGraphFromCustomStatements(List statements) { return getStreamGraph(); } - public ObjectNode getStreamGraphFromDataStream(List statements) { - statements.forEach(this::executeSql); - return getStreamGraphJsonNode(getStreamGraph()); - } - - public JobPlanInfo getJobPlanInfo(List statements) { + public JobPlanInfo getJobPlanInfoFromStatements(List statements) { return tableEnvironment.getJobPlanInfo(statements); } - public JobPlanInfo getJobPlanInfoFromDataStream(List statements) { - statements.forEach(this::executeSql); + public JobPlanInfo getJobPlanInfo() { StreamGraph streamGraph = getStreamGraph(); return new JobPlanInfo(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph())); } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 3203b9ac1c..2573e20c94 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -132,7 +132,8 @@ public JobStatementPlan parseStatements(String[] statements) { jobStatementPlan.addJobStatement(statement, JobStatementType.DDL, operationType); } } - if (jobManager.getConfig().isMockSinkFunction()) { + if (!jobManager.isPlanMode() && jobManager.getConfig().isMockSinkFunction()) { + executor.setMockTest(true); MockStatementExplainer.build(executor.getCustomTableEnvironment()).jobStatementPlanMock(jobStatementPlan); } return jobStatementPlan; diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java index ad9a316fae..4106cd7f4b 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -79,6 +79,9 @@ private void mockSink(JobStatementPlan jobStatementPlan) { // mock insert table ddl List jobStatementList = jobStatementPlan.getJobStatementList(); for (int i = 0; i < jobStatementList.size(); i++) { + if (!jobStatementList.get(i).getSqlType().equals(SqlType.CREATE)) { + continue; + } SqlNode sqlNode = tableEnv.parseSql(jobStatementList.get(i).getStatement()); if (sqlNode instanceof SqlCreateTable) { SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java index 835cf2285d..743afb9570 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java @@ -193,6 +193,6 @@ public StreamGraph getStreamGraph() { @Override public JobPlanInfo getJobPlanInfo() { - return executor.getJobPlanInfo(null); + return executor.getJobPlanInfo(); } } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index 7207151dfe..1360bcc962 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -155,7 +155,7 @@ public StreamGraph getStreamGraph() { @Override public JobPlanInfo getJobPlanInfo() { - return 
executor.getJobPlanInfo(null); + return executor.getJobPlanInfo(); } private boolean inferStatementSet() { diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java index bc3640c3b8..07cae22b8f 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobPipelineRunner.java @@ -27,6 +27,7 @@ import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; import org.dinky.data.result.SqlExplainResult; +import org.dinky.executor.CustomTableResultImpl; import org.dinky.executor.Executor; import org.dinky.gateway.Gateway; import org.dinky.gateway.result.GatewayResult; @@ -43,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; import java.time.LocalDateTime; import java.util.ArrayList; @@ -55,6 +58,7 @@ public class JobPipelineRunner extends AbstractJobRunner { private List statements; + private TableResult tableResult; public JobPipelineRunner(JobManager jobManager) { this.jobManager = jobManager; @@ -64,7 +68,7 @@ public JobPipelineRunner(JobManager jobManager) { @Override public void run(JobStatement jobStatement) throws Exception { statements.add(jobStatement); - jobManager.getExecutor().executeSql(jobStatement.getStatement()); + tableResult = jobManager.getExecutor().executeSql(jobStatement.getStatement()); if (statements.size() == 1) { if (jobManager.isUseGateway()) { processWithGateway(); @@ -130,7 +134,9 @@ public SqlExplainResult explain(JobStatement jobStatement) { @Override public StreamGraph getStreamGraph(JobStatement jobStatement) { - explain(jobStatement); + statements.add(jobStatement); + // pipeline job execute to generate stream graph. + jobManager.getExecutor().executeSql(jobStatement.getStatement()); if (statements.size() == 1) { return jobManager.getExecutor().getStreamGraph(); } else { @@ -141,9 +147,11 @@ public StreamGraph getStreamGraph(JobStatement jobStatement) { @Override public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) { - explain(jobStatement); + statements.add(jobStatement); + // pipeline job execute to generate stream graph. + jobManager.getExecutor().executeSql(jobStatement.getStatement()); if (statements.size() == 1) { - return jobManager.getExecutor().getJobPlanInfo(statements); + return jobManager.getExecutor().getJobPlanInfo(); } else { throw new DinkyException( "Only one pipeline job is explained. 
The statement has be skipped: " + jobStatement.getStatement()); @@ -194,6 +202,14 @@ private void processWithoutGateway() throws Exception { add(job.getJobId()); } }); + final List rowList = new ArrayList<>(); + tableResult.getResolvedSchema().getColumns().forEach(column -> rowList.add(Row.of(-1))); + tableResult = CustomTableResultImpl.builder() + .resultKind(tableResult.getResultKind()) + .schema(tableResult.getResolvedSchema()) + .data(rowList) + .jobClient(jobClient) + .build(); } if (config.isUseResult()) { IResult result = ResultBuilder.build( @@ -202,8 +218,9 @@ private void processWithoutGateway() throws Exception { config.getMaxRowNum(), config.isUseChangeLog(), config.isUseAutoCancel(), - executor.getTimeZone()) - .getResult(null); + executor.getTimeZone(), + jobManager.getConfig().isMockSinkFunction()) + .getResultWithPersistence(tableResult, jobManager.getHandler()); job.setResult(result); } } diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java index 619027f1b6..9b2f1be2ee 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobSqlRunner.java @@ -194,7 +194,7 @@ public JobPlanInfo getJobPlanInfo(JobStatement jobStatement) { return null; } if (!statements.isEmpty()) { - return jobManager.getExecutor().getJobPlanInfo(statements); + return jobManager.getExecutor().getJobPlanInfoFromStatements(statements); } throw new DinkyException("None jobs in statement."); } diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index b8ff7316bb..4b49b7c9ef 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -27,6 +27,7 @@ import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; import org.dinky.data.model.Table; +import org.dinky.executor.CustomTableResultImpl; import org.dinky.executor.Executor; import org.dinky.metadata.driver.Driver; import org.dinky.trans.AbstractOperation; @@ -37,8 +38,15 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.types.Row; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.LinkedList; @@ -74,9 +82,11 @@ public Operation create(String statement) { @Override public TableResult execute(Executor executor) { + final CustomTableResultImpl.Builder tableResultBuilder = CustomTableResultImpl.builder(); logger.info("Start build CDCSOURCE Task..."); CDCSource cdcSource = CDCSource.build(statement); FlinkCDCConfig config = cdcSource.buildFlinkCDCConfig(); + config.setMockTest(executor.isMockTest()); try { CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config); Map> allConfigMap = cdcBuilder.parseMetaDataConfigs(); @@ -190,16 +200,26 @@ public TableResult execute(Executor executor) { } DataStreamSource streamSource = cdcBuilder.build(streamExecutionEnvironment); 
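The pipeline runner above and the CDCSOURCE operation in this hunk both hand back a synthetic TableResult instead of null, so the regular result builders can keep polling the submitted job. A minimal sketch of that wrapping, assuming Dinky's CustomTableResultImpl builder behaves as used in these hunks; the single BIGINT column and the -1 placeholder row mirror that pattern, and the column name is illustrative:

    import java.util.Collections;

    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.table.api.ResultKind;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.types.AtomicDataType;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.types.Row;

    import org.dinky.executor.CustomTableResultImpl;

    public class PlaceholderTableResultSketch {

        // Wraps an already-submitted job so downstream code sees a TableResult with a
        // resolvable schema and the JobClient needed to fetch its accumulators later.
        static TableResult wrap(JobClient jobClient, String sinkTableIdentifier) {
            ResolvedSchema schema = ResolvedSchema.of(Collections.singletonList(
                    Column.physical(sinkTableIdentifier, new AtomicDataType(new BigIntType()))));
            return CustomTableResultImpl.builder()
                    .resultKind(ResultKind.SUCCESS)
                    .schema(schema)
                    .data(Collections.singletonList(Row.of(-1)))
                    .jobClient(jobClient)
                    .build();
        }
    }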
logger.info("Build {} successful...", config.getType()); - sinkBuilder.build( - cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource); + sinkBuilder.build(streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource); logger.info("Build CDCSOURCE Task successful!"); + final List columns = new ArrayList<>(); + final List rowList = new ArrayList<>(); + for (Schema schema : config.getSchemaList()) { + for (Table table : schema.getTables()) { + columns.add(Column.physical( + "default_catalog.default_database." + sinkBuilder.getSinkTableName(table), + new AtomicDataType(new BigIntType()))); + rowList.add(Row.of(-1)); + } + } + tableResultBuilder.schema(ResolvedSchema.of(columns)).data(rowList).resultKind(ResultKind.SUCCESS); } catch (Exception e) { logger.error(e.getMessage(), e); } - return null; + return tableResultBuilder.build(); } - Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { + private Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws Exception { Map sink = config.getSink(); String autoCreate = sink.get(FlinkCDCConfig.AUTO_CREATE); if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) { diff --git a/dinky-flink/dinky-flink-1.15/pom.xml b/dinky-flink/dinky-flink-1.15/pom.xml index 03d8009b18..800747df33 100644 --- a/dinky-flink/dinky-flink-1.15/pom.xml +++ b/dinky-flink/dinky-flink-1.15/pom.xml @@ -32,6 +32,7 @@ 1.3.1 + 24.0.1 15.0 1.15.4 3.2.0 @@ -184,7 +185,7 @@ org.apache.doris flink-doris-connector-1.15 - 1.6.2 + ${doris.connector.version} org.apache.flink diff --git a/dinky-flink/dinky-flink-1.16/pom.xml b/dinky-flink/dinky-flink-1.16/pom.xml index 9553bc2e06..757b36687f 100644 --- a/dinky-flink/dinky-flink-1.16/pom.xml +++ b/dinky-flink/dinky-flink-1.16/pom.xml @@ -16,6 +16,7 @@ 1.3.1 + 24.0.1 16.2 1.16.3 3.2.0 @@ -138,7 +139,7 @@ org.apache.doris flink-doris-connector-1.16 - 1.6.2 + ${doris.connector.version} org.apache.flink diff --git a/dinky-flink/dinky-flink-1.17/pom.xml b/dinky-flink/dinky-flink-1.17/pom.xml index 5710591f54..50ca2fb711 100644 --- a/dinky-flink/dinky-flink-1.17/pom.xml +++ b/dinky-flink/dinky-flink-1.17/pom.xml @@ -16,6 +16,7 @@ 1.3.1 + 24.0.1 17.0 1.17.2 3.2.0 @@ -143,7 +144,7 @@ org.apache.doris flink-doris-connector-1.17 - 1.6.2 + ${doris.connector.version} org.apache.flink diff --git a/dinky-flink/dinky-flink-1.18/pom.xml b/dinky-flink/dinky-flink-1.18/pom.xml index 15bdce4ef7..38b201c0df 100644 --- a/dinky-flink/dinky-flink-1.18/pom.xml +++ b/dinky-flink/dinky-flink-1.18/pom.xml @@ -16,6 +16,7 @@ 1.3.1 + 24.0.1 18.0 1.18.1 3.2.0 @@ -141,7 +142,7 @@ org.apache.doris flink-doris-connector-1.18 - 1.6.2 + ${doris.connector.version} org.apache.flink diff --git a/dinky-flink/dinky-flink-1.19/pom.xml b/dinky-flink/dinky-flink-1.19/pom.xml index 78f6c26762..47b046c400 100644 --- a/dinky-flink/dinky-flink-1.19/pom.xml +++ b/dinky-flink/dinky-flink-1.19/pom.xml @@ -16,6 +16,7 @@ 1.5.0 + 24.0.1 19.0 1.19.1 3.2.0 @@ -141,7 +142,7 @@ org.apache.doris flink-doris-connector-1.19 - 1.6.2 + ${doris.connector.version} org.apache.flink diff --git a/dinky-flink/dinky-flink-1.20/pom.xml b/dinky-flink/dinky-flink-1.20/pom.xml index bfc0f9adc2..78710cecc4 100644 --- a/dinky-flink/dinky-flink-1.20/pom.xml +++ b/dinky-flink/dinky-flink-1.20/pom.xml @@ -16,6 +16,7 @@ 1.5.0 + 24.0.1 19.0 1.20.0 3.2.0 @@ -116,7 +117,7 @@ org.apache.doris flink-doris-connector-1.20 - 24.0.0 + ${doris.connector.version} 
org.apache.flink diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/ErrorMsgInfo.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/ErrorMsgInfo.tsx index 298cea114d..411db0aac0 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/ErrorMsgInfo.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/ErrorMsgInfo.tsx @@ -52,7 +52,7 @@ export const ErrorMsgInfo: React.FC = (props) => { autoWrap={'on'} height={'60vh'} language={CustomEditorLanguage.JavaLog} - code={row?.error ?? '暂无日志'} + code={row?.error ?? 'No logs'} /> ); diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/PreViewData.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/PreViewData.tsx index 7446dc25fc..554c9a491d 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/PreViewData.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/PreViewData.tsx @@ -23,13 +23,17 @@ import { FireOutlined } from '@ant-design/icons'; import ProDescriptions from '@ant-design/pro-descriptions'; import { Tag } from 'antd'; import React from 'react'; +import Result from "@/pages/DataStudio/Toolbar/Service/Result"; +import {DataStudioActionType} from "@/pages/DataStudio/data.d"; +import {DIALECT} from "@/services/constants"; type PreViewDataProps = { row: JobExecutionHistory | undefined; + dialect: string; }; export const PreViewData: React.FC = (props) => { - const { row } = props; + const { row, dialect } = props; return ( <> @@ -39,7 +43,7 @@ export const PreViewData: React.FC = (props) => { {row?.jobId ?? 
l('global.job.status.failed-tip')} - // todo 预览数据 + ); diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/index.tsx index 4089755046..e3023bb290 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/components/JobDetailInfoModel/index.tsx @@ -31,10 +31,11 @@ type JobDetailInfoModelProps = { handleCancel: () => void; row: JobExecutionHistory | undefined; type: number; + dialect: string; }; export const JobDetailInfoModel: React.FC = (props) => { - const { modalVisit, handleCancel, row, type } = props; + const { modalVisit, handleCancel, row, type, dialect } = props; return ( <> @@ -56,7 +57,7 @@ export const JobDetailInfoModel: React.FC = (props) => {type == 1 && } {type == 2 && } {/*todo 预览数据*/} - {/*{type == 3 && }*/} + {type == 3 && } {type == 4 && } diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/index.tsx index 3c110c00a8..47a518f30b 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Service/ExecutionHistory/index.tsx @@ -32,8 +32,8 @@ import { JobDetailInfoModel } from '@/pages/DataStudio/Toolbar/Service/Execution const { Link, Paragraph, Text } = Typography; -export default (props: { taskId: number }) => { - const { taskId } = props; +export default (props: { taskId: number, dialect: string }) => { + const { taskId, dialect } = props; const refAction = useRef(); const [modalVisit, setModalVisit] = useState(false); @@ -195,25 +195,25 @@ export default (props: { taskId: number }) => { title: l('global.table.status'), valueType: 'select', valueEnum: { - '': { text: '全部', status: 'ALL' }, + '': { text: 'ALL', status: 'ALL' }, 0: { - text: '初始化中', + text: 'INITIALIZE', status: 'INITIALIZE' }, 1: { - text: '运行中', + text: 'RUNNING', status: 'RUNNING' }, 2: { - text: '成功', + text: 'SUCCESS', status: 'SUCCESS' }, 3: { - text: '失败', + text: 'FAILED', status: 'FAILED' }, 4: { - text: '取消', + text: 'CANCEL', status: 'CANCEL' } } @@ -268,6 +268,7 @@ export default (props: { taskId: number }) => { handleCancel={handleCancel} row={historyData} type={type} + dialect={dialect} /> ); diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Service/Result/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Service/Result/index.tsx index 54a505d851..e4d3a2da9e 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Service/Result/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Service/Result/index.tsx @@ -43,9 +43,10 @@ type Data = { rowData: object[]; }; type DataList = Data[]; -export default (props: { taskId: number; action: any; dialect: string }) => { +export default (props: { taskId: number; historyId: number|undefined; action: any; dialect: string }) => { const { taskId, + historyId, action: { actionType, params }, dialect } = props; @@ -183,16 +184,19 @@ export default (props: { taskId: number; action: any; dialect: string }) => { }); const loadData = async () => { - const res = await handleGetOptionWithoutMsg(API_CONSTANTS.GET_LATEST_HISTORY_BY_ID, { - id: taskId - }); - const historyData = res.data; - if (historyData) { + let historyIdParam = historyId; + if(!historyIdParam){ + const res = await 
handleGetOptionWithoutMsg(API_CONSTANTS.GET_LATEST_HISTORY_BY_ID, { + id: taskId + }); + historyIdParam = res.data.id; + } + if (historyIdParam) { const tableData = await handleGetOption( API_CONSTANTS.GET_JOB_DATA, l('global.getdata.tips'), { - jobId: historyData.id + jobId: historyIdParam } ); const data = tableData.data; @@ -315,13 +319,12 @@ export default (props: { taskId: number; action: any; dialect: string }) => { }; return (
- {dataList.length > 0 ? ( - - ) : ( + + {dataList.length == 0 ?? ( )} , - children: + children: }); } if (assert(taskParams?.dialect, [DIALECT.FLINK_SQL], true, 'includes')) {
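The Result and PreViewData components above render the rows that MockResultRunnable persisted: each row is a flat map in which a reserved key names the originating sink table and the remaining entries are that table's column values. A sketch of that shape; the table and column names are illustrative, only the reserved key comes from the code above:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MockRowShapeSketch {
        public static void main(String[] args) {
            Map<String, Object> row = new LinkedHashMap<>();
            // reserved key written by MockResultRunnable for every row
            row.put("dinkySinkResultTableIdentifier", "default_catalog.default_database.sink_orders");
            // remaining entries are the sink table's column values (illustrative)
            row.put("id", "1");
            row.put("name", "dinky");
            System.out.println(row);
        }
    }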