From 5fb19a6bf68fc1fecaf5a730fe969fce6086b320 Mon Sep 17 00:00:00 2001 From: wangd Date: Mon, 9 Sep 2024 15:16:35 +0800 Subject: [PATCH] Execute optimization, segmentation and local planning for call distributed procedure --- .../presto/common/QualifiedObjectName.java | 10 +- .../scheduler/ExecutionWriterTarget.java | 49 +++++- .../execution/scheduler/TableWriteInfo.java | 13 ++ .../metadata/DelegatingMetadataManager.java | 12 ++ .../metadata/DistributedProcedureHandle.java | 87 ++++++++++ ...stributedProcedureHandleJacksonModule.java | 30 ++++ .../presto/metadata/HandleJsonModule.java | 1 + .../presto/metadata/HandleResolver.java | 18 ++ .../facebook/presto/metadata/Metadata.java | 10 ++ .../presto/metadata/MetadataManager.java | 35 ++++ .../presto/operator/TableWriterOperator.java | 14 +- .../presto/split/PageSinkManager.java | 9 + .../presto/split/PageSinkProvider.java | 3 + .../sql/planner/BasePlanFragmenter.java | 10 ++ .../sql/planner/GroupedExecutionTagger.java | 18 ++ .../sql/planner/LocalExecutionPlanner.java | 48 ++++++ .../presto/sql/planner/LogicalPlanner.java | 5 +- .../presto/sql/planner/OutputExtractor.java | 12 ++ .../presto/sql/planner/PlanOptimizers.java | 3 + .../sql/planner/SplitSourceFactory.java | 7 + .../planner/optimizations/AddExchanges.java | 25 ++- .../optimizations/AddLocalExchanges.java | 15 ++ .../optimizations/PropertyDerivations.java | 16 ++ .../PruneUnreferencedOutputs.java | 20 +++ .../optimizations/PushdownSubfields.java | 8 + .../optimizations/RewriteWriterTarget.java | 158 ++++++++++++++++++ .../StreamPropertyDerivations.java | 9 + .../planner/optimizations/SymbolMapper.java | 25 +++ .../UnaliasSymbolReferences.java | 9 + .../sql/planner/planPrinter/PlanPrinter.java | 9 + .../CallDistributedProcedureValidator.java | 66 ++++++++ .../sql/planner/sanity/PlanChecker.java | 3 +- .../sanity/ValidateDependenciesChecker.java | 10 ++ .../presto/testing/LocalQueryRunner.java | 3 +- .../presto/testing/TestingHandle.java | 3 +- .../presto/testing/TestingHandleResolver.java | 7 + .../facebook/presto/util/GraphvizPrinter.java | 9 + .../execution/TestExecuteProcedureHandle.java | 117 +++++++++++++ .../presto/metadata/AbstractMockMetadata.java | 12 ++ .../sql/planner/TestLogicalPlanner.java | 120 +++++++++++++ .../presto/spi/ConnectorHandleResolver.java | 5 + .../spi/connector/ConnectorMetadata.java | 24 +++ .../connector/ConnectorPageSinkProvider.java | 9 + .../ClassLoaderSafeConnectorMetadata.java | 22 +++ ...ssLoaderSafeConnectorPageSinkProvider.java | 9 + 45 files changed, 1095 insertions(+), 12 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandle.java create mode 100644 presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandleJacksonModule.java create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/RewriteWriterTarget.java create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/CallDistributedProcedureValidator.java create mode 100644 presto-main/src/test/java/com/facebook/presto/execution/TestExecuteProcedureHandle.java diff --git a/presto-common/src/main/java/com/facebook/presto/common/QualifiedObjectName.java b/presto-common/src/main/java/com/facebook/presto/common/QualifiedObjectName.java index a7a95a29620c..1eef175c9ec7 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/QualifiedObjectName.java +++ b/presto-common/src/main/java/com/facebook/presto/common/QualifiedObjectName.java @@ -17,6 +17,7 @@ import com.facebook.drift.annotations.ThriftField; import com.facebook.drift.annotations.ThriftStruct; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; import javax.annotation.concurrent.Immutable; @@ -58,8 +59,12 @@ public static QualifiedObjectName valueOf(String catalogName, String schemaName, return new QualifiedObjectName(catalogName, schemaName, objectName.toLowerCase(ENGLISH)); } + @JsonCreator @ThriftConstructor - public QualifiedObjectName(String catalogName, String schemaName, String objectName) + public QualifiedObjectName( + @JsonProperty("catalogName") String catalogName, + @JsonProperty("schemaName") String schemaName, + @JsonProperty("objectName") String objectName) { checkLowerCase(catalogName, "catalogName"); checkLowerCase(schemaName, "schemaName"); @@ -75,18 +80,21 @@ public CatalogSchemaName getCatalogSchemaName() } @ThriftField(1) + @JsonProperty("catalogName") public String getCatalogName() { return catalogName; } @ThriftField(2) + @JsonProperty("schemaName") public String getSchemaName() { return schemaName; } @ThriftField(3) + @JsonProperty("objectName") public String getObjectName() { return objectName; diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java index 248bfd2def3e..8ce6ce9904de 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/ExecutionWriterTarget.java @@ -14,6 +14,8 @@ package com.facebook.presto.execution.scheduler; +import com.facebook.presto.common.QualifiedObjectName; +import com.facebook.presto.metadata.DistributedProcedureHandle; import com.facebook.presto.metadata.InsertTableHandle; import com.facebook.presto.metadata.OutputTableHandle; import com.facebook.presto.spi.SchemaTableName; @@ -31,7 +33,9 @@ @JsonSubTypes.Type(value = ExecutionWriterTarget.InsertHandle.class, name = "InsertHandle"), @JsonSubTypes.Type(value = ExecutionWriterTarget.DeleteHandle.class, name = "DeleteHandle"), @JsonSubTypes.Type(value = ExecutionWriterTarget.RefreshMaterializedViewHandle.class, name = "RefreshMaterializedViewHandle"), - @JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle")}) + @JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle"), + @JsonSubTypes.Type(value = ExecutionWriterTarget.ExecuteProcedureHandle.class, name = "TableExecuteHandle") +}) @SuppressWarnings({"EmptyClass", "ClassMayBeInterface"}) public abstract class ExecutionWriterTarget { @@ -204,4 +208,47 @@ public String toString() return handle.toString(); } } + + public static class ExecuteProcedureHandle + extends ExecutionWriterTarget + { + private final DistributedProcedureHandle handle; + private final SchemaTableName schemaTableName; + private final QualifiedObjectName procedureName; + + @JsonCreator + public ExecuteProcedureHandle( + @JsonProperty("handle") DistributedProcedureHandle handle, + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("procedureName") QualifiedObjectName procedureName) + { + this.handle = requireNonNull(handle, "handle is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.procedureName = requireNonNull(procedureName, "procedureName is null"); + } + + @JsonProperty + public DistributedProcedureHandle getHandle() + { + return handle; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public QualifiedObjectName getProcedureName() + { + return procedureName; + } + + @Override + public String toString() + { + return handle.toString(); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java index 5dd75ce2bb46..1590d6b67f72 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java @@ -16,6 +16,7 @@ import com.facebook.presto.Session; import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle; import com.facebook.presto.metadata.AnalyzeTableHandle; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.TableLayoutResult; @@ -34,6 +35,7 @@ import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget; import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -107,6 +109,17 @@ private static Optional createWriterTarget(Optional fragments) + { + delegate.finishCallDistributedProcedure(session, procedureHandle, procedureName, fragments); + } + @Override public TableHandle beginUpdate(Session session, TableHandle tableHandle, List updatedColumns) { diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandle.java b/presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandle.java new file mode 100644 index 000000000000..1d3776b3ecca --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandle.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public final class DistributedProcedureHandle +{ + private final ConnectorId connectorId; + private final ConnectorTransactionHandle transactionHandle; + private final ConnectorDistributedProcedureHandle connectorHandle; + + @JsonCreator + public DistributedProcedureHandle( + @JsonProperty("connectorId") ConnectorId connectorId, + @JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle, + @JsonProperty("connectorHandle") ConnectorDistributedProcedureHandle connectorHandle) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null"); + this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null"); + } + + @JsonProperty + public ConnectorId getConnectorId() + { + return connectorId; + } + + @JsonProperty + public ConnectorTransactionHandle getTransactionHandle() + { + return transactionHandle; + } + + @JsonProperty + public ConnectorDistributedProcedureHandle getConnectorHandle() + { + return connectorHandle; + } + + @Override + public int hashCode() + { + return Objects.hash(connectorId, transactionHandle, connectorHandle); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + DistributedProcedureHandle o = (DistributedProcedureHandle) obj; + return Objects.equals(this.connectorId, o.connectorId) && + Objects.equals(this.transactionHandle, o.transactionHandle) && + Objects.equals(this.connectorHandle, o.connectorHandle); + } + + @Override + public String toString() + { + return connectorId + ":" + connectorHandle; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandleJacksonModule.java b/presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandleJacksonModule.java new file mode 100644 index 000000000000..ff9cd22861ad --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DistributedProcedureHandleJacksonModule.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; + +import javax.inject.Inject; + +public class DistributedProcedureHandleJacksonModule + extends AbstractTypedJacksonModule +{ + @Inject + public DistributedProcedureHandleJacksonModule(HandleResolver handleResolver) + { + super(ConnectorDistributedProcedureHandle.class, + handleResolver::getId, + handleResolver::getDistributedProcedureHandleClass); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java b/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java index 0c07b99aaab4..06bf912b4626 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java @@ -32,6 +32,7 @@ public void configure(Binder binder) jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class); jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class); jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class); + jsonBinder(binder).addModuleBinding().to(DistributedProcedureHandleJacksonModule.class); jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class); jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class); jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class); diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/HandleResolver.java b/presto-main/src/main/java/com/facebook/presto/metadata/HandleResolver.java index c6293cfb7472..4706e05dc533 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/HandleResolver.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/HandleResolver.java @@ -16,6 +16,7 @@ import com.facebook.presto.connector.informationSchema.InformationSchemaHandleResolver; import com.facebook.presto.connector.system.SystemHandleResolver; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorIndexHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; @@ -114,6 +115,11 @@ public String getId(ConnectorInsertTableHandle insertHandle) return getId(insertHandle, MaterializedHandleResolver::getInsertTableHandleClass); } + public String getId(ConnectorDistributedProcedureHandle distributedProcedureHandle) + { + return getId(distributedProcedureHandle, MaterializedHandleResolver::getDistributedProcedureHandleClass); + } + public String getId(ConnectorPartitioningHandle partitioningHandle) { return getId(partitioningHandle, MaterializedHandleResolver::getPartitioningHandleClass); @@ -169,6 +175,11 @@ public Class getInsertTableHandleClass(Str return resolverFor(id).getInsertTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id)); } + public Class getDistributedProcedureHandleClass(String id) + { + return resolverFor(id).getDistributedProcedureHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id)); + } + public Class getPartitioningHandleClass(String id) { return resolverFor(id).getPartitioningHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id)); @@ -240,6 +251,7 @@ private static class MaterializedHandleResolver private final Optional> indexHandle; private final Optional> outputTableHandle; private final Optional> insertTableHandle; + private final Optional> distributedProcedureHandle; private final Optional> partitioningHandle; private final Optional> transactionHandle; private final Optional> metadataUpdateHandle; @@ -256,6 +268,7 @@ public MaterializedHandleResolver(ConnectorHandleResolver resolver) partitioningHandle = getHandleClass(resolver::getPartitioningHandleClass); transactionHandle = getHandleClass(resolver::getTransactionHandleClass); metadataUpdateHandle = getHandleClass(resolver::getMetadataUpdateHandleClass); + distributedProcedureHandle = getHandleClass(resolver::getDistributedProcedureHandleClass); } private static Optional> getHandleClass(Supplier> callable) @@ -303,6 +316,11 @@ public Optional> getInsertTableHandl return insertTableHandle; } + public Optional> getDistributedProcedureHandleClass() + { + return distributedProcedureHandle; + } + public Optional> getPartitioningHandleClass() { return partitioningHandle; diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java index 579776256112..c4a7a30ac5e0 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java @@ -332,6 +332,16 @@ public interface Metadata */ void finishDelete(Session session, TableHandle tableHandle, Collection fragments); + /** + * Begin call distributed procedure + */ + DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments); + + /** + * Finish call distributed procedure + */ + void finishCallDistributedProcedure(Session session, DistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection fragments); + /** * Begin update query */ diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java index 640023cbdfa1..af004fe93f9f 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java @@ -29,6 +29,7 @@ import com.facebook.presto.execution.QueryManager; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; @@ -972,6 +973,40 @@ public void finishDelete(Session session, TableHandle tableHandle, Collection fragments) + { + ConnectorId connectorId = procedureHandle.getConnectorId(); + ConnectorMetadata metadata = getMetadata(session, connectorId); + metadata.finishCallDistributedProcedure(session.toConnectorSession(connectorId), procedureHandle.getConnectorHandle(), procedureName, fragments); + } + @Override public TableHandle beginUpdate(Session session, TableHandle tableHandle, List updatedColumns) { diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java index f0450403b598..984105c19fa8 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TableWriterOperator.java @@ -27,6 +27,7 @@ import com.facebook.presto.execution.TaskMetadataContext; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.CreateHandle; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.InsertHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.RefreshMaterializedViewHandle; import com.facebook.presto.memory.context.LocalMemoryContext; @@ -119,8 +120,11 @@ public TableWriterOperatorFactory( this.metadataUpdaterManager = requireNonNull(metadataUpdaterManager, "metadataUpdaterManager is null"); this.taskMetadataContext = requireNonNull(taskMetadataContext, "taskMetadataContext is null"); checkArgument( - writerTarget instanceof CreateHandle || writerTarget instanceof InsertHandle || writerTarget instanceof RefreshMaterializedViewHandle, - "writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle"); + writerTarget instanceof CreateHandle || + writerTarget instanceof InsertHandle || + writerTarget instanceof RefreshMaterializedViewHandle || + writerTarget instanceof ExecuteProcedureHandle, + "writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle or TableExecuteHandle"); this.target = requireNonNull(writerTarget, "writerTarget is null"); this.session = session; this.statisticsAggregationOperatorFactory = requireNonNull(statisticsAggregationOperatorFactory, "statisticsAggregationOperatorFactory is null"); @@ -170,6 +174,9 @@ private ConnectorPageSink createPageSink() if (target instanceof RefreshMaterializedViewHandle) { return pageSinkManager.createPageSink(session, ((RefreshMaterializedViewHandle) target).getHandle(), pageSinkContextBuilder.build()); } + if (target instanceof ExecuteProcedureHandle) { + return pageSinkManager.createPageSink(session, ((ExecuteProcedureHandle) target).getHandle(), pageSinkContextBuilder.build()); + } throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName()); } @@ -187,6 +194,9 @@ private static ConnectorId getConnectorId(ExecutionWriterTarget handle) return ((RefreshMaterializedViewHandle) handle).getHandle().getConnectorId(); } + if (handle instanceof ExecuteProcedureHandle) { + return ((ExecuteProcedureHandle) handle).getHandle().getConnectorId(); + } throw new UnsupportedOperationException("Unhandled target type: " + handle.getClass().getName()); } diff --git a/presto-main/src/main/java/com/facebook/presto/split/PageSinkManager.java b/presto-main/src/main/java/com/facebook/presto/split/PageSinkManager.java index aee08454aefc..2bbabcb6fe4f 100644 --- a/presto-main/src/main/java/com/facebook/presto/split/PageSinkManager.java +++ b/presto-main/src/main/java/com/facebook/presto/split/PageSinkManager.java @@ -14,6 +14,7 @@ package com.facebook.presto.split; import com.facebook.presto.Session; +import com.facebook.presto.metadata.DistributedProcedureHandle; import com.facebook.presto.metadata.InsertTableHandle; import com.facebook.presto.metadata.OutputTableHandle; import com.facebook.presto.spi.ConnectorId; @@ -62,6 +63,14 @@ public ConnectorPageSink createPageSink(Session session, InsertTableHandle table return providerFor(tableHandle.getConnectorId()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkContext); } + @Override + public ConnectorPageSink createPageSink(Session session, DistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + // assumes connectorId and catalog are the same + ConnectorSession connectorSession = session.toConnectorSession(procedureHandle.getConnectorId()); + return providerFor(procedureHandle.getConnectorId()).createPageSink(procedureHandle.getTransactionHandle(), connectorSession, procedureHandle.getConnectorHandle(), pageSinkContext); + } + private ConnectorPageSinkProvider providerFor(ConnectorId connectorId) { ConnectorPageSinkProvider provider = pageSinkProviders.get(connectorId); diff --git a/presto-main/src/main/java/com/facebook/presto/split/PageSinkProvider.java b/presto-main/src/main/java/com/facebook/presto/split/PageSinkProvider.java index 3e4612787019..8da7105c7c04 100644 --- a/presto-main/src/main/java/com/facebook/presto/split/PageSinkProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/split/PageSinkProvider.java @@ -14,6 +14,7 @@ package com.facebook.presto.split; import com.facebook.presto.Session; +import com.facebook.presto.metadata.DistributedProcedureHandle; import com.facebook.presto.metadata.InsertTableHandle; import com.facebook.presto.metadata.OutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -24,4 +25,6 @@ public interface PageSinkProvider ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, PageSinkContext pageSinkContext); ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, PageSinkContext pageSinkContext); + + ConnectorPageSink createPageSink(Session session, DistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eb3d4e7ff872..4a8c5da43625 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -38,6 +38,7 @@ import com.facebook.presto.spi.plan.ValuesNode; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; import com.facebook.presto.sql.planner.plan.MetadataDeleteNode; @@ -261,6 +262,15 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext context) + { + if (node.getPartitioningScheme().isPresent()) { + context.get().setDistribution(node.getPartitioningScheme().get().getPartitioning().getHandle(), metadata, session); + } + return context.defaultRewrite(node, context.get()); + } + @Override public PlanNode visitValues(ValuesNode node, RewriteContext context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java index c13c5b8b6e81..ff9a3b779798 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java @@ -24,11 +24,13 @@ import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.planner.plan.MergeJoinNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.WindowNode; import com.google.common.base.VerifyException; @@ -226,6 +228,22 @@ public GroupedExecutionTagger.GroupedExecutionProperties visitMarkDistinct(MarkD return GroupedExecutionTagger.GroupedExecutionProperties.notCapable(); } + @Override + public GroupedExecutionTagger.GroupedExecutionProperties visitCallDistributedProcedure(CallDistributedProcedureNode node, Void context) + { + GroupedExecutionTagger.GroupedExecutionProperties properties = node.getSource().accept(this, null); + boolean recoveryEligible = properties.isRecoveryEligible(); + CallDistributedProcedureTarget target = node.getTarget().orElseThrow(() -> new VerifyException("target is absent")); + recoveryEligible &= metadata.getConnectorCapabilities(session, target.getConnectorId()).contains(SUPPORTS_PAGE_SINK_COMMIT); + + return new GroupedExecutionTagger.GroupedExecutionProperties( + properties.isCurrentNodeCapable(), + properties.isSubTreeUseful(), + properties.getCapableTableScanNodes(), + properties.getTotalLifespans(), + recoveryEligible); + } + @Override public GroupedExecutionTagger.GroupedExecutionProperties visitTableWriter(TableWriterNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index f232d9a7c528..f55be9191538 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -36,6 +36,7 @@ import com.facebook.presto.execution.scheduler.ExecutionWriterTarget; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.CreateHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.DeleteHandle; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.InsertHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.RefreshMaterializedViewHandle; import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.UpdateHandle; @@ -187,6 +188,7 @@ import com.facebook.presto.sql.planner.optimizations.IndexJoinOptimizer; import com.facebook.presto.sql.planner.plan.AbstractJoinNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -2688,6 +2690,48 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont return new PhysicalOperation(operator, outputMappings, context, probeSource); } + @Override + public PhysicalOperation visitCallDistributedProcedure(CallDistributedProcedureNode node, LocalExecutionPlanContext context) + { + // Set table writer count + if (node.getPartitioningScheme().isPresent()) { + context.setDriverInstanceCount(getTaskPartitionedWriterCount(session)); + } + else { + context.setDriverInstanceCount(getTaskWriterCount(session)); + } + + PhysicalOperation source = node.getSource().accept(this, context); + + ImmutableMap.Builder outputMapping = ImmutableMap.builder(); + outputMapping.put(node.getRowCountVariable(), ROW_COUNT_CHANNEL); + outputMapping.put(node.getFragmentVariable(), FRAGMENT_CHANNEL); + outputMapping.put(node.getTableCommitContextVariable(), CONTEXT_CHANNEL); + + List inputChannels = node.getColumns().stream() + .map(source::variableToChannel) + .collect(toImmutableList()); + List notNullChannelColumnNames = node.getColumns().stream() + .map(variable -> node.getNotNullColumnVariables().contains(variable) ? node.getColumnNames().get(source.variableToChannel(variable)) : null) + .collect(Collectors.toList()); + + OperatorFactory operatorFactory = new TableWriterOperatorFactory( + context.getNextOperatorId(), + node.getId(), + pageSinkManager, + metadataUpdaterManager, + context.getTaskMetadataContext(), + context.getTableWriteInfo().getWriterTarget().orElseThrow(() -> new VerifyException("writerTarget is absent")), + inputChannels, + notNullChannelColumnNames, + session, + new DevNullOperatorFactory(context.getNextOperatorId(), node.getId()), // statistics are not calculated + getVariableTypes(node.getOutputVariables()), + tableCommitContextCodec, + getPageSinkCommitStrategy()); + return new PhysicalOperation(operatorFactory, outputMapping.build(), context, source); + } + @Override public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { @@ -3462,6 +3506,10 @@ else if (target instanceof UpdateHandle) { metadata.finishUpdate(session, ((UpdateHandle) target).getHandle(), fragments); return Optional.empty(); } + else if (target instanceof ExecuteProcedureHandle) { + metadata.finishCallDistributedProcedure(session, ((ExecuteProcedureHandle) target).getHandle(), ((ExecuteProcedureHandle) target).getProcedureName(), fragments); + return Optional.empty(); + } else { throw new AssertionError("Unhandled target type: " + target.getClass().getName()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index ed87fffc05e5..01e7abc6b7ab 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -92,6 +92,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; @@ -237,7 +238,9 @@ private RelationPlan createCallDistributedProcedurePlan(Analysis analysis, Call .map(ColumnMetadata::getName) .collect(toImmutableList()); + Map columnHandleMap = metadata.getColumnHandles(session, targetTable); TableLayout tableLayout = metadata.getLayout(session, targetTable); + List columnHandles = columnNames.stream().map(columnHandleMap::get).collect(Collectors.toList()); List outputLayout = plan.getRoot().getOutputVariables(); Optional partitioningScheme = Optional.empty(); @@ -245,7 +248,7 @@ private RelationPlan createCallDistributedProcedurePlan(Analysis analysis, Call if (partitioningHandle.isPresent()) { List partitionFunctionArguments = new ArrayList<>(); tableLayout.getTablePartitioning().get().getPartitioningColumns().stream() - .mapToInt(columnNames::indexOf) + .mapToInt(columnHandles::indexOf) .mapToObj(outputLayout::get) .forEach(partitionFunctionArguments::add); partitioningScheme = Optional.of(new PartitioningScheme( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java index 172f4e6052c3..1c4867824d73 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/OutputExtractor.java @@ -17,6 +17,7 @@ import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.SequenceNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; @@ -62,6 +63,17 @@ public Void visitTableWriter(TableWriterNode node, Void context) return null; } + @Override + public Void visitCallDistributedProcedure(CallDistributedProcedureNode node, Void context) + { + TableWriterNode.WriterTarget writerTarget = node.getTarget().orElseThrow(() -> new VerifyException("target is absent")); + connectorId = writerTarget.getConnectorId(); + checkState(schemaTableName == null || schemaTableName.equals(writerTarget.getSchemaTableName()), + "cannot have more than a single create, insert or delete in a query"); + schemaTableName = writerTarget.getSchemaTableName(); + return null; + } + public Void visitSequence(SequenceNode node, Void context) { // Left children of sequence are ignored since they don't output anything diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index 32184097720d..8a6b0a4d2e9a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -173,6 +173,7 @@ import com.facebook.presto.sql.planner.optimizations.ReplaceConstantVariableReferencesWithConstants; import com.facebook.presto.sql.planner.optimizations.ReplicateSemiJoinInDelete; import com.facebook.presto.sql.planner.optimizations.RewriteIfOverAggregation; +import com.facebook.presto.sql.planner.optimizations.RewriteWriterTarget; import com.facebook.presto.sql.planner.optimizations.SetFlatteningOptimizer; import com.facebook.presto.sql.planner.optimizations.ShardJoins; import com.facebook.presto.sql.planner.optimizations.SimplifyPlanWithEmptyInput; @@ -932,6 +933,8 @@ public PlanOptimizers( builder.add(new MetadataDeleteOptimizer(metadata)); + builder.add(new RewriteWriterTarget()); + // TODO: consider adding a formal final plan sanitization optimizer that prepares the plan for transmission/execution/logging // TODO: figure out how to improve the set flattening optimizer so that it can run at any point this.planningTimeOptimizers = builder.build(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java index f1a33190ac10..1870ad076d3f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java @@ -39,6 +39,7 @@ import com.facebook.presto.split.SplitSource; import com.facebook.presto.split.SplitSourceProvider; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -354,6 +355,12 @@ public Map visitTableWriter(TableWriterNode node, Conte return node.getSource().accept(this, context); } + @Override + public Map visitCallDistributedProcedure(CallDistributedProcedureNode node, Context context) + { + return node.getSource().accept(this, context); + } + @Override public Map visitTableWriteMerge(TableWriterMergeNode node, Context context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 9d677ac23247..cb680f6f322f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -54,6 +54,7 @@ import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.optimizations.PreferredProperties.PartitioningProperties; import com.facebook.presto.sql.planner.plan.ApplyNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.ChildReplacer; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -644,15 +645,31 @@ public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties } @Override - public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties) + public PlanWithProperties visitCallDistributedProcedure(CallDistributedProcedureNode node, PreferredProperties preferredProperties) { - PlanWithProperties source = accept(node.getSource(), preferredProperties); + return getTableWriterPlanWithProperties(node, preferredProperties, node.getPartitioningScheme(), node.getPartitioningScheme()); + } + @Override + public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties) + { Optional shufflePartitioningScheme = node.getTablePartitioningScheme(); if (!shufflePartitioningScheme.isPresent()) { shufflePartitioningScheme = node.getPreferredShufflePartitioningScheme(); } + return getTableWriterPlanWithProperties(node, preferredProperties, node.getTablePartitioningScheme(), shufflePartitioningScheme); + } + + private PlanWithProperties getTableWriterPlanWithProperties( + PlanNode node, + PreferredProperties preferredProperties, + Optional tablePartitioningScheme, + Optional shufflePartitioningScheme) + { + checkArgument(node instanceof TableWriterNode || node instanceof CallDistributedProcedureNode); + PlanWithProperties source = accept(node.getSources().get(0), preferredProperties); + if (!shufflePartitioningScheme.isPresent()) { if (scaleWriters) { shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables())); @@ -668,9 +685,9 @@ else if (redistributeWrites) { !(source.getProperties().isRefinedPartitioningOver(shufflePartitioningScheme.get().getPartitioning(), false, metadata, session) && canPushdownPartialMerge(source.getNode(), partialMergePushdownStrategy))) { PartitioningScheme exchangePartitioningScheme = shufflePartitioningScheme.get(); - if (node.getTablePartitioningScheme().isPresent() && isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(session)) { + if (tablePartitioningScheme.isPresent() && isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(session)) { int writerThreadsPerNode = getTaskPartitionedWriterCount(session); - int bucketCount = getBucketCount(node.getTablePartitioningScheme().get().getPartitioning().getHandle()); + int bucketCount = getBucketCount(tablePartitioningScheme.get().getPartitioning().getHandle()); int[] bucketToPartition = new int[bucketCount]; for (int i = 0; i < bucketCount; i++) { bucketToPartition[i] = i / writerThreadsPerNode; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 9ba87d5dfab7..bb5d8cdf63b1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -39,6 +39,7 @@ import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties; import com.facebook.presto.sql.planner.plan.ApplyNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; @@ -533,6 +534,20 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, StreamPrefe return planAndEnforceChildren(node, requiredProperties, requiredProperties); } + @Override + public PlanWithProperties visitCallDistributedProcedure(CallDistributedProcedureNode node, StreamPreferredProperties parentPreferences) + { + if (node.getPartitioningScheme().isPresent() && getTaskPartitionedWriterCount(session) == 1) { + return planAndEnforceChildren(node, singleStream(), defaultParallelism(session)); + } + + if (!node.getPartitioningScheme().isPresent() && getTaskWriterCount(session) == 1) { + return planAndEnforceChildren(node, singleStream(), defaultParallelism(session)); + } + + return planAndEnforceChildren(node, fixedParallelism(), fixedParallelism()); + } + // // Table Writer // diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java index b868ffac28a7..e4386b734f97 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java @@ -46,6 +46,7 @@ import com.facebook.presto.sql.planner.optimizations.ActualProperties.Global; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -720,6 +721,21 @@ public ActualProperties visitTableWriter(TableWriterNode node, List inputProperties) + { + ActualProperties properties = Iterables.getOnlyElement(inputProperties); + + if (properties.isCoordinatorOnly()) { + return ActualProperties.builder() + .global(coordinatorSingleStreamPartition()) + .build(); + } + return ActualProperties.builder() + .global(properties.isSingleNode() ? singleStreamPartition() : arbitraryPartition()) + .build(); + } + @Override public ActualProperties visitTableWriteMerge(TableWriterMergeNode node, List inputProperties) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index 08ea868f8d56..f10e8bcd7ab0 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -46,6 +46,7 @@ import com.facebook.presto.sql.planner.VariablesExtractor; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; @@ -773,6 +774,25 @@ public PlanNode visitTableWriteMerge(TableWriterMergeNode node, RewriteContext> context) + { + PlanNode source = context.rewrite(node.getSource(), ImmutableSet.copyOf(node.getSource().getOutputVariables())); + return new CallDistributedProcedureNode( + node.getSourceLocation(), + node.getId(), + node.getStatsEquivalentPlanNode(), + source, + node.getTarget(), + node.getRowCountVariable(), + node.getFragmentVariable(), + node.getTableCommitContextVariable(), + node.getColumns(), + node.getColumnNames(), + node.getNotNullColumnVariables(), + node.getPartitioningScheme()); + } + @Override public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext> context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PushdownSubfields.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PushdownSubfields.java index 302aaefd1907..987295b18352 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PushdownSubfields.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PushdownSubfields.java @@ -57,6 +57,7 @@ import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.plan.ApplyNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; import com.facebook.presto.sql.planner.plan.GroupIdNode; import com.facebook.presto.sql.planner.plan.IndexJoinNode; @@ -401,6 +402,13 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext conte node.getEnforcedConstraint()); } + @Override + public PlanNode visitCallDistributedProcedure(CallDistributedProcedureNode node, RewriteContext context) + { + context.get().variables.addAll(node.getColumns()); + return context.defaultRewrite(node, context.get()); + } + @Override public PlanNode visitTableWriter(TableWriterNode node, RewriteContext context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/RewriteWriterTarget.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/RewriteWriterTarget.java new file mode 100644 index 000000000000..e910f387c8ce --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/RewriteWriterTarget.java @@ -0,0 +1,158 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.optimizations; + +import com.facebook.presto.Session; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.plan.UnionNode; +import com.facebook.presto.spi.plan.ValuesNode; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; +import com.facebook.presto.sql.planner.plan.SimplePlanRewriter.RewriteContext; +import com.facebook.presto.sql.planner.plan.TableFinishNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget; +import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.stream.Collectors.toSet; + +public class RewriteWriterTarget + implements PlanOptimizer +{ + public RewriteWriterTarget() + {} + + @Override + public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) + { + Rewriter rewriter = new Rewriter(); + PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, Optional.empty()); + return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged()); + } + + private class Rewriter + extends SimplePlanRewriter> + { + private boolean planChanged; + + public Rewriter() + {} + + @Override + public PlanNode visitCallDistributedProcedure(CallDistributedProcedureNode node, RewriteContext> context) + { + CallDistributedProcedureTarget callDistributedProcedureTarget = (CallDistributedProcedureTarget) getContextTarget(context); + return new CallDistributedProcedureNode( + node.getSourceLocation(), + node.getId(), + node.getSource(), + Optional.of(callDistributedProcedureTarget), + node.getRowCountVariable(), + node.getFragmentVariable(), + node.getTableCommitContextVariable(), + node.getColumns(), + node.getColumnNames(), + node.getNotNullColumnVariables(), + node.getPartitioningScheme()); + } + + @Override + public PlanNode visitTableFinish(TableFinishNode node, RewriteContext> context) + { + PlanNode child = node.getSource(); + + Optional newTarget = getWriterTarget(child); + if (!newTarget.isPresent()) { + return node; + } + + planChanged = true; + child = context.rewrite(child, newTarget); + + return new TableFinishNode( + node.getSourceLocation(), + node.getId(), + child, + newTarget, + node.getRowCountVariable(), + node.getStatisticsAggregation(), + node.getStatisticsAggregationDescriptor()); + } + + public Optional getWriterTarget(PlanNode node) + { + if (node instanceof CallDistributedProcedureNode) { + Optional tableHandle = findTableHandleForCallDistributedProcedure(((CallDistributedProcedureNode) node).getSource()); + Optional callDistributedProcedureTarget = ((CallDistributedProcedureNode) node).getTarget(); + return !tableHandle.isPresent() ? callDistributedProcedureTarget.map(WriterTarget.class::cast) : + callDistributedProcedureTarget.map(target -> new CallDistributedProcedureTarget( + target.getProcedureName(), + target.getProcedureArguments(), + tableHandle, + target.getSchemaTableName())); + } + + if (node instanceof ExchangeNode || node instanceof UnionNode) { + Set> writerTargets = node.getSources().stream() + .map(this::getWriterTarget) + .collect(toSet()); + return getOnlyElement(writerTargets); + } + + return Optional.empty(); + } + + private Optional findTableHandleForCallDistributedProcedure(PlanNode startNode) + { + List tableScanNodes = PlanNodeSearcher.searchFrom(startNode) + .where(node -> node instanceof TableScanNode) + .findAll(); + + if (tableScanNodes.size() == 1) { + return Optional.of(((TableScanNode) tableScanNodes.get(0)).getTable()); + } + + List valuesNodes = PlanNodeSearcher.searchFrom(startNode) + .where(node -> node instanceof ValuesNode) + .findAll(); + + if (valuesNodes.size() == 1) { + return Optional.empty(); + } + + throw new IllegalArgumentException("Expected to find exactly one update target TableScanNode in plan but found: " + tableScanNodes); + } + + public boolean isPlanChanged() + { + return planChanged; + } + } + + private static WriterTarget getContextTarget(RewriteContext> context) + { + return context.get().orElseThrow(() -> new IllegalStateException("WriterTarget not present")); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java index 3f465f47a85f..307290ffd6d9 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java @@ -35,6 +35,7 @@ import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -440,6 +441,14 @@ public StreamProperties visitDelete(DeleteNode node, List inpu return properties.withUnspecifiedPartitioning(); } + @Override + public StreamProperties visitCallDistributedProcedure(CallDistributedProcedureNode node, List inputProperties) + { + StreamProperties properties = Iterables.getOnlyElement(inputProperties); + // call distributed procedure only outputs the row count + return properties.withUnspecifiedPartitioning(); + } + @Override public StreamProperties visitTableWriter(TableWriterNode node, List inputProperties) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java index 5e7331b566e4..36d9527ed367 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java @@ -32,6 +32,7 @@ import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.planner.Symbol; import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.StatisticAggregations; import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor; import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; @@ -52,6 +53,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import static com.facebook.presto.spi.StandardWarningCode.MULTIPLE_ORDER_BY; import static com.facebook.presto.spi.plan.AggregationNode.groupingSets; @@ -263,6 +265,29 @@ public TableWriterNode map(TableWriterNode node, PlanNode source, PlanNodeId new node.getIsTemporaryTableWriter()); } + public CallDistributedProcedureNode map(CallDistributedProcedureNode node, PlanNode source) + { + ImmutableList columns = node.getColumns().stream() + .map(this::map) + .collect(toImmutableList()); + Set notNullColumnVariables = node.getNotNullColumnVariables().stream() + .map(this::map) + .collect(toImmutableSet()); + + return new CallDistributedProcedureNode( + node.getSourceLocation(), + node.getId(), + source, + node.getTarget(), + node.getRowCountVariable(), + node.getFragmentVariable(), + node.getTableCommitContextVariable(), + columns, + columns.stream().map(VariableReferenceExpression::getName).collect(Collectors.toList()), + notNullColumnVariables, + node.getPartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source))); + } + public StatisticsWriterNode map(StatisticsWriterNode node, PlanNode source) { return new StatisticsWriterNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java index e6dc7e4c3020..f9c483def3cf 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -50,6 +50,7 @@ import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -670,6 +671,14 @@ private static ImmutableList.Builder rewriteSources(SetOperationNode n return rewrittenSources; } + @Override + public PlanNode visitCallDistributedProcedure(CallDistributedProcedureNode node, RewriteContext context) + { + PlanNode source = context.rewrite(node.getSource()); + SymbolMapper mapper = new SymbolMapper(mapping, types, warningCollector); + return mapper.map(node, source); + } + @Override public PlanNode visitTableWriter(TableWriterNode node, RewriteContext context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index 77dff87b9b33..0fe949b7176a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -72,6 +72,7 @@ import com.facebook.presto.sql.planner.plan.AbstractJoinNode; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -95,6 +96,7 @@ import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; import com.facebook.presto.sql.planner.plan.WindowNode; @@ -1179,6 +1181,13 @@ public Void visitStatisticsWriterNode(StatisticsWriterNode node, Void context) return processChildren(node, context); } + @Override + public Void visitCallDistributedProcedure(CallDistributedProcedureNode node, Void context) + { + addNode(node, "CallDistributedProcedure", format("[%s]", node.getTarget().map(CallDistributedProcedureTarget::getProcedureName).orElse(null))); + return processChildren(node, context); + } + @Override public Void visitTableFinish(TableFinishNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/CallDistributedProcedureValidator.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/CallDistributedProcedureValidator.java new file mode 100644 index 000000000000..ea4a302ecebb --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/CallDistributedProcedureValidator.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.sanity; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.OutputNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.ProjectNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.plan.ValuesNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.TableFinishNode; + +import java.util.Optional; + +import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; + +public final class CallDistributedProcedureValidator + implements PlanChecker.Checker +{ + @Override + public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) + { + Optional callDistributedProcedureNode = searchFrom(planNode) + .where(node -> node instanceof CallDistributedProcedureNode) + .findFirst(); + + if (!callDistributedProcedureNode.isPresent()) { + // not a call distributed procedure plan + return; + } + + searchFrom(planNode) + .findAll() + .forEach(node -> { + if (!isAllowedNode(node)) { + throw new IllegalStateException("Unexpected " + node.getClass().getSimpleName() + " found in plan; probably connector was not able to handle provided WHERE expression"); + } + }); + } + + private boolean isAllowedNode(PlanNode node) + { + return node instanceof TableScanNode + || node instanceof ValuesNode + || node instanceof ProjectNode + || node instanceof CallDistributedProcedureNode + || node instanceof OutputNode + || node instanceof ExchangeNode + || node instanceof TableFinishNode; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java index 20495092f914..184fd5e15864 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java @@ -74,7 +74,8 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanC new VerifyNoIntermediateFormExpression(), new VerifyProjectionLocality(), new DynamicFiltersChecker(), - new WarnOnScanWithoutPartitionPredicate(featuresConfig)); + new WarnOnScanWithoutPartitionPredicate(featuresConfig), + new CallDistributedProcedureValidator()); if (featuresConfig.isNativeExecutionEnabled() && (featuresConfig.isDisableTimeStampWithTimeZoneForNative() || featuresConfig.isDisableIPAddressForNative())) { builder.put(Stage.INTERMEDIATE, new CheckUnsupportedPrestissimoTypes(featuresConfig)); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java index b68c1ab2845e..8f8cd1f2636e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java @@ -43,6 +43,7 @@ import com.facebook.presto.sql.planner.optimizations.WindowNodeUtil; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -608,6 +609,15 @@ public Void visitExchange(ExchangeNode node, Set bo return null; } + @Override + public Void visitCallDistributedProcedure(CallDistributedProcedureNode node, Set boundVariables) + { + PlanNode source = node.getSource(); + source.accept(this, boundVariables); // visit child + + return null; + } + @Override public Void visitTableWriter(TableWriterNode node, Set boundVariables) { diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 95f71bc4c00d..36b697e47a58 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -442,7 +442,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new TablePropertyManager(), new ColumnPropertyManager(), new AnalyzePropertyManager(), - transactionManager); + transactionManager, + procedureRegistry); this.splitManager = new SplitManager(metadata, new QueryManagerConfig(), nodeSchedulerConfig); this.planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(jsonCodec(SimplePlanFragment.class))); this.distributedPlanChecker = new PlanChecker(featuresConfig, false, planCheckerProviderManager); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/TestingHandle.java b/presto-main/src/main/java/com/facebook/presto/testing/TestingHandle.java index 34de904e9068..40294ba5dcb0 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/TestingHandle.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/TestingHandle.java @@ -13,12 +13,13 @@ */ package com.facebook.presto.testing; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle; public enum TestingHandle - implements ConnectorOutputTableHandle, ConnectorInsertTableHandle, ConnectorTableLayoutHandle + implements ConnectorOutputTableHandle, ConnectorInsertTableHandle, ConnectorTableLayoutHandle, ConnectorDistributedProcedureHandle { INSTANCE } diff --git a/presto-main/src/main/java/com/facebook/presto/testing/TestingHandleResolver.java b/presto-main/src/main/java/com/facebook/presto/testing/TestingHandleResolver.java index 4da22baca8df..9a4c593add2c 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/TestingHandleResolver.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/TestingHandleResolver.java @@ -14,6 +14,7 @@ package com.facebook.presto.testing; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; @@ -64,6 +65,12 @@ public Class getInsertTableHandleClass() return TestingHandle.class; } + @Override + public Class getDistributedProcedureHandleClass() + { + return TestingHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java index d3937d02c0f7..8fdc28b5368d 100644 --- a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java @@ -43,6 +43,7 @@ import com.facebook.presto.sql.planner.plan.AbstractJoinNode; import com.facebook.presto.sql.planner.plan.ApplyNode; import com.facebook.presto.sql.planner.plan.AssignUniqueId; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.DeleteNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; @@ -65,6 +66,7 @@ import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TableWriterNode; +import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; import com.facebook.presto.sql.planner.plan.WindowNode; @@ -293,6 +295,13 @@ public Void visitSequence(SequenceNode node, Void context) return null; } + @Override + public Void visitCallDistributedProcedure(CallDistributedProcedureNode node, Void context) + { + printNode(node, format("CallDistributedProcedure[%s]", node.getTarget().map(CallDistributedProcedureTarget::getProcedureName).orElse(null)), NODE_COLORS.get(NodeType.TABLE_WRITER)); + return node.getSource().accept(this, context); + } + @Override public Void visitTableWriter(TableWriterNode node, Void context) { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestExecuteProcedureHandle.java b/presto-main/src/test/java/com/facebook/presto/execution/TestExecuteProcedureHandle.java new file mode 100644 index 000000000000..3eba2734147e --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestExecuteProcedureHandle.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.common.QualifiedObjectName; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle; +import com.facebook.presto.metadata.DistributedProcedureHandle; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.HandleJsonModule; +import com.facebook.presto.metadata.HandleResolver; +import com.facebook.presto.server.SliceDeserializer; +import com.facebook.presto.server.SliceSerializer; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.Serialization; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.tree.Expression; +import com.facebook.presto.sql.tree.FunctionCall; +import com.facebook.presto.testing.TestingHandle; +import com.facebook.presto.testing.TestingHandleResolver; +import com.facebook.presto.testing.TestingTransactionHandle; +import com.facebook.presto.type.TypeDeserializer; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import io.airlift.slice.Slice; +import org.testng.annotations.Test; + +import java.util.UUID; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static com.facebook.airlift.json.JsonBinder.jsonBinder; +import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager; +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static org.testng.Assert.assertEquals; + +public class TestExecuteProcedureHandle +{ + @Test + public void testExecuteProcedureHandleRoundTrip() + { + String catalogName = "test_catalog"; + JsonCodec codec = createJsonCodec(catalogName); + UUID uuid = UUID.randomUUID(); + ExecuteProcedureHandle expected = createExecuteProcedureHandle(catalogName, uuid); + ExecuteProcedureHandle actual = codec.fromJson(codec.toJson(expected)); + + assertEquals(actual.getProcedureName(), expected.getProcedureName()); + assertEquals(actual.getSchemaTableName(), expected.getSchemaTableName()); + assertEquals(actual.getHandle().getClass(), expected.getHandle().getClass()); + assertEquals(actual.getHandle().getConnectorId(), expected.getHandle().getConnectorId()); + assertEquals(actual.getHandle().getTransactionHandle(), expected.getHandle().getTransactionHandle()); + assertEquals(actual.getHandle().getConnectorHandle(), expected.getHandle().getConnectorHandle()); + } + + private static JsonCodec createJsonCodec(String catalogName) + { + Module module = binder -> { + SqlParser sqlParser = new SqlParser(); + FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); + binder.install(new JsonModule()); + binder.install(new HandleJsonModule()); + binder.bind(SqlParser.class).toInstance(sqlParser); + binder.bind(TypeManager.class).toInstance(functionAndTypeManager); + configBinder(binder).bindConfig(FeaturesConfig.class); + newSetBinder(binder, Type.class); + jsonBinder(binder).addSerializerBinding(Slice.class).to(SliceSerializer.class); + jsonBinder(binder).addDeserializerBinding(Slice.class).to(SliceDeserializer.class); + jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); + jsonBinder(binder).addSerializerBinding(Expression.class).to(Serialization.ExpressionSerializer.class); + jsonBinder(binder).addDeserializerBinding(Expression.class).to(Serialization.ExpressionDeserializer.class); + jsonBinder(binder).addDeserializerBinding(FunctionCall.class).to(Serialization.FunctionCallDeserializer.class); + jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionSerializer.class); + jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(Serialization.VariableReferenceExpressionDeserializer.class); + jsonCodecBinder(binder).bindJsonCodec(ExecuteProcedureHandle.class); + }; + Bootstrap app = new Bootstrap(ImmutableList.of(module)); + Injector injector = app + .doNotInitializeLogging() + .quiet() + .initialize(); + injector.getInstance(HandleResolver.class) + .addConnectorName(catalogName, new TestingHandleResolver()); + return injector.getInstance(new Key>() {}); + } + + private static ExecuteProcedureHandle createExecuteProcedureHandle(String catalogName, UUID uuid) + { + DistributedProcedureHandle distributedProcedureHandle = new DistributedProcedureHandle( + new ConnectorId(catalogName), + new TestingTransactionHandle(uuid), + TestingHandle.INSTANCE); + return new ExecuteProcedureHandle(distributedProcedureHandle, + new SchemaTableName("schema1", "table1"), + QualifiedObjectName.valueOf(catalogName, "schema1", "table1")); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java index f7e388e8e001..c4365ce7f30b 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java @@ -428,6 +428,18 @@ public void finishDelete(Session session, TableHandle tableHandle, Collection fragments) + { + throw new UnsupportedOperationException(); + } + @Override public TableHandle beginUpdate(Session session, TableHandle tableHandle, List updatedColumns) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java index 1a967e51a145..512b82d8680b 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java @@ -15,9 +15,18 @@ import com.facebook.presto.Session; import com.facebook.presto.common.block.SortOrder; +import com.facebook.presto.execution.TestingPageSourceProvider; import com.facebook.presto.functionNamespace.FunctionNamespaceManagerPlugin; import com.facebook.presto.functionNamespace.json.JsonFileBasedFunctionNamespaceManagerFactory; +import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.plan.AggregationNode; import com.facebook.presto.spi.plan.DistinctLimitNode; import com.facebook.presto.spi.plan.FilterNode; @@ -28,7 +37,10 @@ import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spi.plan.TopNNode; import com.facebook.presto.spi.plan.ValuesNode; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.spi.transaction.IsolationLevel; import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType; import com.facebook.presto.sql.planner.assertions.BasePlanTest; import com.facebook.presto.sql.planner.assertions.ExpressionMatcher; @@ -37,6 +49,7 @@ import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges; import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; import com.facebook.presto.sql.planner.plan.ApplyNode; +import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode; import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.IndexJoinNode; @@ -44,8 +57,12 @@ import com.facebook.presto.sql.planner.plan.LateralJoinNode; import com.facebook.presto.sql.planner.plan.SemiJoinNode; import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; +import com.facebook.presto.sql.planner.plan.TableFinishNode; import com.facebook.presto.sql.tree.LongLiteral; import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.testing.TestingHandleResolver; +import com.facebook.presto.testing.TestingMetadata; +import com.facebook.presto.testing.TestingSplitManager; import com.facebook.presto.tests.QueryTemplate; import com.facebook.presto.util.MorePredicates; import com.google.common.collect.ImmutableList; @@ -53,8 +70,12 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; @@ -76,6 +97,7 @@ import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_LAST; import static com.facebook.presto.common.predicate.Domain.singleValue; import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.StandardTypes.VARCHAR; import static com.facebook.presto.common.type.VarcharType.createVarcharType; import static com.facebook.presto.spi.StandardErrorCode.INVALID_LIMIT_CLAUSE; import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; @@ -86,6 +108,8 @@ import static com.facebook.presto.spi.plan.JoinType.INNER; import static com.facebook.presto.spi.plan.JoinType.LEFT; import static com.facebook.presto.spi.plan.JoinType.RIGHT; +import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA; +import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME; import static com.facebook.presto.sql.Optimizer.PlanStage.OPTIMIZED; import static com.facebook.presto.sql.Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED; import static com.facebook.presto.sql.TestExpressionInterpreter.AVG_UDAF_CPP; @@ -151,8 +175,104 @@ public class TestLogicalPlanner public void setup() { setupJsonFunctionNamespaceManager(this.getQueryRunner()); + + // Register catalog `test` with a distributed procedure `distributed_fun` + this.getQueryRunner().createCatalog("test", + new ConnectorFactory() + { + @Override + public String getName() + { + return "test"; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new TestingHandleResolver(); + } + + @Override + public Connector create(String catalogName, Map config, ConnectorContext context) + { + List arguments = new ArrayList<>(); + arguments.add(new Procedure.Argument(SCHEMA, VARCHAR)); + arguments.add(new Procedure.Argument(TABLE_NAME, VARCHAR)); + Set procedures = new HashSet<>(); + procedures.add(new DistributedProcedure("system", "distributed_fun", + arguments, + (session, transactionContext, procedureHandle, fragments) -> null, + (transactionContext, procedureHandle, fragments) -> {})); + + return new Connector() + { + private final ConnectorMetadata metadata = new TestingMetadata(); + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + return new ConnectorTransactionHandle() + {}; + } + + @Override + public ConnectorPageSourceProvider getPageSourceProvider() + { + return new TestingPageSourceProvider(); + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction) + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return new TestingSplitManager(ImmutableList.of()); + } + + @Override + public Set getProcedures() + { + return procedures; + } + }; + } + }, ImmutableMap.of()); } + @Test + public void testCallDistributedProcedure() + { + Session session = getQueryRunner().getDefaultSession(); + + // Call non-existed distributed procedure + assertPlanFailedWithException("call test.system.no_fun('a', 'b')", session, + format("Distributed procedure not registered: test.system.no_fun", "test", "system", "no_fun")); + + // Call distributed procedure on non-existed target table + assertPlanFailedWithException("call test.system.distributed_fun('tiny', 'notable')", session, + format("Table %s.%s.%s does not exist", session.getCatalog().get(), "tiny", "notable")); + + // Call distributed procedure on partitioned target table + assertDistributedPlan("call test.system.distributed_fun('tiny', 'orders')", + anyTree(node(TableFinishNode.class, + exchange(REMOTE_STREAMING, GATHER, + node(CallDistributedProcedureNode.class, + exchange(LOCAL, GATHER, + tableScan("orders"))))))); + + // Call distributed procedure on unPartitioned target table + assertDistributedPlan("call test.system.distributed_fun('tiny', 'customer')", + anyTree(node(TableFinishNode.class, + exchange(REMOTE_STREAMING, GATHER, + node(CallDistributedProcedureNode.class, + exchange(LOCAL, GATHER, + exchange(REMOTE_STREAMING, REPARTITION, + tableScan("customer")))))))); + } @Test public void testAnalyze() { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorHandleResolver.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorHandleResolver.java index 1e83bb99c47c..20d244382545 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorHandleResolver.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorHandleResolver.java @@ -41,6 +41,11 @@ default Class getInsertTableHandleClass() throw new UnsupportedOperationException(); } + default Class getDistributedProcedureHandleClass() + { + throw new UnsupportedOperationException(); + } + default Class getPartitioningHandleClass() { throw new UnsupportedOperationException(); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java index 313500319bd3..30b13d6525a1 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java @@ -13,10 +13,12 @@ */ package com.facebook.presto.spi.connector; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.Type; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; @@ -546,6 +548,28 @@ default ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connec throw new PrestoException(NOT_SUPPORTED, "This connector does not support updates"); } + /** + * Begin call distributed procedure + */ + default ConnectorDistributedProcedureHandle beginCallDistributedProcedure( + ConnectorSession session, + QualifiedObjectName procedureName, + ConnectorTableLayoutHandle tableLayoutHandle, + Object[] arguments) + { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support distributed procedure"); + } + + /** + * Finish call distributed procedure + * + * @param fragments all fragments returned by {@link com.facebook.presto.spi.UpdatablePageSource#finish()} + */ + default void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection fragments) + { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support distributed procedure"); + } + /** * Begin delete query */ diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPageSinkProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPageSinkProvider.java index f200da4d1326..f2551e461659 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPageSinkProvider.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPageSinkProvider.java @@ -13,15 +13,24 @@ */ package com.facebook.presto.spi.connector; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PageSinkContext; +import com.facebook.presto.spi.PrestoException; + +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; public interface ConnectorPageSinkProvider { ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, PageSinkContext pageSinkContext); ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, PageSinkContext pageSinkContext); + + default ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support distributed procedure"); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 73d696ca6a42..817479f47630 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -13,10 +13,12 @@ */ package com.facebook.presto.spi.connector.classloader; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.Type; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; @@ -585,6 +587,26 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect } } + @Override + public ConnectorDistributedProcedureHandle beginCallDistributedProcedure( + ConnectorSession session, + QualifiedObjectName procedureName, + ConnectorTableLayoutHandle tableLayoutHandle, + Object[] arguments) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.beginCallDistributedProcedure(session, procedureName, tableLayoutHandle, arguments); + } + } + + @Override + public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection fragments) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.finishCallDistributedProcedure(session, procedureHandle, procedureName, fragments); + } + } + @Override public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java index 19466998df65..ec5941171c65 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.spi.connector.classloader; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -51,4 +52,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, insertTableHandle, pageSinkContext), classLoader); } } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, procedureHandle, pageSinkContext), classLoader); + } + } }