Skip to content

Commit

Permalink
Execute optimization, segmentation and local planning for call distri…
Browse files Browse the repository at this point in the history
…buted procedure
  • Loading branch information
hantangwangd committed Nov 3, 2024
1 parent a6d3771 commit 5fb19a6
Show file tree
Hide file tree
Showing 45 changed files with 1,095 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;

import javax.annotation.concurrent.Immutable;
Expand Down Expand Up @@ -58,8 +59,12 @@ public static QualifiedObjectName valueOf(String catalogName, String schemaName,
return new QualifiedObjectName(catalogName, schemaName, objectName.toLowerCase(ENGLISH));
}

@JsonCreator
@ThriftConstructor
public QualifiedObjectName(String catalogName, String schemaName, String objectName)
public QualifiedObjectName(
@JsonProperty("catalogName") String catalogName,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("objectName") String objectName)
{
checkLowerCase(catalogName, "catalogName");
checkLowerCase(schemaName, "schemaName");
Expand All @@ -75,18 +80,21 @@ public CatalogSchemaName getCatalogSchemaName()
}

@ThriftField(1)
@JsonProperty("catalogName")
public String getCatalogName()
{
return catalogName;
}

@ThriftField(2)
@JsonProperty("schemaName")
public String getSchemaName()
{
return schemaName;
}

@ThriftField(3)
@JsonProperty("objectName")
public String getObjectName()
{
return objectName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.facebook.presto.execution.scheduler;

import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.metadata.DistributedProcedureHandle;
import com.facebook.presto.metadata.InsertTableHandle;
import com.facebook.presto.metadata.OutputTableHandle;
import com.facebook.presto.spi.SchemaTableName;
Expand All @@ -31,7 +33,9 @@
@JsonSubTypes.Type(value = ExecutionWriterTarget.InsertHandle.class, name = "InsertHandle"),
@JsonSubTypes.Type(value = ExecutionWriterTarget.DeleteHandle.class, name = "DeleteHandle"),
@JsonSubTypes.Type(value = ExecutionWriterTarget.RefreshMaterializedViewHandle.class, name = "RefreshMaterializedViewHandle"),
@JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle")})
@JsonSubTypes.Type(value = ExecutionWriterTarget.UpdateHandle.class, name = "UpdateHandle"),
@JsonSubTypes.Type(value = ExecutionWriterTarget.ExecuteProcedureHandle.class, name = "TableExecuteHandle")
})
@SuppressWarnings({"EmptyClass", "ClassMayBeInterface"})
public abstract class ExecutionWriterTarget
{
Expand Down Expand Up @@ -204,4 +208,47 @@ public String toString()
return handle.toString();
}
}

public static class ExecuteProcedureHandle
extends ExecutionWriterTarget
{
private final DistributedProcedureHandle handle;
private final SchemaTableName schemaTableName;
private final QualifiedObjectName procedureName;

@JsonCreator
public ExecuteProcedureHandle(
@JsonProperty("handle") DistributedProcedureHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("procedureName") QualifiedObjectName procedureName)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.procedureName = requireNonNull(procedureName, "procedureName is null");
}

@JsonProperty
public DistributedProcedureHandle getHandle()
{
return handle;
}

@JsonProperty
public SchemaTableName getSchemaTableName()
{
return schemaTableName;
}

@JsonProperty
public QualifiedObjectName getProcedureName()
{
return procedureName;
}

@Override
public String toString()
{
return handle.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.facebook.presto.Session;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget.ExecuteProcedureHandle;
import com.facebook.presto.metadata.AnalyzeTableHandle;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableLayoutResult;
Expand All @@ -34,6 +35,7 @@
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode.CallDistributedProcedureTarget;
import com.facebook.presto.sql.planner.plan.TableWriterNode.WriterTarget;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -107,6 +109,17 @@ private static Optional<ExecutionWriterTarget> createWriterTarget(Optional<Table
TableWriterNode.RefreshMaterializedViewReference refresh = (TableWriterNode.RefreshMaterializedViewReference) target;
return Optional.of(new ExecutionWriterTarget.RefreshMaterializedViewHandle(metadata.beginRefreshMaterializedView(session, refresh.getHandle()), refresh.getSchemaTableName()));
}
if (target instanceof CallDistributedProcedureTarget) {
CallDistributedProcedureTarget callDistributedProcedureTarget = (CallDistributedProcedureTarget) target;
return Optional.of(new ExecuteProcedureHandle(
metadata.beginCallDistributedProcedure(
session,
callDistributedProcedureTarget.getProcedureName(),
callDistributedProcedureTarget.getSourceHandle().orElse(null),
callDistributedProcedureTarget.getProcedureArguments()),
callDistributedProcedureTarget.getSchemaTableName(),
callDistributedProcedureTarget.getProcedureName()));
}
throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,18 @@ public void finishDelete(Session session, TableHandle tableHandle, Collection<Sl
delegate.finishDelete(session, tableHandle, fragments);
}

@Override
public DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments)
{
return delegate.beginCallDistributedProcedure(session, procedureName, tableHandle, arguments);
}

