Commit 7cf19b4
[bug][core] fix the issue of Object not found when mocking statement (#…
MactavishCui authored Nov 13, 2024
1 parent c936749 commit 7cf19b4
Showing 9 changed files with 64 additions and 29 deletions.

File 1 of 9 (filename not shown)

@@ -25,6 +25,7 @@
 import org.dinky.utils.JsonUtils;
 import org.dinky.utils.LineageContext;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ConfigOption;
@@ -184,6 +185,11 @@ public Configuration getRootConfiguration() {
         }
     }
 
+    @Override
+    public SqlNode parseSql(String sql) {
+        return getParser().parseSql(sql);
+    }
+
     private static Executor lookupExecutor(
             ClassLoader classLoader, String executorIdentifier, StreamExecutionEnvironment executionEnvironment) {
         try {
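
The parseSql hook added across these table-environment implementations exposes the parser's syntactic phase: it turns a statement into a Calcite SqlNode without resolving identifiers against the catalog, which is what the planner's Parser#parse does when it builds Operations. A minimal standalone sketch of that distinction, using plain Calcite (only calcite-core on the classpath; the query string and class name are illustrative):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParseException;
    import org.apache.calcite.sql.parser.SqlParser;

    public class SyntacticParseDemo {
        public static void main(String[] args) throws SqlParseException {
            // Purely syntactic: this succeeds even though no table named "t"
            // exists anywhere, because no catalog lookup happens at this stage.
            SqlNode node = SqlParser.create("SELECT id, name FROM t").parseQuery();
            System.out.println(node.getKind()); // SELECT
        }
    }

Routing the mock preprocessor through this hook (last file below) lets a statement be inspected before any of the objects it references exist in the catalog.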

File 2 of 9 (filename not shown)

@@ -26,6 +26,7 @@
 import org.dinky.parser.CustomParserImpl;
 import org.dinky.utils.LineageContext;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ConfigOption;
@@ -284,6 +285,11 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
         return record;
     }
 
+    @Override
+    public SqlNode parseSql(String sql) {
+        return getParser().parseSql(sql);
+    }
+
     @Override
     public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
         List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields);

File 3 of 9 (filename not shown)

@@ -23,6 +23,7 @@
 import org.dinky.parser.CustomParserImpl;
 import org.dinky.utils.JsonUtils;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
@@ -167,4 +168,9 @@ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
         record.setExplain(getPlanner().explain(operations, extraDetails));
         return record;
     }
+
+    @Override
+    public SqlNode parseSql(String sql) {
+        return ((ExtendedParser) getParser()).parseSql(sql);
+    }
 }

File 4 of 9 (filename not shown)

@@ -21,6 +21,7 @@
 
 import org.dinky.parser.CustomParserImpl;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
@@ -144,4 +145,9 @@ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
 
         return transOperatoinsToStreamGraph(modifyOperations);
     }
+
+    @Override
+    public SqlNode parseSql(String sql) {
+        return ((ExtendedParser) getParser()).parseSql(sql);
+    }
 }

File 5 of 9 (filename not shown)

@@ -21,6 +21,7 @@
 
 import org.dinky.operations.CustomNewParserImpl;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
@@ -149,4 +150,9 @@ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
     public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) {
         getCatalogManager().createCatalog(catalogName, catalogDescriptor);
     }
+
+    @Override
+    public SqlNode parseSql(String sql) {
+        return ((ExtendedParser) getParser()).getCustomParser().parseSql(sql);
+    }
 }

File 6 of 9 (filename not shown)

@@ -21,6 +21,7 @@
 
 import org.dinky.operations.CustomNewParserImpl;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
@@ -157,4 +158,9 @@ public TableResultInternal executeCachedPlanInternal(CachedPlan cachedPlan) {
 
         return null;
     }
+
+    @Override
+    public SqlNode parseSql(String sql) {
+        return ((ExtendedParser) getParser()).getCustomParser().parseSql(sql);
+    }
 }

File 7 of 9 (filename not shown)

@@ -21,6 +21,7 @@
 
 import org.dinky.operations.CustomNewParserImpl;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
@@ -157,4 +158,9 @@ public TableResultInternal executeCachedPlanInternal(CachedPlan cachedPlan) {
 
         return null;
     }
+
+    @Override
+    public SqlNode parseSql(String sql) {
+        return ((ExtendedParser) getParser()).getCustomParser().parseSql(sql);
+    }
 }

File 8 of 9 (filename not shown)

@@ -22,6 +22,7 @@
 import org.dinky.data.model.LineageRel;
 import org.dinky.data.result.SqlExplainResult;
 
+import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
@@ -70,6 +71,8 @@ public interface CustomTableEnvironment
 
     Configuration getRootConfiguration();
 
+    SqlNode parseSql(String sql);
+
     default JobGraph getJobGraphFromInserts(List<String> statements) {
         return getStreamGraphFromInserts(statements).getJobGraph();
     }

File 9 of 9 (filename not shown)

@@ -29,22 +29,19 @@
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.dialect.AnsiSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -89,22 +86,23 @@ private void mockSink(JobParam jobParam) {
         // mock insert table ddl
         List<StatementParam> mockedDdl = new ArrayList<>();
         for (StatementParam ddl : jobParam.getDdl()) {
-            List<Operation> parseOperationList = tableEnv.getParser().parse(ddl.getValue());
-            for (Operation operation : parseOperationList) {
-                if (operation instanceof CreateTableOperation) {
-                    CreateTableOperation createOperation = (CreateTableOperation) operation;
-                    CatalogTable catalogTable = createOperation.getCatalogTable();
-                    // get table name and check if it should be mocked
-                    String tableName = createOperation.getTableIdentifier().getObjectName();
-                    if (tablesNeedMock.contains(tableName)) {
-                        // generate mock statement
-                        mockedDdl.add(
-                                new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE));
-                    } else {
-                        mockedDdl.add(ddl);
-                    }
+            SqlNode sqlNode = tableEnv.parseSql(ddl.getValue());
+            boolean isDdlMocked = false;
+            if (sqlNode instanceof SqlCreateTable) {
+                SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
+                String tableName = sqlCreateTable.getTableName().toString();
+                if (tablesNeedMock.contains(tableName)) {
+                    // generate mock statement
+                    mockedDdl.add(new StatementParam(
+                            getSinkMockDdlStatement(
+                                    tableName, sqlCreateTable.getColumnList().toString()),
+                            SqlType.CREATE));
+                    isDdlMocked = true;
                 }
             }
+            if (!isDdlMocked) {
+                mockedDdl.add(ddl);
+            }
         }
         jobParam.setDdl(mockedDdl);
         log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam));
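
The rewritten loop needs only the statement's syntax tree: Flink's SqlCreateTable exposes the table name and column list straight from the parsed text. A hedged sketch of that accessor pattern outside Dinky (assumes flink-sql-parser on the classpath; the parser configuration, FlinkSqlParserImpl.FACTORY with Lex.JAVA, follows Flink's own parser tests and may vary across Flink versions):

    import org.apache.calcite.config.Lex;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.ddl.SqlCreateTable;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

    public class CreateTableInspectDemo {
        public static void main(String[] args) throws Exception {
            String ddl = "CREATE TABLE sink_t (id INT, name STRING) WITH ('connector' = 'print')";
            // Configure Calcite's generic parser with Flink's generated grammar,
            // so DDL parses without any TableEnvironment or catalog.
            SqlParser parser = SqlParser.create(
                    ddl,
                    SqlParser.config()
                            .withParserFactory(FlinkSqlParserImpl.FACTORY)
                            .withLex(Lex.JAVA));
            SqlNode node = parser.parseStmt();
            if (node instanceof SqlCreateTable) {
                SqlCreateTable create = (SqlCreateTable) node;
                // The same two accessors the new mockSink code relies on.
                System.out.println(create.getTableName());  // sink_t
                System.out.println(create.getColumnList()); // column list; exact rendering varies
            }
        }
    }

The isDdlMocked flag also fixes a subtle drop: in the old loop, a DDL statement whose parsed operation was not a CreateTableOperation was never added to mockedDdl at all, so non-CREATE DDL silently vanished from the job. The new fall-through keeps every statement that is not explicitly mocked.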
@@ -155,19 +153,11 @@ private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) {
      * get mocked ddl statement
      *
      * @param tableName table name
-     * @param catalogTable catalog table
+     * @param columns columns
      * @return ddl that connector is changed as well as other options not changed
      */
-    private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) {
+    private String getSinkMockDdlStatement(String tableName, String columns) {
         String mockedOption = "'connector'='" + MockDynamicTableSinkFactory.IDENTIFIER + "'";
-        // columns
-        Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
-        String columns = unresolvedSchema.getColumns().stream()
-                .map(column -> {
-                    Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column;
-                    return physicalColumn.getName() + " " + physicalColumn.getDataType();
-                })
-                .collect(Collectors.joining(", "));
         return MessageFormat.format(
                 MOCK_SQL_TEMPLATE,
                 StringUtils.join(generateMockedTableIdentifier(tableName), "."),
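
getSinkMockDdlStatement now receives the column list pre-rendered as a string instead of re-deriving it from a CatalogTable schema. MOCK_SQL_TEMPLATE itself sits outside this hunk, so the template below is a hypothetical stand-in (as is the 'dinky-mock' identifier) purely to show how the MessageFormat arguments compose:

    import java.text.MessageFormat;

    public class MockDdlFormatDemo {
        // Hypothetical template; the real MOCK_SQL_TEMPLATE is not shown in this diff.
        private static final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})";

        public static void main(String[] args) {
            // Stand-in value for MockDynamicTableSinkFactory.IDENTIFIER.
            String mockedOption = "'connector'='dinky-mock'";
            String ddl = MessageFormat.format(
                    MOCK_SQL_TEMPLATE,
                    "default_catalog.default_database.sink_t", // joined identifier parts
                    "id INT, name STRING",                     // the pre-rendered columns argument
                    mockedOption);
            System.out.println(ddl);
            // CREATE TABLE default_catalog.default_database.sink_t (id INT, name STRING) WITH ('connector'='dinky-mock')
        }
    }

One caveat when templating SQL through MessageFormat: single quotes act as escape characters in the pattern string, so quoted connector options are safer passed as arguments (as both this sketch and the patched code do) than embedded in the template itself.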
