Skip to content

Commit

Permalink
Merge pull request #1105 from WeDataSphere/master
Browse files Browse the repository at this point in the history
Delete unuseful code and update sparketl appconn.
  • Loading branch information
zqburde authored Aug 17, 2023
2 parents 46ba624 + 77de42e commit a780e98
Show file tree
Hide file tree
Showing 13 changed files with 449 additions and 141 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ jobs:
- name: Build frontend by node.js
run: |
cd web
npm install lerna -g
npm install lerna@4.0.0 -g
lerna bootstrap
npm run build
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.webank.wedatasphere.dss.appconn.sparketl.query;

import com.webank.wedatasphere.dss.standard.app.development.operation.AbstractDevelopmentOperation;
import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation;
import com.webank.wedatasphere.dss.standard.app.development.ref.QueryJumpUrlResponseRef;
import com.webank.wedatasphere.dss.standard.app.development.ref.impl.OnlyDevelopmentRequestRef;
import com.webank.wedatasphere.dss.standard.app.development.utils.QueryJumpUrlConstant;
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
import org.apache.commons.lang3.StringUtils;

public class SparkEtlRefQueryJumpUrlOperation extends AbstractDevelopmentOperation<OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl, QueryJumpUrlResponseRef>
implements RefQueryJumpUrlOperation<OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl, QueryJumpUrlResponseRef> {

@Override
public QueryJumpUrlResponseRef query(OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl requestRef) throws ExternalOperationFailedException {
String jumpUrl = mergeBaseUrl("#/sparketl?resourceId=%s&version=%s&%s=%s&%s=%s");
String resourceId = (String) requestRef.getRefJobContent().get("resourceId");
String version = (String) requestRef.getRefJobContent().get("version");
if(StringUtils.isBlank(resourceId) || StringUtils.isBlank(version)) {
logger.info("resourceId or version is empty, maybe user {} want to create a new node.", requestRef.getUserName());
resourceId = "";
version = "";
}
jumpUrl = String.format(jumpUrl, resourceId, version, QueryJumpUrlConstant.NODE_ID.getKey(),
QueryJumpUrlConstant.NODE_ID.getValue(), QueryJumpUrlConstant.PROJECT_NAME.getKey(), QueryJumpUrlConstant.PROJECT_NAME.getValue());
return QueryJumpUrlResponseRef.newBuilder().setJumpUrl(jumpUrl).build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.webank.wedatasphere.dss.appconn.sparketl.query;

import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation;
import com.webank.wedatasphere.dss.standard.app.development.service.AbstractRefQueryService;

public class SparkEtlRefQueryService extends AbstractRefQueryService {

@Override
protected RefQueryJumpUrlOperation createRefQueryOperation() {
return new SparkEtlRefQueryJumpUrlOperation();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.webank.wedatasphere.dss.appconn.sparketl.standard;

import com.webank.wedatasphere.dss.appconn.sparketl.execution.SparkEtlExecutionService;
import com.webank.wedatasphere.dss.appconn.sparketl.query.SparkEtlRefQueryService;
import com.webank.wedatasphere.dss.standard.app.development.service.RefExecutionService;
import com.webank.wedatasphere.dss.standard.app.development.service.RefQueryService;
import com.webank.wedatasphere.dss.standard.app.development.standard.OnlyExecutionDevelopmentStandard;

public class SparkEtlDevelopmentStandard extends OnlyExecutionDevelopmentStandard {
Expand All @@ -11,6 +13,11 @@ protected RefExecutionService createRefExecutionService() {
return new SparkEtlExecutionService();
}

@Override
protected RefQueryService createRefQueryService() {
return new SparkEtlRefQueryService();
}

@Override
public void init() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.webank.wedatasphere.dss.framework.appconn.restful;

import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn;
import com.webank.wedatasphere.dss.appconn.manager.AppConnManager;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.common.label.EnvDSSLabel;
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils;
import com.webank.wedatasphere.dss.standard.app.development.operation.AppConn2LinkisRefExecutionOperation;
import com.webank.wedatasphere.dss.standard.app.development.ref.AppConn2LinkisResponseRef;
import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentRequestRef;
import com.webank.wedatasphere.dss.standard.app.development.service.RefExecutionService;
import com.webank.wedatasphere.dss.standard.app.development.utils.DevelopmentOperationUtils;
import com.webank.wedatasphere.dss.standard.app.sso.Workspace;
import com.webank.wedatasphere.dss.standard.common.desc.AppInstance;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@RequestMapping(path = "/dss/framework/appconn", produces = {"application/json"})
@RestController
public class AppConn2LinkisRefExecutionRestfulApi {

private static final Logger LOGGER = LoggerFactory.getLogger(AppConn2LinkisRefExecutionRestfulApi.class);


@Autowired
private AppConnManagerRestfulApi appConnManagerRestfulApi;

@RequestMapping(path = "execute", method = RequestMethod.POST)
public Message execute(HttpServletRequest request, @RequestBody Map<String, Object> json) {
String userName = SecurityFilter.getLoginUsername(request);
LOGGER.info("user {} try to transform jobContent to Linkis job with requestBody {}.", userName, json);
String workspaceStr = (String) json.get("workspaceStr");
String appConnName = (String) json.get("appConnName");
String labelStr = (String) json.get("labels");
Map<String, Object> refJobContent = (Map<String, Object>) json.get("jobContent");
Workspace workspace = DSSCommonUtils.COMMON_GSON.fromJson(workspaceStr, Workspace.class);
OnlyDevelopmentAppConn appConn = (OnlyDevelopmentAppConn) AppConnManager.getAppConnManager().getAppConn(appConnName);
AppInstance appInstance;
List<DSSLabel> labels = Arrays.asList(new EnvDSSLabel(labelStr));
if(appConn.getAppDesc().getAppInstances().size() == 1) {
appInstance = appConn.getAppDesc().getAppInstances().get(0);
} else {
appInstance = appConn.getAppDesc().getAppInstancesByLabels(labels).get(0);
}
AppConn2LinkisResponseRef responseRef = DevelopmentOperationUtils.tryDevelopmentOperation(() -> appConn.getOrCreateDevelopmentStandard().getRefExecutionService(appInstance),
developmentService -> ((RefExecutionService) developmentService).getRefExecutionOperation(),
null, refJobContentRequestRef -> refJobContentRequestRef.setRefJobContent(refJobContent),
null, null, (developmentOperation, developmentRequestRef) -> {
developmentRequestRef.setWorkspace(workspace).setUserName(userName).setDSSLabels(labels);
return ((AppConn2LinkisRefExecutionOperation) developmentOperation).execute((RefJobContentRequestRef) developmentRequestRef);
}, null, "fetch linkis jobContent from appConn " + appConnName + " failed.");
return Message.ok().data("executionCode", responseRef.getCode()).data("params", responseRef.getParams())
.data("engineType", responseRef.getEngineType()).data("runType", responseRef.getRunType());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.webank.wedatasphere.dss.linkis.node.execution.entity;

import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils;
import org.apache.linkis.httpclient.request.POSTAction;
import org.apache.linkis.httpclient.request.UserAction;

public class AppConn2LinkisPostAction extends POSTAction implements UserAction {

private String url;
private String user;

@Override
public String getRequestPayload() {
return LinkisJobExecutionUtils.gson.toJson(getRequestPayloads());
}

public void setUrl(String url) {
this.url = url;
}

@Override
public String getURL() {
return url;
}

@Override
public void setUser(String user) {
this.user = user;
}

@Override
public String getUser() {
return user;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import com.webank.wedatasphere.dss.linkis.node.execution.execution.LinkisNodeExecution;
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
import com.webank.wedatasphere.dss.linkis.node.execution.listener.LinkisExecutionListener;
import com.webank.wedatasphere.dss.linkis.node.execution.parser.CodeParser;
import com.webank.wedatasphere.dss.linkis.node.execution.parser.JobParamsParser;
import com.webank.wedatasphere.dss.linkis.node.execution.parser.JobParser;
import com.webank.wedatasphere.dss.linkis.node.execution.parser.JobRuntimeParamsParser;
import com.webank.wedatasphere.dss.linkis.node.execution.parser.*;
import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService;
import com.webank.wedatasphere.dss.linkis.node.execution.service.impl.BuildJobActionImpl;
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils;
Expand Down Expand Up @@ -59,6 +56,8 @@ private LinkisNodeExecutionImpl() {
registerJobParser(new CodeParser());
registerJobParser(new JobRuntimeParamsParser());
registerJobParser(new JobParamsParser());
registerJobParser(new BML2LinkisJobParser());
registerJobParser(new AppConn2LinkisJobParser());
}

public static LinkisNodeExecution getLinkisNodeExecution() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.webank.wedatasphere.dss.linkis.node.execution.parser;

import com.google.gson.reflect.TypeToken;
import com.webank.wedatasphere.dss.linkis.node.execution.conf.LinkisJobExecutionConfiguration;
import com.webank.wedatasphere.dss.linkis.node.execution.entity.AppConn2LinkisPostAction;
import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource;
import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException;
import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob;
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService;
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils;
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisUjesClientUtils;
import org.apache.linkis.httpclient.dws.DWSHttpClient;
import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
import org.apache.linkis.httpclient.response.impl.DefaultHttpResult;

import java.util.HashMap;
import java.util.Map;

public class AppConn2LinkisJobParser extends BML2LinkisJobParser {

@Override
public void parseJob(Job job) throws Exception {
if(!(job instanceof AppConnLinkisJob)) {
return;
}
AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job;
String runType = appConnLinkisJob.getRunType();
// 只处理包含 appconn2linkis 的 AppConnLinkisJob,例如:linkis.appconn.<appconnName>.appconn2linkis
if(!runType.toLowerCase().contains("appconn2linkis")) {
return;
}
job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from AppConn execution.", runType));
getAndSetCode(null, appConnLinkisJob);
}

@Override
protected void dealExecutionParams(LinkisJob linkisAppConnJob, Map<String, Object> executionParams) {
String engineType = (String) executionParams.get("engineType");
String runType = (String) executionParams.get("runType");
setEngineType(linkisAppConnJob, engineType, runType);
}

@Override
protected Map<String, Object> getExecutionParams(BMLResource bmlResource, LinkisJob job) {
String user = job.getUser();
String linkisUrl = LinkisURLService.Factory.getLinkisURLService().getDefaultLinkisURL(job);
String token = LinkisJobExecutionConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(job.getJobProps());
DWSHttpClient client = null;
DWSClientConfig clientConfig = LinkisUjesClientUtils.getClientConfig1_X(linkisUrl, user, token, job.getJobProps());
AppConn2LinkisPostAction appConn2LinkisPostAction = new AppConn2LinkisPostAction();
appConn2LinkisPostAction.setUrl("/api/rest_j/v1/dss/framework/appconn/execute");
appConn2LinkisPostAction.addRequestPayload("workspaceStr", job.getRuntimeParams().get("workspace"));
appConn2LinkisPostAction.addRequestPayload("appConnName", job.getRunType().split("\\.")[0]);
appConn2LinkisPostAction.addRequestPayload("labels", getLabels(job.getJobProps().get("labels")));
Map<String, Object> jobContent = new HashMap<>();
if(job.getCode() != null && !job.getCode().isEmpty()) {
jobContent.putAll(LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken<Map<String, Object>>() {}.getType()));
}
jobContent.putAll(job.getParams());
appConn2LinkisPostAction.addRequestPayload("jobContent", jobContent);
appConn2LinkisPostAction.setUser(user);
job.getLogObj().info(String.format("try to ask AppConn %s to execute AppConn2Linkis with requestBody %s.", job.getRunType(), appConn2LinkisPostAction.getRequestPayload()));
appConn2LinkisPostAction.addHeader("Referer", "");
try {
client = new DWSHttpClient(clientConfig, "Workspace-Fetch-Client-");
DefaultHttpResult result = (DefaultHttpResult) client.execute(appConn2LinkisPostAction);
if (result.getStatusCode() == 200 || result.getStatusCode() == 0) {
Map<String, Object> responseBody = LinkisJobExecutionUtils.gson.fromJson(result.getResponseBody(), Map.class);
return (Map<String, Object>) responseBody.get("data");
} else {
throw new LinkisJobExecutionErrorException(50063, "Failed to get workspace str, responseBody is: " +
result.getResponseBody());
}
} finally {
if(client != null) {
client.close();
}
}
}

private String getLabels(String labels) {
if (labels.contains("route") || labels.contains("DSSEnv") ) {
Map<String, Object> labelMap = LinkisJobExecutionUtils.gson.fromJson(labels, Map.class);
return (String) labelMap.getOrDefault("route", labelMap.getOrDefault("DSSEnv", labels));
} else {
return labels;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.webank.wedatasphere.dss.linkis.node.execution.parser;

import com.google.gson.reflect.TypeToken;
import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource;
import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException;
import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob;
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils;

import java.util.Map;

public class BML2LinkisJobParser extends CodeParser {

@Override
public void parseJob(Job job) throws Exception {
if(!(job instanceof AppConnLinkisJob)) {
return;
}
AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job;
String runType = appConnLinkisJob.getRunType();
// 只处理包含 bml2linkis 的 AppConnLinkisJob,例如:linkis.appconn.<appconnName>.bml2linkis
if(!runType.toLowerCase().contains("bml2linkis")) {
return;
}
job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from code %s.", runType,
job.getCode()));
Map<String, Object> script = LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken<Map<String, Object>>() {}.getType());
if(!script.containsKey("resourceId") || !script.containsKey("version") || !script.containsKey("fileName")) {
job.getLogObj().error("the code error, resourceId, version or fileName is not exists.");
throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent.");
}
BMLResource bmlResource = new BMLResource();
bmlResource.setResourceId((String) script.get("resourceId"));
bmlResource.setVersion((String) script.get("version"));
// fileName 的格式为 ${resourceId}.${engineType}.${runType}
bmlResource.setFileName((String) script.get("fileName"));
getAndSetCode(bmlResource, appConnLinkisJob);
String[] fileNameArray = bmlResource.getFileName().split("\\.");
if(fileNameArray.length < 3) {
job.getLogObj().error(String.format("cannot recognize fileName %s, the fileName format must be ${resourceId}.${engineType}.${runType}", bmlResource.getFileName()));
throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent.");
}
String realEngineType = fileNameArray[fileNameArray.length - 2];
String realRunType = fileNameArray[fileNameArray.length - 1];
setEngineType(job, realEngineType, realRunType);
}

protected void setEngineType(Job job, String realEngineType, String realRunType) {
job.getLogObj().warn(String.format("switch job from engineType %s with runType %s to engineType %s with runType %s",
job.getEngineType(), job.getRunType(), realEngineType, realRunType));
job.setEngineType(realEngineType);
job.setRunType(realRunType);
}

}
Loading

0 comments on commit a780e98

Please sign in to comment.