
Commit

[BugFix][FlinkSQL] V1.1.0: fix Kerberos-related bugs, SQL SET values not taking effect, etc. (#3875)

Signed-off-by: Zzm0809 <934230207@qq.com>
Co-authored-by: harper <donotcoffee@gmail.com>
Co-authored-by: Zzm0809 <934230207@qq.com>
Co-authored-by: zhuxt2015 <594754793@qq.com>
Co-authored-by: zhuxt2015 <zhuxt2015@users.noreply.github.com>
Co-authored-by: Wink <32723967+aiwenmo@users.noreply.github.com>
Co-authored-by: gaoyan <ikiler@126.com>
Co-authored-by: gaoyan1998 <gaoyan1998@users.noreply.github.com>
Co-authored-by: jianjun.xu <128395511+jianjun159@users.noreply.github.com>
Co-authored-by: jianjun.xu <jianjun.xu@ly.com>
Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Co-authored-by: zhangyuhang <yuhang2.zhang@ly.com>
Co-authored-by: ZackYoung <zackyoungh@163.com>
Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
Co-authored-by: LUOSHANGJIE\71826 <718261933@qq.com>
Co-authored-by: 18216499322 <18216499322@users.noreply.github.com>
Co-authored-by: Zzm0809 <Zzm0809@users.noreply.github.com>
Co-authored-by: MactavishCui <166927232+MactavishCui@users.noreply.github.com>
18 people authored Nov 12, 2024
1 parent 233cb2a commit 237b45a
Showing 11 changed files with 123 additions and 34 deletions.
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/APIController.java
@@ -28,8 +28,10 @@
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.data.vo.task.JobInstanceVo;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
@@ -45,6 +47,7 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import io.swagger.annotations.Api;
@@ -202,4 +205,16 @@ public Result getTaskLineage(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return Result.succeed(taskService.getTaskLineage(id), Status.QUERY_SUCCESS);
}

@PostMapping("/getJobInstanceList")
@ApiImplicitParam(
name = "para",
value = "Query parameters",
dataType = "JsonNode",
paramType = "body",
required = true,
dataTypeClass = JsonNode.class)
public ProTableResult<JobInstanceVo> listJobInstances(@RequestBody JsonNode para) {
return jobInstanceService.listJobInstances(para);
}
}
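
The new openapi route exposes the console's job-instance listing to external callers. Below is a minimal client sketch; the /openapi prefix, the host/port, and the ProTable-style paging fields in the body are assumptions, since none of them are visible in this diff.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class JobInstanceListDemo {
        public static void main(String[] args) throws Exception {
            // Hypothetical paging body; the exact shape listJobInstances expects
            // is defined by the JsonNode handling elsewhere in Dinky.
            String body = "{\"current\": 1, \"pageSize\": 10}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:8888/openapi/getJobInstanceList"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body()); // JSON-serialized ProTableResult<JobInstanceVo>
        }
    }
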
@@ -131,7 +131,10 @@ protected Properties getProperties() {
&& entry.getKey().startsWith("properties")
&& Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue());
} else {
properties.setProperty(entry.getKey(), entry.getValue());
}
logger.info("sink config k/v:{}", properties);
}
return properties;
}
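
The added else branch changes the semantics of getProperties(): keys prefixed with "properties." are still unwrapped for the Kafka client, but unprefixed sink keys are now passed through instead of being silently dropped. A standalone sketch of the resulting behavior (sample keys are illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Properties;

    public class SinkPropertiesDemo {
        public static void main(String[] args) {
            Map<String, String> sink = new LinkedHashMap<>();
            sink.put("properties.bootstrap.servers", "kafka:9092"); // prefix is unwrapped
            sink.put("transactional.id.prefix", "dinky-sink");      // now kept as-is
            Properties properties = new Properties();
            for (Map.Entry<String, String> entry : sink.entrySet()) {
                if (entry.getKey().startsWith("properties")) {
                    properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue());
                } else {
                    properties.setProperty(entry.getKey(), entry.getValue());
                }
            }
            // contains bootstrap.servers=kafka:9092 and transactional.id.prefix=dinky-sink
            System.out.println(properties);
        }
    }
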
@@ -587,4 +590,26 @@ protected List<String> getPKList(Table table) {
protected ZoneId getSinkTimeZone() {
return this.sinkTimeZone;
}

protected Map<String, String> getTableTopicMap() {
String topicMapStr = this.config.getSink().get("table.topic.map");
Map<String, String> tableTopicMap = new HashMap<>();
if (topicMapStr != null) {
String[] topicTabArray = topicMapStr.split(";");
for (String topicTab : topicTabArray) {
if (topicTab != null) {
String[] topicTable = topicTab.split(":");
if (topicTable.length > 1) {
String[] tables = topicTable[1].split(",");
for (String table : tables) {
tableTopicMap.put(table, topicTable[0]);
}
}
}
}
}

logger.info("topic map," + tableTopicMap);
return tableTopicMap;
}
}
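
getTableTopicMap() turns the "table.topic.map" sink option into a table-to-topic lookup; each ';'-separated entry has the form "topic:tableA,tableB". A sketch with an illustrative option value:

    import java.util.HashMap;
    import java.util.Map;

    public class TableTopicMapDemo {
        public static void main(String[] args) {
            // Illustrative value for the "table.topic.map" sink option.
            String topicMapStr = "orders_topic:orders,order_items;users_topic:users";
            Map<String, String> tableTopicMap = new HashMap<>();
            for (String topicTab : topicMapStr.split(";")) {
                String[] topicTable = topicTab.split(":");
                if (topicTable.length > 1) {
                    for (String table : topicTable[1].split(",")) {
                        tableTopicMap.put(table, topicTable[0]);
                    }
                }
            }
            // {orders=orders_topic, order_items=orders_topic, users=users_topic}
            System.out.println(tableTopicMap);
        }
    }
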
@@ -116,21 +116,23 @@ public DataStreamSource build(
.setTopic(config.getSink().get("topic"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setTransactionalIdPrefix(
config.getSink().get("transactional.id.prefix") == null
? ""
: config.getSink().get("transactional.id.prefix"))
                            .setDeliverGuarantee(DeliveryGuarantee.valueOf(
-                                   env.getCheckpointingMode().name()));
+                                   config.getSink().get("delivery.guarantee") == null
+                                           ? "NONE"
+                                           : config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey("transactional.id")
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty("transactional.id"))) {
kafkaSinkBuilder.setTransactionalIdPrefix(kafkaProducerConfig.getProperty("transactional.id"));
}
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
dataStreamSource.sinkTo(kafkaSink);
} else {
Map<Table, OutputTag<String>> tagMap = new LinkedHashMap<>();
Map<String, Table> tableMap = new LinkedHashMap<>();
Map<String, String> tableTopicMap = this.getTableTopicMap();
ObjectMapper objectMapper = new ObjectMapper();
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource
.map(x -> objectMapper.readValue(x, Map.class))
Expand All @@ -141,6 +143,7 @@ public DataStreamSource build(
if (Asserts.isNotNullCollection(schemaList)) {
for (Schema schema : schemaList) {
if (Asserts.isNullCollection(schema.getTables())) {
logger.error("Schema:{} tables is empty", schema.getName());
// if schema tables is empty, throw exception
throw new IllegalArgumentException(
"Schema tables is empty, please check your configuration or check your database permission and try again.");
@@ -176,24 +179,33 @@ public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Co
});
tagMap.forEach((k, v) -> {
String topic = getSinkTableName(k);
if (tableTopicMap != null) {
String tableName = k.getName();
String newTopic = tableTopicMap.get(tableName);
if (Asserts.isNotNullString(newTopic)) {
topic = newTopic;
}
}

org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<String> kafkaSinkBuilder =
KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setTransactionalIdPrefix(
config.getSink().get("transactional.id.prefix") == null
? ""
: config.getSink().get("transactional.id.prefix"))
.setDeliverGuarantee(DeliveryGuarantee.valueOf(
env.getCheckpointingMode().name()));
config.getSink().get("delivery.guarantee") == null
? "NONE"
: config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey("transactional.id")
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty("transactional.id"))) {
kafkaSinkBuilder.setTransactionalIdPrefix(
kafkaProducerConfig.getProperty("transactional.id") + "-" + topic);
}

KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
});
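
Net effect of this file's changes: the Kafka sink no longer derives its delivery guarantee from the environment's checkpointing mode, and the transactional id prefix comes from sink options rather than raw producer config. A sketch of the option map the builder now reads (keys are taken from the diff; values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class KafkaSinkOptionsDemo {
        public static void main(String[] args) {
            Map<String, String> sink = new HashMap<>();
            sink.put("brokers", "kafka:9092");
            sink.put("table.topic.map", "orders_topic:orders,order_items");
            sink.put("transactional.id.prefix", "dinky-cdc");
            sink.put("delivery.guarantee", "EXACTLY_ONCE"); // NONE is the fallback

            // Mirrors the null checks in the builder chain above:
            String guarantee = sink.get("delivery.guarantee") == null
                    ? "NONE"
                    : sink.get("delivery.guarantee");
            String txPrefix = sink.get("transactional.id.prefix") == null
                    ? ""
                    : sink.get("transactional.id.prefix");
            System.out.println(guarantee + " / " + txPrefix); // EXACTLY_ONCE / dinky-cdc
        }
    }
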
@@ -83,6 +83,10 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
debeziumProperties.setProperty("decimal.handling.mode", "string");
if (Asserts.isNotNullString(serverTimeZone)
&& Asserts.isNotNullString(config.getDebezium().get("datetime.type"))) {
debeziumProperties.setProperty("datetime.format.timestamp.zone", serverTimeZone);
}

config.getDebezium().forEach((key, value) -> {
if (Asserts.isNotNullString(key) && Asserts.isNotNullString(value)) {
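
The guard above only applies the job's server time zone when a Debezium "datetime.type" option is also set; "datetime.format.timestamp.zone" then carries the zone, presumably for a Dinky-side timestamp converter (an assumption; the consumer of this property is not shown here). A minimal sketch of the same check, with illustrative values:

    import java.util.Properties;

    public class DebeziumZoneDemo {
        public static void main(String[] args) {
            String serverTimeZone = "UTC";  // illustrative job setting
            String datetimeType = "local";  // illustrative "datetime.type" value
            Properties debeziumProperties = new Properties();
            // Both options must be non-empty for the zone to be applied.
            if (serverTimeZone != null && !serverTimeZone.isEmpty()
                    && datetimeType != null && !datetimeType.isEmpty()) {
                debeziumProperties.setProperty("datetime.format.timestamp.zone", serverTimeZone);
            }
            System.out.println(debeziumProperties); // {datetime.format.timestamp.zone=UTC}
        }
    }
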
@@ -85,20 +85,21 @@ public DataStreamSource<String> build(
.setTopic(config.getSink().get("topic"))
.setValueSerializationSchema(new SimpleStringSchema())
.build())
-                            .setDeliverGuarantee(DeliveryGuarantee.valueOf(
-                                    env.getCheckpointingMode().name()));
+                            .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null
+                                    ? ""
+                                    : config.getSink().get("transactional.id.prefix"))
+                            .setDeliverGuarantee(
+                                    DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null
+                                            ? "NONE"
+                                            : config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey(TRANSACTIONAL_ID)
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID))) {
kafkaSinkBuilder.setTransactionalIdPrefix(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID));
}
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
dataStreamSource.sinkTo(kafkaSink);
} else {
Map<Table, OutputTag<String>> tagMap = new LinkedHashMap<>();
Map<String, String> tableTopicMap = this.getTableTopicMap();
Map<String, Table> tableMap = new LinkedHashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
SingleOutputStreamOperator<Map> mapOperator = dataStreamSource
@@ -147,24 +148,31 @@ public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Co

tagMap.forEach((k, v) -> {
String topic = getSinkTableName(k);
if (tableTopicMap != null) {
String tableName = k.getName();
String newTopic = tableTopicMap.get(tableName);
if (Asserts.isNotNullString(newTopic)) {
topic = newTopic;
}
}
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<String> kafkaSinkBuilder =
KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
-                                            .setDeliverGuarantee(DeliveryGuarantee.valueOf(
-                                                    env.getCheckpointingMode().name()));
+                                            .setTransactionalIdPrefix(config.getSink().get("transactional.id.prefix") == null
+                                                    ? ""
+                                                    : config.getSink().get("transactional.id.prefix"))
+                                            .setDeliverGuarantee(
+                                                    DeliveryGuarantee.valueOf(config.getSink().get("delivery.guarantee") == null
+                                                            ? "NONE"
+                                                            : config.getSink().get("delivery.guarantee")));
if (!kafkaProducerConfig.isEmpty()) {
kafkaSinkBuilder.setKafkaProducerConfig(kafkaProducerConfig);
}
if (!kafkaProducerConfig.isEmpty()
&& kafkaProducerConfig.containsKey(TRANSACTIONAL_ID)
&& Asserts.isNotNullString(kafkaProducerConfig.getProperty(TRANSACTIONAL_ID))) {
kafkaSinkBuilder.setTransactionalIdPrefix(
kafkaProducerConfig.getProperty(TRANSACTIONAL_ID) + "-" + topic);
}

KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
});
@@ -113,6 +113,14 @@ public DataStreamSource<String> build(
String topic = getSinkTableName(table);
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
topic = config.getSink().get("topic");
} else {
Map<String, String> tableTopicMap = this.getTableTopicMap();
if (tableTopicMap != null) {
String newTopic = tableTopicMap.get(tableName);
if (Asserts.isNotNullString(newTopic)) {
topic = newTopic;
}
}
}
List<String> columnNameList = new LinkedList<>();
List<LogicalType> columnTypeList = new LinkedList<>();
@@ -193,8 +201,7 @@ public void processElement(Map value, Context context, Collector<String> collect
}
}
});
-            stringOperator.addSink(new FlinkKafkaProducer<String>(
-                    config.getSink().get("brokers"), topic, new SimpleStringSchema()));
+            stringOperator.addSink(new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), getProperties()));
}
}
} catch (Exception ex) {
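
Note the constructor swap above: the producer is now built from getProperties() rather than a bare broker string, so the broker address has to reach it through the properties (for example via a "properties.bootstrap.servers" sink option, given the getProperties() change earlier in this commit). A sketch under that assumption:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class ProducerSwapDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // illustrative address
            // Matches the new call shape: (topic, schema, producer config).
            FlinkKafkaProducer<String> producer =
                    new FlinkKafkaProducer<>("orders_topic", new SimpleStringSchema(), props);
            System.out.println(producer); // would be wired in via stringOperator.addSink(producer)
        }
    }
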
6 changes: 5 additions & 1 deletion dinky-core/src/main/java/org/dinky/api/FlinkAPI.java
@@ -72,7 +72,11 @@ public class FlinkAPI {
private static final ObjectMapper mapper = new ObjectMapper();

public FlinkAPI(String address) {
-        this.address = address;
+        if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) {
+            this.address = address;
+        } else {
+            this.address = NetConstant.HTTP + address;
+        }
}

public static FlinkAPI build(String address) {
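
FlinkAPI now accepts both bare host:port strings and full URLs. A sketch of the normalization, assuming NetConstant.HTTP and NetConstant.HTTPS are the usual "http://" and "https://" literals:

    public class FlinkApiAddressDemo {
        static final String HTTP = "http://";   // assumed value of NetConstant.HTTP
        static final String HTTPS = "https://"; // assumed value of NetConstant.HTTPS

        static String normalize(String address) {
            // Mirrors the new FlinkAPI constructor: prepend a scheme only when missing.
            return (address.startsWith(HTTP) || address.startsWith(HTTPS))
                    ? address
                    : HTTP + address;
        }

        public static void main(String[] args) {
            System.out.println(normalize("127.0.0.1:8081"));          // http://127.0.0.1:8081
            System.out.println(normalize("https://jm.example:8443")); // unchanged
        }
    }
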
@@ -29,4 +29,7 @@ public interface FlinkConstant {
String LOCAL_HOST = "localhost";
    /** changelog op */
String OP = "op";

    /** Flink default port */
Integer FLINK_REST_DEFAULT_PORT = 8081;
}
10 changes: 8 additions & 2 deletions dinky-core/src/main/java/org/dinky/executor/ExecutorConfig.java
@@ -20,6 +20,8 @@
package org.dinky.executor;

import org.dinky.assertion.Asserts;
import org.dinky.constant.FlinkConstant;
import org.dinky.data.constant.NetConstant;
import org.dinky.data.enums.GatewayType;

import java.util.ArrayList;
@@ -167,14 +169,18 @@ public static ExecutorConfig build(

String host = null;
Integer port = null;
String hostPort = address;
if (Asserts.isNotNullString(address)) {
-            String[] strings = address.split(":");
+            if (address.startsWith(NetConstant.HTTP) || address.startsWith(NetConstant.HTTPS)) {
+                hostPort = address.replace(NetConstant.HTTP, "").replace(NetConstant.HTTPS, "");
+            }
+            String[] strings = hostPort.split(":");
if (strings.length > 1) {
host = strings[0];
port = Integer.parseInt(strings[1]);
} else {
host = strings[0];
-                port = 8081;
+                port = FlinkConstant.FLINK_REST_DEFAULT_PORT;
}
}

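
The reworked parsing strips the scheme before splitting on ':', so a URL-style address no longer yields "http" as the host (and "//host" as an unparseable port); bare addresses still fall back to port 8081. A sketch with the same constants assumed as above:

    public class ExecutorAddressDemo {
        static final String HTTP = "http://";   // assumed NetConstant.HTTP
        static final String HTTPS = "https://"; // assumed NetConstant.HTTPS
        static final int FLINK_REST_DEFAULT_PORT = 8081;

        public static void main(String[] args) {
            for (String address : new String[] {"http://jm.example:8282", "jm.example"}) {
                String hostPort = address;
                if (address.startsWith(HTTP) || address.startsWith(HTTPS)) {
                    hostPort = address.replace(HTTP, "").replace(HTTPS, "");
                }
                String[] strings = hostPort.split(":");
                String host = strings[0];
                int port = strings.length > 1 ? Integer.parseInt(strings[1]) : FLINK_REST_DEFAULT_PORT;
                System.out.println(host + " -> " + port); // jm.example -> 8282, then jm.example -> 8081
            }
        }
    }
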
@@ -65,6 +65,8 @@ public void run() throws Exception {
}
GatewayResult gatewayResult = null;
config.addGatewayConfig(executor.getSetConfig());
config.addGatewayConfig(
executor.getCustomTableEnvironment().getConfig().getConfiguration());
config.getGatewayConfig().setSql(jobParam.getParsedSql());

if (runMode.isApplicationMode()) {
@@ -40,7 +40,10 @@ export type OperatorType = {
};
const JobOperator = (props: OperatorType) => {
const { jobDetail, refesh } = props;
-  const webUri = `${API_CONSTANTS.BASE_URL}/api/flink/${jobDetail?.clusterInstance?.jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`;
+  const jobManagerHost = jobDetail?.clusterInstance?.jobManagerHost;
+  const webUri = jobManagerHost?.startsWith('http://') || jobManagerHost?.startsWith('https://')
+    ? jobManagerHost
+    : `${API_CONSTANTS.BASE_URL}/api/flink/${jobManagerHost}/#/job/running/${jobDetail?.instance?.jid}/overview`;

const handleJobOperator = (key: string) => {
Modal.confirm({
