Skip to content

Commit

Permalink
[Feature][Web]Flink jar add form (#3949)
Browse files Browse the repository at this point in the history
Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
zackyoungh and zackyoungh authored Nov 24, 2024
1 parent 3f46587 commit 9b8108f
Show file tree
Hide file tree
Showing 21 changed files with 470 additions and 116 deletions.
67 changes: 67 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,23 @@
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.exception.SqlExplainExcepition;
import org.dinky.data.model.JarSubmitParam;
import org.dinky.data.model.Task;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.data.vo.FlinkJarSqlConvertVO;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.mybatis.annotation.Save;
import org.dinky.service.TaskService;
import org.dinky.trans.ExecuteJarParseStrategyUtil;
import org.dinky.utils.SqlUtil;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -61,7 +67,14 @@

import cn.dev33.satoken.annotation.SaCheckLogin;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.extra.template.TemplateConfig;
import cn.hutool.extra.template.TemplateEngine;
import cn.hutool.extra.template.engine.freemarker.FreemarkerEngine;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
Expand All @@ -77,6 +90,8 @@
public class TaskController {

private final TaskService taskService;
private static final TemplateEngine ENGINE =
new FreemarkerEngine(new TemplateConfig("templates", TemplateConfig.ResourceMode.CLASSPATH));

@GetMapping("/submitTask")
@ApiOperation("Submit Task")
Expand Down Expand Up @@ -283,4 +298,56 @@ public Result<List<TaskDTO>> getMyTask() {
int id = StpUtil.getLoginIdAsInt();
return Result.succeed(taskService.getUserTasks(id));
}

@PostMapping("/flinkJarSqlConvertForm")
@ApiOperation("FlinkJar SqlConvertForm")
public Result<FlinkJarSqlConvertVO> flinkJarSqlConvertForm(@RequestBody TaskDTO taskDTO) {
    // Splits the task statement, lifts the LAST "EXECUTE JAR" statement into
    // structured form parameters, and returns the remaining SQL unchanged so the
    // web form can render both halves.
    FlinkJarSqlConvertVO result = new FlinkJarSqlConvertVO();
    result.setJarSubmitParam(JarSubmitParam.empty());

    String rawStatement = taskDTO.getStatement();
    String[] statements = SqlUtil.getStatements(rawStatement);
    if (ArrayUtil.isEmpty(statements)) {
        // Nothing to split — hand the raw statement back as-is.
        result.setInitSqlStatement(rawStatement);
        return Result.succeed(result);
    }

    // Scan backwards so the first hit is the last EXECUTE JAR statement.
    int jarStatementIndex = -1;
    for (int i = statements.length - 1; i >= 0; i--) {
        if (ExecuteJarParseStrategyUtil.match(statements[i])) {
            jarStatementIndex = i;
            break;
        }
    }
    if (jarStatementIndex < 0) {
        // No EXECUTE JAR statement present; init SQL intentionally stays unset.
        return Result.succeed(result);
    }

    result.setJarSubmitParam(JarSubmitParam.getInfo(statements[jarStatementIndex]));
    // Re-terminate each remaining statement with ';' and keep them as init SQL.
    String remainingSql = Arrays.stream(ArrayUtil.remove(statements, jarStatementIndex))
            .map(s -> s + ";")
            .collect(Collectors.joining("\n"));
    result.setInitSqlStatement(remainingSql);
    return Result.succeed(result);
}

@PostMapping("/flinkJarFormConvertSql")
@ApiOperation("FlinkJar FormConvertSql")
public Result<String> flinkJarFormConvertSql(@RequestBody FlinkJarSqlConvertVO dto) {
    // Renders the submitted form parameters back into an EXECUTE JAR statement
    // via the classpath template and appends it to the (unmodified) init SQL.
    // Guard against a request body that omits the jarSubmitParam field — the
    // original dereferenced it unconditionally and would NPE.
    JarSubmitParam jarSubmitParam =
            Opt.ofNullable(dto.getJarSubmitParam()).orElseGet(JarSubmitParam::empty);
    Dict templateParams = Dict.create()
            .set("uri", Opt.ofNullable(jarSubmitParam.getUri()).orElse(""))
            // args are base64-encoded with a marker prefix so quotes and other
            // special characters survive the round-trip through the SQL literal.
            .set(
                    "args",
                    "base64@"
                            + Base64.encode(
                                    Opt.ofNullable(jarSubmitParam.getArgs()).orElse("")))
            .set("mainClass", Opt.ofNullable(jarSubmitParam.getMainClass()).orElse(""))
            .set(
                    "allowNonRestoredState",
                    Opt.ofNullable(jarSubmitParam.getAllowNonRestoredState())
                            .orElse(false)
                            .toString());
    String executeJarSql = ENGINE.getTemplate("executeJar.sql").render(templateParams);
    // Null-safe concat: without this a missing init statement produced "null\n...".
    String initSql = Opt.ofNullable(dto.getInitSqlStatement()).orElse("");
    return Result.succeed(initSql + "\n" + executeJarSql, "");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.vo;

import org.dinky.data.model.JarSubmitParam;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
/**
 * View object for the Flink jar web form round-trip: carries the non-jar SQL
 * statements ({@code initSqlStatement}) alongside the structured EXECUTE JAR
 * parameters ({@code jarSubmitParam}) in both API directions.
 */
public class FlinkJarSqlConvertVO {
    // SQL statements surrounding the EXECUTE JAR statement; may be null when
    // the task contained no other statements.
    private String initSqlStatement;
    // Parsed EXECUTE JAR parameters (uri, main class, args, ...).
    private JarSubmitParam jarSubmitParam;
}
6 changes: 6 additions & 0 deletions dinky-admin/src/main/resources/templates/executeJar.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
EXECUTE JAR WITH (
'uri'='${uri}',
'main-class'='${mainClass}',
'args'='${args}',
'allowNonRestoredState'='${allowNonRestoredState}'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.model;

import org.dinky.parser.SqlSegment;

import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
/**
 * Parameters of an {@code EXECUTE JAR WITH ('k'='v', ...)} statement. Instances
 * are populated either by parsing a statement ({@link #getInfo(String)}) or by
 * bean-copying form input; fields not present in the input remain null.
 */
public class JarSubmitParam {
    protected JarSubmitParam() {}

    private String uri;
    private String mainClass;
    private String args;
    private String parallelism;
    private String savepointPath;
    // Defaults to Flink's own default for savepoint.ignore-unclaimed-state.
    private Boolean allowNonRestoredState = SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue();

    /**
     * Parses the statement and requires a non-blank {@code uri}.
     *
     * @param statement an EXECUTE JAR statement
     * @return the parsed parameters
     * @throws IllegalArgumentException if {@code uri} is blank (via Assert)
     */
    public static JarSubmitParam build(String statement) {
        JarSubmitParam submitParam = getInfo(statement);
        Assert.notBlank(submitParam.getUri());
        return submitParam;
    }

    /**
     * Extracts the WITH-clause key/value pairs from the statement and maps the
     * kebab-case keys (e.g. {@code main-class}) onto camelCase bean fields.
     */
    public static JarSubmitParam getInfo(String statement) {
        // Flatten newlines and append a sentinel so the segment regex can match
        // up to end-of-statement.
        statement = statement.replace("\r\n", " ").replace("\n", " ") + " ENDOFSQL";
        SqlSegment sqlSegment = new SqlSegment("with", "(with\\s+\\()(.+)(\\))", "',");
        sqlSegment.parse(statement);
        List<String> bodyPieces = sqlSegment.getBodyPieces();
        Map<String, String> keyValue = getKeyValue(bodyPieces);
        return BeanUtil.toBean(
                keyValue,
                JarSubmitParam.class,
                CopyOptions.create().setFieldNameEditor(s -> StrUtil.toCamelCase(s, '-')));
    }

    // Turns each "'key'='value'" piece into a map entry; the trailing quote is
    // re-appended because the segment splitter consumed it as a delimiter.
    private static Map<String, String> getKeyValue(List<String> list) {
        Map<String, String> map = new HashMap<>();
        Pattern p = Pattern.compile("'(.*?)'\\s*=\\s*'(.*?)'");
        for (String s : list) {
            Matcher m = p.matcher(s + "'");
            if (m.find()) {
                map.put(m.group(1), m.group(2));
            }
        }
        return map;
    }

    /** An instance with uri/mainClass/args set to empty strings (form default). */
    public static JarSubmitParam empty() {
        JarSubmitParam jarSubmitParam = new JarSubmitParam();
        jarSubmitParam.setArgs("");
        jarSubmitParam.setMainClass("");
        jarSubmitParam.setUri("");
        return jarSubmitParam;
    }

    /**
     * Returns the program args, transparently decoding the {@code base64@}
     * form-encoded representation.
     */
    public String getArgs() {
        // StrUtil.subPre(null, 7) returns null, so the original
        // subPre(...).equals("base64@") call NPE'd when args was never set
        // (bean copy leaves absent keys null). Compare constant-first instead.
        if ("base64@".equals(StrUtil.subPre(args, 7))) {
            return Base64.decodeStr(StrUtil.removePrefix(args, "base64@"));
        }
        return args;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.trans;

import org.dinky.trans.parse.ExecuteJarParseStrategy;

import cn.hutool.core.lang.Singleton;

/**
 * Static helper for testing whether a statement is an {@code EXECUTE JAR}
 * statement, reusing a cached {@link ExecuteJarParseStrategy} instance via
 * Hutool's {@link Singleton} registry.
 */
public class ExecuteJarParseStrategyUtil {

    // Utility class: prevent instantiation (the implicit public constructor
    // invited accidental `new ExecuteJarParseStrategyUtil()`).
    private ExecuteJarParseStrategyUtil() {}

    /**
     * @param statement a single SQL statement (without trailing semicolon)
     * @return true if the statement matches the EXECUTE JAR grammar
     */
    public static boolean match(String statement) {
        return Singleton.get(ExecuteJarParseStrategy.class).match(statement);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@

import org.dinky.config.Dialect;
import org.dinky.context.TaskContextHolder;
import org.dinky.data.model.JarSubmitParam;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.trans.AbstractOperation;
import org.dinky.trans.ExtendOperation;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.URLUtils;

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
Expand All @@ -47,11 +46,8 @@
import java.util.Optional;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
import lombok.Setter;

public class ExecuteJarOperation extends AbstractOperation implements ExtendOperation {

Expand Down Expand Up @@ -148,23 +144,4 @@ public Pipeline explainJar(CustomTableEnvironment tEnv) {
// Builds the pipeline for explain purposes by delegating to getStreamGraph
// with the extra classpath entries; no job is submitted.
public Pipeline explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
    return getStreamGraph(tEnv, classpaths);
}

@Setter
@Getter
public static class JarSubmitParam {
protected JarSubmitParam() {}

private String uri;
private String mainClass;
private String args;
private String parallelism;
private String savepointPath;
private Boolean allowNonRestoredState = SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue();

public static JarSubmitParam build(String statement) {
JarSubmitParam submitParam = ExecuteJarParseStrategy.getInfo(statement);
Assert.notBlank(submitParam.getUri());
return submitParam;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,13 @@

package org.dinky.trans.parse;

import org.dinky.parser.SqlSegment;
import org.dinky.trans.dml.ExecuteJarOperation;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.parse.AbstractRegexParseStrategy;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.util.StrUtil;

public class ExecuteJarParseStrategy extends AbstractRegexParseStrategy {
private static final String PATTERN_STR = "^EXECUTE\\s+JAR\\s+WITH\\s*\\(.+\\)";
private static final Pattern PATTERN = Pattern.compile(PATTERN_STR, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
Expand All @@ -44,30 +35,6 @@ public ExecuteJarParseStrategy() {
super(PATTERN);
}

public static ExecuteJarOperation.JarSubmitParam getInfo(String statement) {
statement = statement.replace("\r\n", " ").replace("\n", " ") + " ENDOFSQL";
SqlSegment sqlSegment = new SqlSegment("with", "(with\\s+\\()(.+)(\\))", "',");
sqlSegment.parse(statement);
List<String> bodyPieces = sqlSegment.getBodyPieces();
Map<String, String> keyValue = getKeyValue(bodyPieces);
return BeanUtil.toBean(
keyValue,
ExecuteJarOperation.JarSubmitParam.class,
CopyOptions.create().setFieldNameEditor(s -> StrUtil.toCamelCase(s, '-')));
}

private static Map<String, String> getKeyValue(List<String> list) {
Map<String, String> map = new HashMap<>();
Pattern p = Pattern.compile("'(.*?)'\\s*=\\s*'(.*?)'");
for (String s : list) {
Matcher m = p.matcher(s + "'");
if (m.find()) {
map.put(m.group(1), m.group(2));
}
}
return map;
}

@Override
public Operation convert(String statement) {
return new ExecuteJarOperation(statement);
Expand Down
Loading

0 comments on commit 9b8108f

Please sign in to comment.