Skip to content

Commit

Permalink
Analyze and plan for call distributed procedure statement
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Nov 3, 2024
1 parent a8f4838 commit a6d3771
Show file tree
Hide file tree
Showing 13 changed files with 605 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public class Analysis

private final Map<NodeRef<QuerySpecification>, List<GroupingOperation>> groupingOperations = new LinkedHashMap<>();

// for call distributed procedure
private Optional<QualifiedObjectName> procedureName;
private Optional<Object[]> procedureArguments;
private Optional<TableHandle> callTarget = Optional.empty();
private Optional<QuerySpecification> targetQuery = Optional.empty();

// for create table
private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
private Map<String, Expression> createTableProperties = ImmutableMap.of();
Expand Down Expand Up @@ -635,6 +641,36 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
return createTableDestination;
}

public Optional<QualifiedObjectName> getProcedureName()
{
return procedureName;
}

public void setProcedureName(Optional<QualifiedObjectName> procedureName)
{
this.procedureName = procedureName;
}

public Optional<Object[]> getProcedureArguments()
{
return procedureArguments;
}

public void setProcedureArguments(Optional<Object[]> procedureArguments)
{
this.procedureArguments = procedureArguments;
}

public Optional<TableHandle> getCallTarget()
{
return callTarget;
}

public void setCallTarget(TableHandle callTarget)
{
this.callTarget = Optional.of(callTarget);
}

public Optional<TableHandle> getAnalyzeTarget()
{
return analyzeTarget;
Expand Down Expand Up @@ -984,6 +1020,16 @@ public Optional<QuerySpecification> getCurrentQuerySpecification()
return currentQuerySpecification;
}

public void setTargetQuery(QuerySpecification targetQuery)
{
this.targetQuery = Optional.of(targetQuery);
}

public Optional<QuerySpecification> getTargetQuery()
{
return this.targetQuery;
}

public Map<FunctionKind, Set<String>> getInvokedFunctions()
{
Map<FunctionKind, Set<String>> functionMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum SemanticErrorCode

SAMPLE_PERCENTAGE_OUT_OF_RANGE,

PROCEDURE_NOT_FOUND,
INVALID_PROCEDURE_ARGUMENTS,

INVALID_SESSION_PROPERTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,53 @@ public ListenableFuture<?> execute(Call call, TransactionManager transactionMana
.orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
Procedure procedure = metadata.getProcedureRegistry().resolve(connectorId, toSchemaTableName(procedureName));

Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, parameterLookup);

// validate arguments
MethodType methodType = procedure.getMethodHandle().type();
for (int i = 0; i < procedure.getArguments().size(); i++) {
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
String name = procedure.getArguments().get(i).getName();
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
}
}

// insert session argument
List<Object> arguments = new ArrayList<>();
Iterator<Object> valuesIterator = asList(values).iterator();
for (Class<?> type : methodType.parameterList()) {
if (ConnectorSession.class.isAssignableFrom(type)) {
arguments.add(session.toConnectorSession(connectorId));
}
else {
arguments.add(valuesIterator.next());
}
}

try {
procedure.getMethodHandle().invokeWithArguments(arguments);
}
catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throwIfInstanceOf(t, PrestoException.class);
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
}

return immediateFuture(null);
}

public static Object toTypeObjectValue(Session session, Type type, Object value)
{
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
writeNativeValue(type, blockBuilder, value);
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
}

