From 237b45a3ec762411e3b1665ab781a7971a8423d6 Mon Sep 17 00:00:00 2001 From: Hper <369161356@qq.com> Date: Tue, 12 Nov 2024 17:16:59 +0800 Subject: [PATCH] [BugFix][FlinkSQL] V1.1.0 version Keberos bug related repair, SQL SET value does not take effect, etc (#3875) Signed-off-by: Zzm0809 <934230207@qq.com> Co-authored-by: harper Co-authored-by: Zzm0809 <934230207@qq.com> Co-authored-by: zhuxt2015 <594754793@qq.com> Co-authored-by: zhuxt2015 Co-authored-by: Wink <32723967+aiwenmo@users.noreply.github.com> Co-authored-by: gaoyan Co-authored-by: gaoyan1998 Co-authored-by: jianjun.xu <128395511+jianjun159@users.noreply.github.com> Co-authored-by: jianjun.xu Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Co-authored-by: zhangyuhang Co-authored-by: ZackYoung Co-authored-by: zackyoungh Co-authored-by: LUOSHANGJIE\71826 <718261933@qq.com> Co-authored-by: 18216499322 <18216499322@users.noreply.github.com> Co-authored-by: Zzm0809 Co-authored-by: MactavishCui <166927232+MactavishCui@users.noreply.github.com> --- .../org/dinky/controller/APIController.java | 15 ++++++++ .../org/dinky/cdc/AbstractSinkBuilder.java | 25 ++++++++++++ .../org/dinky/cdc/kafka/KafkaSinkBuilder.java | 38 ++++++++++++------- .../org/dinky/cdc/mysql/MysqlCDCBuilder.java | 4 ++ .../org/dinky/cdc/kafka/KafkaSinkBuilder.java | 38 +++++++++++-------- .../dinky/cdc/kafka/KafkaSinkJsonBuilder.java | 11 +++++- .../src/main/java/org/dinky/api/FlinkAPI.java | 6 ++- .../org/dinky/constant/FlinkConstant.java | 3 ++ .../org/dinky/executor/ExecutorConfig.java | 10 ++++- .../dinky/job/builder/JobExecuteBuilder.java | 2 + .../JobDetail/JobOperator/JobOperator.tsx | 5 ++- 11 files changed, 123 insertions(+), 34 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java index 1d58b27cf5..6b0ac4c0fc 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java @@ -28,8 +28,10 @@ import org.dinky.data.enums.Status; import org.dinky.data.exception.NotSupportExplainExcepition; import org.dinky.data.model.job.JobInstance; +import org.dinky.data.result.ProTableResult; import org.dinky.data.result.Result; import org.dinky.data.result.SqlExplainResult; +import org.dinky.data.vo.task.JobInstanceVo; import org.dinky.gateway.enums.SavePointType; import org.dinky.gateway.result.SavePointResult; import org.dinky.job.JobResult; @@ -45,6 +47,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.swagger.annotations.Api; @@ -202,4 +205,16 @@ public Result getTaskLineage(@RequestParam Integer id) { taskService.initTenantByTaskId(id); return Result.succeed(taskService.getTaskLineage(id), Status.QUERY_SUCCESS); } + + @PostMapping("/getJobInstanceList") + @ApiImplicitParam( + name = "para", + value = "Query parameters", + dataType = "JsonNode", + paramType = "body", + required = true, + dataTypeClass = JsonNode.class) + public ProTableResult listJobInstances(@RequestBody JsonNode para) { + return jobInstanceService.listJobInstances(para); + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index deb9d7fc55..f9c82b3846 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -131,7 +131,10 @@ protected Properties getProperties() { && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) { properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue()); + } else { + properties.setProperty(entry.getKey(), entry.getValue()); } + logger.info("sink config k/v:{}", properties); } return properties; } @@ -587,4 +590,26 @@ protected List getPKList(Table table) { protected ZoneId getSinkTimeZone() { return this.sinkTimeZone; } + + protected Map getTableTopicMap() { + String topicMapStr = this.config.getSink().get("table.topic.map"); + Map tableTopicMap = new HashMap<>(); + if (topicMapStr != null) { + String[] topicTabArray = topicMapStr.split(";"); + for (String topicTab : topicTabArray) { + if (topicTab != null) { + String[] topicTable = topicTab.split(":"); + if (topicTable.length > 1) { + String[] tables = topicTable[1].split(","); + for (String table : tables) { + tableTopicMap.put(table, topicTable[0]); + } + } + } + } + } + + logger.info("topic map," + tableTopicMap); + return tableTopicMap; + } } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java index 9c227bf9d0..6caf75a0d7 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java @@ -116,21 +116,23 @@ public DataStreamSource build( .setTopic(config.getSink().get("topic")) .setValueSerializationSchema(new SimpleStringSchema()) .build()) + .setTransactionalIdPrefix( + config.getSink().get("transactional.id.prefix") == null + ? "" + : config.getSink().get("transactional.id.prefix")) .setDeliverGuarantee(DeliveryGuarantee.valueOf( - env.getCheckpointingMode().name())); + config.getSink().get("delivery.guarantee") == null + ? "NONE" + : config.getSink().get("delivery.guarantee"))); if (!kafkaProducerConfig.isEmpty()) { kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); } - if (!kafkaProducerConfig.isEmpty() - && kafkaProducerConfig.containsKey("transactional.id") - && Asserts.isNotNullString(kafkaProducerConfig.getProperty("transactional.id"))) { - kafkaSinkBuilder.setTransactionalIdPrefix(kafkaProducerConfig.getProperty("transactional.id")); - } KafkaSink kafkaSink = kafkaSinkBuilder.build(); dataStreamSource.sinkTo(kafkaSink); } else { Map> tagMap = new LinkedHashMap<>(); Map tableMap = new LinkedHashMap<>(); + Map tableTopicMap = this.getTableTopicMap(); ObjectMapper objectMapper = new ObjectMapper(); SingleOutputStreamOperator mapOperator = dataStreamSource .map(x -> objectMapper.readValue(x, Map.class)) @@ -141,6 +143,7 @@ public DataStreamSource build( if (Asserts.isNotNullCollection(schemaList)) { for (Schema schema : schemaList) { if (Asserts.isNullCollection(schema.getTables())) { + logger.error("Schema:{} tables is empty", schema.getName()); // if schema tables is empty, throw exception throw new IllegalArgumentException( "Schema tables is empty, please check your configuration or check your database permission and try again."); @@ -176,6 +179,14 @@ public void processElement(Map map, ProcessFunction.Context ctx, Co }); tagMap.forEach((k, v) -> { String topic = getSinkTableName(k); + if (tableTopicMap != null) { + String tableName = k.getName(); + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } + org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder() .setBootstrapServers(config.getSink().get("brokers")) @@ -183,17 +194,18 @@ public void processElement(Map map, ProcessFunction.Context ctx, Co .setTopic(topic) .setValueSerializationSchema(new SimpleStringSchema()) .build()) + .setTransactionalIdPrefix( + config.getSink().get("transactional.id.prefix") == null + ? "" + : config.getSink().get("transactional.id.prefix")) .setDeliverGuarantee(DeliveryGuarantee.valueOf( - env.getCheckpointingMode().name())); + config.getSink().get("delivery.guarantee") == null + ? "NONE" + : config.getSink().get("delivery.guarantee"))); if (!kafkaProducerConfig.isEmpty()) { kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); } - if (!kafkaProducerConfig.isEmpty() - && kafkaProducerConfig.containsKey("transactional.id") - && Asserts.isNotNullString(kafkaProducerConfig.getProperty("transactional.id"))) { - kafkaSinkBuilder.setTransactionalIdPrefix( - kafkaProducerConfig.getProperty("transactional.id") + "-" + topic); - } + KafkaSink kafkaSink = kafkaSinkBuilder.build(); process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); }); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java index 39d3b5fc35..1014ae4219 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java @@ -83,6 +83,10 @@ public DataStreamSource build(StreamExecutionEnvironment env) { Properties debeziumProperties = new Properties(); debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long"); debeziumProperties.setProperty("decimal.handling.mode", "string"); + if (Asserts.isNotNullString(serverTimeZone) + && Asserts.isNotNullString(config.getDebezium().get("datetime.type"))) { + debeziumProperties.setProperty("datetime.format.timestamp.zone", serverTimeZone); + } config.getDebezium().forEach((key, value) -> { if (Asserts.isNotNullString(key) && Asserts.isNotNullString(value)) { diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java index 896b0532ea..8cdd0df10a 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkBuilder.java @@ -85,20 +85,21 @@ public DataStreamSource build( .setTopic(config.getSink().get("topic")) .setValueSerializationSchema(new SimpleStringSchema()) .build()) - .setDeliverGuarantee(DeliveryGuarantee.valueOf( - env.getCheckpointingMode().name())); + .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null + ? "" + : config.getSink().get("transactional.id.prefix")) + .setDeliverGuarantee( + DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null + ? "NONE" + : config.getSink().get("delivery.guarantee"))); if (!kafkaProducerConfig.isEmpty()) { kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); } - if (!kafkaProducerConfig.isEmpty() - && kafkaProducerConfig.containsKey(TRANSACTIONAL_ID) - && Asserts.isNotNullString(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID))) { - kafkaSinkBuilder.setTransactionalIdPrefix(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID)); - } KafkaSink kafkaSink = kafkaSinkBuilder.build(); dataStreamSource.sinkTo(kafkaSink); } else { Map> tagMap = new LinkedHashMap<>(); + Map tableTopicMap = this.getTableTopicMap(); Map tableMap = new LinkedHashMap<>(); ObjectMapper objectMapper = new ObjectMapper(); SingleOutputStreamOperator mapOperator = dataStreamSource @@ -147,6 +148,13 @@ public void processElement(Map map, ProcessFunction.Context ctx, Co tagMap.forEach((k, v) -> { String topic = getSinkTableName(k); + if (tableTopicMap != null) { + String tableName = k.getName(); + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } org.apache.flink.connector.kafka.sink.KafkaSinkBuilder kafkaSinkBuilder = KafkaSink.builder() .setBootstrapServers(config.getSink().get("brokers")) @@ -154,17 +162,17 @@ public void processElement(Map map, ProcessFunction.Context ctx, Co .setTopic(topic) .setValueSerializationSchema(new SimpleStringSchema()) .build()) - .setDeliverGuarantee(DeliveryGuarantee.valueOf( - env.getCheckpointingMode().name())); + .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null + ? "" + : config.getSink().get("transactional.id.prefix")) + .setDeliverGuarantee( + DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null + ? "NONE" + : config.getSink().get("delivery.guarantee"))); if (!kafkaProducerConfig.isEmpty()) { kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig); } - if (!kafkaProducerConfig.isEmpty() - && kafkaProducerConfig.containsKey(TRANSACTIONAL_ID) - && Asserts.isNotNullString(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID))) { - kafkaSinkBuilder.setTransactionalIdPrefix( - kafkaProducerConfig.getProperty(TRANSACTIONAL_ID) + "-" + topic); - } + KafkaSink kafkaSink = kafkaSinkBuilder.build(); process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic); }); diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java index d4053689cd..83bfdbb81f 100644 --- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java +++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/kafka/KafkaSinkJsonBuilder.java @@ -113,6 +113,14 @@ public DataStreamSource build( String topic = getSinkTableName(table); if (Asserts.isNotNullString(config.getSink().get("topic"))) { topic = config.getSink().get("topic"); + } else { + Map tableTopicMap = this.getTableTopicMap(); + if (tableTopicMap != null) { + String newTopic = tableTopicMap.get(tableName); + if (Asserts.isNotNullString(newTopic)) { + topic = newTopic; + } + } } List columnNameList = new LinkedList<>(); List columnTypeList = new LinkedList<>(); @@ -193,8 +201,7 @@ public void processElement(Map value, Context context, Collector collect } } }); - stringOperator.addSink(new FlinkKafkaProducer( - config.getSink().get("brokers"), topic, new SimpleStringSchema())); + stringOperator.addSink(new FlinkKafkaProducer(topic, new SimpleStringSchema(), getProperties())); } } } catch (Exception ex) { diff --git a/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java b/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java index ceacf8189c..912f3f0ddd 100644 --- a/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java +++ b/dinky-core/src/main/java/org/dinky/api/FlinkAPI.java @@ -72,7 +72,11 @@ public class FlinkAPI { private static final ObjectMapper mapper = new ObjectMapper(); public FlinkAPI(String address) { - this.address = address; + if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) { + this.address = address; + } else { + this.address = NetConstant.HTTP + address; + } } public static FlinkAPI build(String address) { diff --git a/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java b/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java index 3ee14381a6..b280c7cac7 100644 --- a/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java +++ b/dinky-core/src/main/java/org/dinky/constant/FlinkConstant.java @@ -29,4 +29,7 @@ public interface FlinkConstant { String LOCAL_HOST = "localhost"; /** changlog op */ String OP = "op"; + + /** flink 默认端口 **/ + Integer FLINK_REST_DEFAULT_PORT = 8081; } diff --git a/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java b/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java index 64471d44e8..8f9f6a2ce4 100644 --- a/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java +++ b/dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java @@ -20,6 +20,8 @@ package org.dinky.executor; import org.dinky.assertion.Asserts; +import org.dinky.constant.FlinkConstant; +import org.dinky.data.constant.NetConstant; import org.dinky.data.enums.GatewayType; import java.util.ArrayList; @@ -167,14 +169,18 @@ public static ExecutorConfig build( String host = null; Integer port = null; + String hostPort = address; if (Asserts.isNotNullString(address)) { - String[] strings = address.split(":"); + if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) { + hostPort = address.replace(NetConstant.HTTP, "").replace(NetConstant.HTTPS, ""); + } + String[] strings = hostPort.split(":"); if (strings.length > 1) { host = strings[0]; port = Integer.parseInt(strings[1]); } else { host = strings[0]; - port = 8081; + port = FlinkConstant.FLINK_REST_DEFAULT_PORT; } } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java index 6fb835742d..3253393f97 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java @@ -65,6 +65,8 @@ public void run() throws Exception { } GatewayResult gatewayResult = null; config.addGatewayConfig(executor.getSetConfig()); + config.addGatewayConfig( + executor.getCustomTableEnvironment().getConfig().getConfiguration()); config.getGatewayConfig().setSql(jobParam.getParsedSql()); if (runMode.isApplicationMode()) { diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx index 31c28c51c3..bf21997b90 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx @@ -40,7 +40,10 @@ export type OperatorType = { }; const JobOperator = (props: OperatorType) => { const { jobDetail, refesh } = props; - const webUri = `${API_CONSTANTS.BASE_URL}/api/flink/${jobDetail?.clusterInstance?.jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`; + const jobManagerHost = jobDetail?.clusterInstance?.jobManagerHost; + const webUri = jobManagerHost?.startsWith('http://') || jobManagerHost?.startsWith('https://') + ? jobManagerHost + : `${API_CONSTANTS.BASE_URL}/api/flink/${jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`; const handleJobOperator = (key: string) => { Modal.confirm({