@Override
public void finishCallDistributedProcedure(Session session, DistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
{
delegate.finishCallDistributedProcedure(session, procedureHandle, procedureName, fragments);
}

@Override
public TableHandle beginUpdate(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

public final class DistributedProcedureHandle
{
private final ConnectorId connectorId;
private final ConnectorTransactionHandle transactionHandle;
private final ConnectorDistributedProcedureHandle connectorHandle;

@JsonCreator
public DistributedProcedureHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@JsonProperty("connectorHandle") ConnectorDistributedProcedureHandle connectorHandle)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
}

@JsonProperty
public ConnectorId getConnectorId()
{
return connectorId;
}

@JsonProperty
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}

@JsonProperty
public ConnectorDistributedProcedureHandle getConnectorHandle()
{
return connectorHandle;
}

@Override
public int hashCode()
{
return Objects.hash(connectorId, transactionHandle, connectorHandle);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DistributedProcedureHandle o = (DistributedProcedureHandle) obj;
return Objects.equals(this.connectorId, o.connectorId) &&
Objects.equals(this.transactionHandle, o.transactionHandle) &&
Objects.equals(this.connectorHandle, o.connectorHandle);
}

@Override
public String toString()
{
return connectorId + ":" + connectorHandle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;

import javax.inject.Inject;

public class DistributedProcedureHandleJacksonModule
extends AbstractTypedJacksonModule<ConnectorDistributedProcedureHandle>
{
@Inject
public DistributedProcedureHandleJacksonModule(HandleResolver handleResolver)
{
super(ConnectorDistributedProcedureHandle.class,
handleResolver::getId,
handleResolver::getDistributedProcedureHandleClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(DistributedProcedureHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.connector.informationSchema.InformationSchemaHandleResolver;
import com.facebook.presto.connector.system.SystemHandleResolver;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
Expand Down Expand Up @@ -114,6 +115,11 @@ public String getId(ConnectorInsertTableHandle insertHandle)
return getId(insertHandle, MaterializedHandleResolver::getInsertTableHandleClass);
}

public String getId(ConnectorDistributedProcedureHandle distributedProcedureHandle)
{
return getId(distributedProcedureHandle, MaterializedHandleResolver::getDistributedProcedureHandleClass);
}

public String getId(ConnectorPartitioningHandle partitioningHandle)
{
return getId(partitioningHandle, MaterializedHandleResolver::getPartitioningHandleClass);
Expand Down Expand Up @@ -169,6 +175,11 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass(Str
return resolverFor(id).getInsertTableHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
}

public Class<? extends ConnectorDistributedProcedureHandle> getDistributedProcedureHandleClass(String id)
{
return resolverFor(id).getDistributedProcedureHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
}

public Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass(String id)
{
return resolverFor(id).getPartitioningHandleClass().orElseThrow(() -> new IllegalArgumentException("No resolver for " + id));
Expand Down Expand Up @@ -240,6 +251,7 @@ private static class MaterializedHandleResolver
private final Optional<Class<? extends ConnectorIndexHandle>> indexHandle;
private final Optional<Class<? extends ConnectorOutputTableHandle>> outputTableHandle;
private final Optional<Class<? extends ConnectorInsertTableHandle>> insertTableHandle;
private final Optional<Class<? extends ConnectorDistributedProcedureHandle>> distributedProcedureHandle;
private final Optional<Class<? extends ConnectorPartitioningHandle>> partitioningHandle;
private final Optional<Class<? extends ConnectorTransactionHandle>> transactionHandle;
private final Optional<Class<? extends ConnectorMetadataUpdateHandle>> metadataUpdateHandle;
Expand All @@ -256,6 +268,7 @@ public MaterializedHandleResolver(ConnectorHandleResolver resolver)
partitioningHandle = getHandleClass(resolver::getPartitioningHandleClass);
transactionHandle = getHandleClass(resolver::getTransactionHandleClass);
metadataUpdateHandle = getHandleClass(resolver::getMetadataUpdateHandleClass);
distributedProcedureHandle = getHandleClass(resolver::getDistributedProcedureHandleClass);
}

private static <T> Optional<Class<? extends T>> getHandleClass(Supplier<Class<? extends T>> callable)
Expand Down Expand Up @@ -303,6 +316,11 @@ public Optional<Class<? extends ConnectorInsertTableHandle>> getInsertTableHandl
return insertTableHandle;
}

public Optional<Class<? extends ConnectorDistributedProcedureHandle>> getDistributedProcedureHandleClass()
{
return distributedProcedureHandle;
}

public Optional<Class<? extends ConnectorPartitioningHandle>> getPartitioningHandleClass()
{
return partitioningHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ public interface Metadata
*/
void finishDelete(Session session, TableHandle tableHandle, Collection<Slice> fragments);

/**
* Begin call distributed procedure
*/
DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments);

/**
* Finish call distributed procedure
*/
void finishCallDistributedProcedure(Session session, DistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments);

/**
* Begin update query
*/
Expand Down
Loading

0 comments on commit 5fb19a6

Please sign in to comment.