public static Object[] extractParameterValuesInOrder(Call call, Procedure procedure, Metadata metadata, Session session, Map<NodeRef<Parameter>, Expression> parameterLookup)
{
// map declared argument names to positions
Map<String, Integer> positions = new HashMap<>();
for (int i = 0; i < procedure.getArguments().size(); i++) {
Expand Down Expand Up @@ -131,7 +178,6 @@ else if (i < procedure.getArguments().size()) {

// get argument values
Object[] values = new Object[procedure.getArguments().size()];
Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(call, parameters);
for (Entry<String, CallArgument> entry : names.entrySet()) {
CallArgument callArgument = entry.getValue();
int index = positions.get(entry.getKey());
Expand All @@ -156,45 +202,6 @@ else if (i < procedure.getArguments().size()) {
}
}

// validate arguments
MethodType methodType = procedure.getMethodHandle().type();
for (int i = 0; i < procedure.getArguments().size(); i++) {
if ((values[i] == null) && methodType.parameterType(i).isPrimitive()) {
String name = procedure.getArguments().get(i).getName();
throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Procedure argument cannot be null: " + name);
}
}

// insert session argument
List<Object> arguments = new ArrayList<>();
Iterator<Object> valuesIterator = asList(values).iterator();
for (Class<?> type : methodType.parameterList()) {
if (ConnectorSession.class.isAssignableFrom(type)) {
arguments.add(session.toConnectorSession(connectorId));
}
else {
arguments.add(valuesIterator.next());
}
}

try {
procedure.getMethodHandle().invokeWithArguments(arguments);
}
catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throwIfInstanceOf(t, PrestoException.class);
throw new PrestoException(PROCEDURE_CALL_FAILED, t);
}

return immediateFuture(null);
}

private static Object toTypeObjectValue(Session session, Type type, Object value)
{
BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
writeNativeValue(type, blockBuilder, value);
return type.getObjectValue(session.getSqlFunctionProperties(), blockBuilder, 0);
return values;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,30 @@ public MetadataManager(
ColumnPropertyManager columnPropertyManager,
AnalyzePropertyManager analyzePropertyManager,
TransactionManager transactionManager)
{
this(
functionAndTypeManager,
blockEncodingSerde,
sessionPropertyManager,
schemaPropertyManager,
tablePropertyManager,
columnPropertyManager,
analyzePropertyManager,
transactionManager,
new ProcedureRegistry(functionAndTypeManager));
}

@VisibleForTesting
public MetadataManager(
FunctionAndTypeManager functionAndTypeManager,
BlockEncodingSerde blockEncodingSerde,
SessionPropertyManager sessionPropertyManager,
SchemaPropertyManager schemaPropertyManager,
TablePropertyManager tablePropertyManager,
ColumnPropertyManager columnPropertyManager,
AnalyzePropertyManager analyzePropertyManager,
TransactionManager transactionManager,
IProcedureRegistry procedureRegistry)
{
this(
createTestingViewCodec(functionAndTypeManager),
Expand All @@ -178,7 +202,7 @@ public MetadataManager(
analyzePropertyManager,
transactionManager,
functionAndTypeManager,
new ProcedureRegistry(functionAndTypeManager));
procedureRegistry);
}

@Inject
Expand Down Expand Up @@ -252,6 +276,21 @@ public static MetadataManager createTestMetadataManager(TransactionManager trans
transactionManager);
}

public static MetadataManager createTestMetadataManager(TransactionManager transactionManager, FeaturesConfig featuresConfig, FunctionsConfig functionsConfig, IProcedureRegistry procedureRegistry)
{
BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
return new MetadataManager(
new FunctionAndTypeManager(transactionManager, blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()),
blockEncodingManager,
createTestingSessionPropertyManager(),
new SchemaPropertyManager(),
new TablePropertyManager(),
new ColumnPropertyManager(),
new AnalyzePropertyManager(),
transactionManager,
procedureRegistry);
}

@Override
public final void verifyComparableOrderableContract()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.sql.analyzer;

import com.facebook.presto.Session;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.function.FunctionHandle;
Expand Down Expand Up @@ -108,11 +109,20 @@ public Analysis analyze(Statement statement, boolean isDescribe)
}

public Analysis analyzeSemantic(Statement statement, boolean isDescribe)
{
return analyzeSemantic(statement, Optional.empty(), isDescribe);
}

public Analysis analyzeSemantic(
Statement statement,
Optional<QualifiedObjectName> procedureName,
boolean isDescribe)
{
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, parameterLookup, accessControl, warningCollector);
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);

metadataExtractor.populateMetadataHandle(session, rewrittenStatement, analysis.getMetadataHandle());
analysis.setProcedureName(procedureName);
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector);
analyzer.analyze(rewrittenStatement, Optional.empty());
analyzeForUtilizedColumns(analysis, analysis.getStatement());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public QueryAnalysis analyze(AnalyzerContext analyzerContext, PreparedQuery prep
session.getWarningCollector(),
Optional.of(metadataExtractorExecutor));

Analysis analysis = analyzer.analyzeSemantic(((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(), false);
Analysis analysis = analyzer.analyzeSemantic(
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getStatement(),
((BuiltInQueryPreparer.BuiltInPreparedQuery) preparedQuery).getDistributedProcedureName(),
false);
return new BuiltInQueryAnalysis(analysis);
}

Expand Down
Loading

0 comments on commit a6d3771

Please sign in to comment.