Skip to content

Commit

Permalink
Refactor connector spi to support call distributed procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed May 12, 2024
1 parent a9f33c1 commit 67d4964
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.procedure.IProcedureRegistry;
import com.facebook.presto.spi.relation.RowExpressionService;

import static java.util.Objects.requireNonNull;
Expand All @@ -32,6 +33,7 @@ public class ConnectorContextInstance
{
private final NodeManager nodeManager;
private final TypeManager typeManager;
private final IProcedureRegistry procedureRegistry;
private final FunctionMetadataManager functionMetadataManager;
private final StandardFunctionResolution functionResolution;
private final PageSorter pageSorter;
Expand All @@ -44,6 +46,7 @@ public class ConnectorContextInstance
public ConnectorContextInstance(
NodeManager nodeManager,
TypeManager typeManager,
IProcedureRegistry procedureRegistry,
FunctionMetadataManager functionMetadataManager,
StandardFunctionResolution functionResolution,
PageSorter pageSorter,
Expand All @@ -55,6 +58,7 @@ public ConnectorContextInstance(
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
Expand All @@ -77,6 +81,12 @@ public TypeManager getTypeManager()
return typeManager;
}

@Override
public IProcedureRegistry getProcedureRegistry()
{
return procedureRegistry;
}

@Override
public FunctionMetadataManager getFunctionMetadataManager()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider;
import com.facebook.presto.spi.procedure.IProcedureRegistry;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
Expand Down Expand Up @@ -112,6 +113,7 @@ public class ConnectorManager
private final HandleResolver handleResolver;
private final InternalNodeManager nodeManager;
private final TypeManager typeManager;
private final IProcedureRegistry procedureRegistry;
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
private final NodeInfo nodeInfo;
Expand Down Expand Up @@ -148,6 +150,7 @@ public ConnectorManager(
InternalNodeManager nodeManager,
NodeInfo nodeInfo,
TypeManager typeManager,
IProcedureRegistry procedureRegistry,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory,
TransactionManager transactionManager,
Expand All @@ -172,6 +175,7 @@ public ConnectorManager(
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null");
Expand Down Expand Up @@ -376,6 +380,7 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact
ConnectorContext context = new ConnectorContextInstance(
new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), connectorId),
typeManager,
procedureRegistry,
metadataManager.getFunctionAndTypeManager(),
new FunctionResolution(metadataManager.getFunctionAndTypeManager().getFunctionAndTypeResolver()),
pageSorter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.execution.TaskMetadataContext;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.CreateHandle;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.InsertHandle;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.RefreshMaterializedViewHandle;
import com.facebook.presto.memory.context.LocalMemoryContext;
Expand Down Expand Up @@ -119,8 +120,11 @@ public TableWriterOperatorFactory(
this.metadataUpdaterManager = requireNonNull(metadataUpdaterManager, "metadataUpdaterManager is null");
this.taskMetadataContext = requireNonNull(taskMetadataContext, "taskMetadataContext is null");
checkArgument(
writerTarget instanceof CreateHandle || writerTarget instanceof InsertHandle || writerTarget instanceof RefreshMaterializedViewHandle,
"writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle");
writerTarget instanceof CreateHandle ||
writerTarget instanceof InsertHandle ||
writerTarget instanceof RefreshMaterializedViewHandle ||
writerTarget instanceof ExecuteProcedureHandle,
"writerTarget must be CreateHandle or InsertHandle or RefreshMaterializedViewHandle or TableExecuteHandle");
this.target = requireNonNull(writerTarget, "writerTarget is null");
this.session = session;
this.statisticsAggregationOperatorFactory = requireNonNull(statisticsAggregationOperatorFactory, "statisticsAggregationOperatorFactory is null");
Expand Down Expand Up @@ -170,6 +174,9 @@ private ConnectorPageSink createPageSink()
if (target instanceof RefreshMaterializedViewHandle) {
return pageSinkManager.createPageSink(session, ((RefreshMaterializedViewHandle) target).getHandle(), pageSinkContextBuilder.build());
}
if (target instanceof ExecuteProcedureHandle) {
return pageSinkManager.createPageSink(session, ((ExecuteProcedureHandle) target).getHandle(), pageSinkContextBuilder.build());
}
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
}

Expand All @@ -187,6 +194,9 @@ private static ConnectorId getConnectorId(ExecutionWriterTarget handle)
return ((RefreshMaterializedViewHandle) handle).getHandle().getConnectorId();
}

if (handle instanceof ExecuteProcedureHandle) {
return ((ExecuteProcedureHandle) handle).getHandle().getConnectorId();
}
throw new UnsupportedOperationException("Unhandled target type: " + handle.getClass().getName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
nodeManager,
nodeInfo,
metadata.getFunctionAndTypeManager(),
procedureRegistry,
pageSorter,
pageIndexerFactory,
transactionManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.procedure.IProcedureRegistry;
import com.facebook.presto.spi.relation.RowExpressionService;

public interface ConnectorContext
Expand All @@ -36,6 +37,11 @@ default TypeManager getTypeManager()
throw new UnsupportedOperationException();
}

default IProcedureRegistry getProcedureRegistry()
{
throw new UnsupportedOperationException();
}

default FunctionMetadataManager getFunctionMetadataManager()
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.spi.connector.classloader;

import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
Expand Down Expand Up @@ -51,4 +52,12 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, insertTableHandle, pageSinkContext), classLoader);
}
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeConnectorPageSink(delegate.createPageSink(transactionHandle, session, procedureHandle, pageSinkContext), classLoader);
}
}
}

0 comments on commit 67d4964

Please sign in to comment.