From 66c6d8640513a43dde6236daab779e48ae62eaec Mon Sep 17 00:00:00 2001 From: wangd Date: Tue, 11 Jun 2024 16:51:06 +0800 Subject: [PATCH] Refactor Iceberg connector to support call distributed procedure --- .../CallDistributedProcedureSplitSource.java | 135 ++++++++++++++++++ .../iceberg/IcebergAbstractMetadata.java | 81 +++++++++-- .../IcebergDistributedProcedureHandle.java | 112 +++++++++++++++ .../presto/iceberg/IcebergHandleResolver.java | 7 + .../presto/iceberg/IcebergHiveMetadata.java | 6 +- .../iceberg/IcebergHiveMetadataFactory.java | 6 +- .../presto/iceberg/IcebergNativeMetadata.java | 10 +- .../iceberg/IcebergNativeMetadataFactory.java | 6 +- .../iceberg/IcebergPageSinkProvider.java | 16 +++ .../presto/iceberg/IcebergSplitManager.java | 11 ++ .../iceberg/IcebergTransactionContext.java | 91 ++++++++++++ .../InternalIcebergConnectorFactory.java | 2 + .../connector/ConnectorContextInstance.java | 10 ++ .../presto/connector/ConnectorManager.java | 5 + .../presto/testing/LocalQueryRunner.java | 1 + .../spi/connector/ConnectorContext.java | 6 + 16 files changed, 485 insertions(+), 20 deletions(-) create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java new file mode 100644 index 0000000000000..72799f30d4948 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java @@ -0,0 +1,135 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.iceberg.delete.DeleteFile; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.SplitWeight; +import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closer; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; +import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; +import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; +import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; +import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterators.limit; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class CallDistributedProcedureSplitSource + implements ConnectorSplitSource +{ + private CloseableIterator fileScanTaskIterator; + private Optional> fileScanTaskConsumer; + + private final TableScan tableScan; + private final Closer closer = Closer.create(); + private final double minimumAssignedSplitWeight; + private final ConnectorSession session; + + public CallDistributedProcedureSplitSource( + ConnectorSession session, + TableScan tableScan, + CloseableIterable fileScanTaskIterable, + Optional> fileScanTaskConsumer, + double minimumAssignedSplitWeight) + { + this.session = requireNonNull(session, "session is null"); + this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.fileScanTaskIterator = fileScanTaskIterable.iterator(); + this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null"); + this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + closer.register(fileScanTaskIterator); + } + + @Override + public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) + { + // TODO: move this to a background thread + List splits = new ArrayList<>(); + Iterator iterator = limit(fileScanTaskIterator, maxSize); + while (iterator.hasNext()) { + FileScanTask task = iterator.next(); + fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task)); + splits.add(toIcebergSplit(task)); + } + return completedFuture(new ConnectorSplitBatch(splits, isFinished())); + } + + @Override + public boolean isFinished() + { + return !fileScanTaskIterator.hasNext(); + } + + @Override + public void close() + { + try { + closer.close(); + // TODO: remove this after org.apache.iceberg.io.CloseableIterator'withClose + // correct release resources holds by iterator. + fileScanTaskIterator = CloseableIterator.empty(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private ConnectorSplit toIcebergSplit(FileScanTask task) + { + PartitionSpec spec = task.spec(); + Optional partitionData = partitionDataFromStructLike(spec, task.file().partition()); + + // TODO: We should leverage residual expression and convert that to TupleDomain. + // The predicate here is used by readers for predicate push down at reader level, + // so when we do not use residual expression, we are just wasting CPU cycles + // on reader side evaluating a condition that we know will always be true. + + return new IcebergSplit( + task.file().path().toString(), + task.start(), + task.length(), + fromIcebergFileFormat(task.file().format()), + ImmutableList.of(), + getPartitionKeys(task), + PartitionSpecParser.toJson(spec), + partitionData.map(PartitionData::toJson), + getNodeSelectionStrategy(session), + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), + task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()), + Optional.empty(), + getDataSequenceNumber(task.file())); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 75b6cdebe6c70..e86a17827771f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -15,6 +15,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.BigintType; @@ -27,10 +28,13 @@ import com.facebook.presto.iceberg.changelog.ChangelogUtil; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableLayout; import com.facebook.presto.spi.ConnectorTableLayoutHandle; @@ -49,6 +53,9 @@ import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator; import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType; import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.IProcedureRegistry; +import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; @@ -165,10 +172,11 @@ public abstract class IcebergAbstractMetadata implements ConnectorMetadata { protected final TypeManager typeManager; + protected final IProcedureRegistry procedureRegistry; protected final JsonCodec commitTaskCodec; protected final NodeVersion nodeVersion; protected final RowExpressionService rowExpressionService; - protected Transaction transaction; + protected IcebergTransactionContext transactionContext; private final StandardFunctionResolution functionResolution; private final ConcurrentMap icebergTables = new ConcurrentHashMap<>(); @@ -176,12 +184,14 @@ public abstract class IcebergAbstractMetadata public IcebergAbstractMetadata( TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, NodeVersion nodeVersion) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); @@ -203,6 +213,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName); + public Optional getSplitSourceInCurrentCallProcedureTransaction() + { + return transactionContext == null ? Optional.empty() : transactionContext.getConnectorSplitSource(); + } + /** * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker */ @@ -432,7 +447,7 @@ public Optional finishCreateTable(ConnectorSession sess protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle table, Table icebergTable) { - transaction = icebergTable.newTransaction(); + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); return new IcebergWritableTableHandle( table.getSchemaName(), @@ -449,12 +464,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { if (fragments.isEmpty()) { - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); return Optional.empty(); } IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle; - Table icebergTable = transaction.table(); + Table icebergTable = transactionContext.getTransaction().table(); List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) @@ -465,7 +481,7 @@ public Optional finishInsert(ConnectorSession session, icebergTable.schema().findType(field.sourceId()))) .toArray(Type[]::new); - AppendFiles appendFiles = transaction.newFastAppend(); + AppendFiles appendFiles = transactionContext.getTransaction().newFastAppend(); for (CommitTaskData task : commitTasks) { DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) .withPath(task.getPath()) @@ -483,7 +499,8 @@ public Optional finishInsert(ConnectorSession session, } appendFiles.commit(); - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); return Optional.of(new HiveWrittenPartitions(commitTasks.stream() .map(CommitTaskData::getPath) @@ -789,6 +806,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa removeScanFiles(icebergTable, TupleDomain.all()); } + @Override + public ConnectorDistributedProcedureHandle beginCallDistributedProcedure( + ConnectorSession session, + QualifiedObjectName procedureName, + ConnectorTableLayoutHandle tableLayoutHandle, + Object[] arguments) + { + IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable(); + Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + + if (handle.isSnapshotSpecified()) { + throw new PrestoException(NOT_SUPPORTED, "This connector do not allow table execute at specified snapshot"); + } + + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); + Procedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + return ((DistributedProcedure) procedure).getBeginCallDistributedProcedure().begin(session, transactionContext, tableLayoutHandle, arguments); + } + + @Override + public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection fragments) + { + Procedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + ((DistributedProcedure) procedure).getFinishCallDistributedProcedure().finish(transactionContext, procedureHandle, fragments); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); + } + @Override public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -809,7 +864,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable } validateTableMode(session, icebergTable); - transaction = icebergTable.newTransaction(); + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); return handle; } @@ -820,7 +875,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan IcebergTableHandle handle = (IcebergTableHandle) tableHandle; Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - RowDelta rowDelta = transaction.newRowDelta(); + RowDelta rowDelta = transactionContext.getTransaction().newRowDelta(); List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) @@ -857,7 +912,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan } rowDelta.commit(); - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); } @Override @@ -935,11 +991,12 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl */ private OptionalLong removeScanFiles(Table icebergTable, TupleDomain predicate) { - transaction = icebergTable.newTransaction(); - DeleteFiles deleteFiles = transaction.newDelete() + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); + DeleteFiles deleteFiles = transactionContext.getTransaction().newDelete() .deleteFromRowFilter(toIcebergExpression(predicate)); deleteFiles.commit(); - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); Map summary = icebergTable.currentSnapshot().summary(); long deletedRecords = Long.parseLong(summary.getOrDefault(DELETED_RECORDS_PROP, "0")); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java new file mode 100644 index 0000000000000..ee02893b2747a --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class IcebergDistributedProcedureHandle + implements ConnectorDistributedProcedureHandle +{ + private final String schemaName; + private final IcebergTableName tableName; + private final String schemaAsJson; + private final String partitionSpecAsJson; + private final List inputColumns; + private final String outputPath; + private final FileFormat fileFormat; + private final Map storageProperties; + + @JsonCreator + public IcebergDistributedProcedureHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") IcebergTableName tableName, + @JsonProperty("schemaAsJson") String schemaAsJson, + @JsonProperty("partitionSpecAsJson") String partitionSpecAsJson, + @JsonProperty("inputColumns") List inputColumns, + @JsonProperty("outputPath") String outputPath, + @JsonProperty("fileFormat") FileFormat fileFormat, + @JsonProperty("storageProperties") Map storageProperties) + { + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null"); + this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null"); + this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null")); + this.outputPath = requireNonNull(outputPath, "filePrefix is null"); + this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); + this.storageProperties = requireNonNull(storageProperties, "storageProperties is null"); + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public IcebergTableName getTableName() + { + return tableName; + } + + @JsonProperty + public String getSchemaAsJson() + { + return schemaAsJson; + } + + @JsonProperty + public String getPartitionSpecAsJson() + { + return partitionSpecAsJson; + } + + @JsonProperty + public List getInputColumns() + { + return inputColumns; + } + + @JsonProperty + public String getOutputPath() + { + return outputPath; + } + + @JsonProperty + public FileFormat getFileFormat() + { + return fileFormat; + } + + @JsonProperty + public Map getStorageProperties() + { + return storageProperties; + } + + @Override + public String toString() + { + return schemaName + "." + tableName; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java index 312c94d8cb0aa..1bd81045411b7 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java @@ -15,6 +15,7 @@ import com.facebook.presto.hive.HiveTransactionHandle; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -62,6 +63,12 @@ public Class getInsertTableHandleClass() return IcebergWritableTableHandle.class; } + @Override + public Class getDistributedProcedureHandleClass() + { + return IcebergDistributedProcedureHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 1b6f34d17c7f7..d9e3393af8774 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -51,6 +51,7 @@ import com.facebook.presto.spi.ViewNotFoundException; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.relation.VariableReferenceExpression; @@ -157,13 +158,14 @@ public IcebergHiveMetadata( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, NodeVersion nodeVersion, FilterStatsCalculatorService filterStatsCalculatorService) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); @@ -305,7 +307,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat, session)); - transaction = createTableTransaction(tableName, operations, metadata); + transactionContext = new IcebergTransactionContext(Optional.empty(), createTableTransaction(tableName, operations, metadata)); return new IcebergWritableTableHandle( schemaName, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java index d69e9447772fd..e73c89a7626d7 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import javax.inject.Inject; @@ -33,6 +34,7 @@ public class IcebergHiveMetadataFactory final ExtendedHiveMetastore metastore; final HdfsEnvironment hdfsEnvironment; final TypeManager typeManager; + final IProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final StandardFunctionResolution functionResolution; final RowExpressionService rowExpressionService; @@ -44,6 +46,7 @@ public IcebergHiveMetadataFactory( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -53,6 +56,7 @@ public IcebergHiveMetadataFactory( this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -62,6 +66,6 @@ public IcebergHiveMetadataFactory( public ConnectorMetadata create() { - return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService); + return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 598f8427ff745..70aa09b40dac5 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -27,6 +27,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import org.apache.hadoop.fs.Path; import org.apache.iceberg.PartitionSpec; @@ -75,13 +76,14 @@ public class IcebergNativeMetadata public IcebergNativeMetadata( IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, CatalogType catalogType, NodeVersion nodeVersion) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.catalogType = requireNonNull(catalogType, "catalogType is null"); } @@ -171,14 +173,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); try { - transaction = catalogFactory.getCatalog(session).newCreateTableTransaction( - toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat, session)); + transactionContext = new IcebergTransactionContext(Optional.empty(), catalogFactory.getCatalog(session).newCreateTableTransaction( + toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat, session))); } catch (AlreadyExistsException e) { throw new TableAlreadyExistsException(schemaTableName); } - Table icebergTable = transaction.table(); + Table icebergTable = transactionContext.getTransaction().table(); return new IcebergWritableTableHandle( schemaName, new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java index a59d63a915c83..a0c379a69e88a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java @@ -18,6 +18,7 @@ import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import javax.inject.Inject; @@ -28,6 +29,7 @@ public class IcebergNativeMetadataFactory implements IcebergMetadataFactory { final TypeManager typeManager; + final IProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final IcebergNativeCatalogFactory catalogFactory; final CatalogType catalogType; @@ -40,6 +42,7 @@ public IcebergNativeMetadataFactory( IcebergConfig config, IcebergNativeCatalogFactory catalogFactory, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -47,6 +50,7 @@ public IcebergNativeMetadataFactory( { this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -57,6 +61,6 @@ public IcebergNativeMetadataFactory( public ConnectorMetadata create() { - return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion); + return new IcebergNativeMetadata(catalogFactory, typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java index 7e91ebf4a7028..d43572c3dc9e6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java @@ -16,6 +16,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -73,6 +74,21 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa return createPageSink(session, (IcebergWritableTableHandle) insertTableHandle); } + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + IcebergDistributedProcedureHandle distributedProcedureHandle = (IcebergDistributedProcedureHandle) procedureHandle; + return createPageSink(session, new IcebergWritableTableHandle( + distributedProcedureHandle.getSchemaName(), + distributedProcedureHandle.getTableName(), + distributedProcedureHandle.getSchemaAsJson(), + distributedProcedureHandle.getPartitionSpecAsJson(), + distributedProcedureHandle.getInputColumns(), + distributedProcedureHandle.getOutputPath(), + distributedProcedureHandle.getFileFormat(), + distributedProcedureHandle.getStorageProperties())); + } + private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle) { HdfsContext hdfsContext = new HdfsContext(session, tableHandle.getSchemaName(), tableHandle.getTableName().getTableName()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index 0a2bc13de0d99..68f289b73696d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; @@ -37,6 +38,7 @@ import javax.inject.Inject; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -82,6 +84,15 @@ public ConnectorSplitSource getSplits( } TupleDomain predicate = layoutHandle.getValidPredicate(); + ConnectorMetadata connectorMetadata = transactionManager.get(transaction); + if (connectorMetadata != null) { + IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata; + Optional connectorSplitSource = icebergMetadata.getSplitSourceInCurrentCallProcedureTransaction(); + if (connectorSplitSource.isPresent()) { + return connectorSplitSource.get(); + } + } + Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName()); if (table.getIcebergTableName().getTableType() == CHANGELOG) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java new file mode 100644 index 0000000000000..7dce34bb999fe --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.connector.ConnectorTransactionContext; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +public class IcebergTransactionContext + implements ConnectorTransactionContext +{ + final Optional table; + final Transaction transaction; + final Set scannedDataFiles = new HashSet<>(); + final Set fullyAppliedDeleteFiles = new HashSet<>(); + final Map relevantData = new HashMap<>(); + Optional connectorSplitSource = Optional.empty(); + + public IcebergTransactionContext(Optional
table, Transaction transaction) + { + this.table = table; + this.transaction = transaction; + } + + public Optional
getTable() + { + return table; + } + + public Transaction getTransaction() + { + return transaction; + } + + public void setConnectorSplitSource(ConnectorSplitSource connectorSplitSource) + { + requireNonNull(connectorSplitSource, "connectorSplitSource is null"); + this.connectorSplitSource = Optional.of(connectorSplitSource); + } + + public Optional getConnectorSplitSource() + { + return this.connectorSplitSource; + } + + public Set getScannedDataFiles() + { + return scannedDataFiles; + } + + public Set getFullyAppliedDeleteFiles() + { + return fullyAppliedDeleteFiles; + } + + public Map getRelevantData() + { + return relevantData; + } + + public void destroy() + { + this.relevantData.clear(); + this.scannedDataFiles.clear(); + this.fullyAppliedDeleteFiles.clear(); + this.connectorSplitSource.ifPresent(ConnectorSplitSource::close); + this.connectorSplitSource = null; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java index bab2cb74a63d3..9d5f16e5e5c39 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java @@ -45,6 +45,7 @@ import com.facebook.presto.spi.function.FunctionMetadataManager; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.session.PropertyMetadata; @@ -92,6 +93,7 @@ public static Connector createConnector( binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(IProcedureRegistry.class).toInstance(context.getProcedureRegistry()); binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager()); diff --git a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorContextInstance.java b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorContextInstance.java index c36ff87b0460b..9f329fedc8897 100644 --- a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorContextInstance.java +++ b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorContextInstance.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.function.FunctionMetadataManager; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import static java.util.Objects.requireNonNull; @@ -32,6 +33,7 @@ public class ConnectorContextInstance { private final NodeManager nodeManager; private final TypeManager typeManager; + private final IProcedureRegistry procedureRegistry; private final FunctionMetadataManager functionMetadataManager; private final StandardFunctionResolution functionResolution; private final PageSorter pageSorter; @@ -44,6 +46,7 @@ public class ConnectorContextInstance public ConnectorContextInstance( NodeManager nodeManager, TypeManager typeManager, + IProcedureRegistry procedureRegistry, FunctionMetadataManager functionMetadataManager, StandardFunctionResolution functionResolution, PageSorter pageSorter, @@ -55,6 +58,7 @@ public ConnectorContextInstance( { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); @@ -77,6 +81,12 @@ public TypeManager getTypeManager() return typeManager; } + @Override + public IProcedureRegistry getProcedureRegistry() + { + return procedureRegistry; + } + @Override public FunctionMetadataManager getFunctionMetadataManager() { diff --git a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java index 6ed7888b33d54..8dd55e784f87d 100644 --- a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java +++ b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java @@ -52,6 +52,7 @@ import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.relation.DeterminismEvaluator; import com.facebook.presto.spi.relation.DomainTranslator; @@ -112,6 +113,7 @@ public class ConnectorManager private final HandleResolver handleResolver; private final InternalNodeManager nodeManager; private final TypeManager typeManager; + private final IProcedureRegistry procedureRegistry; private final PageSorter pageSorter; private final PageIndexerFactory pageIndexerFactory; private final NodeInfo nodeInfo; @@ -148,6 +150,7 @@ public ConnectorManager( InternalNodeManager nodeManager, NodeInfo nodeInfo, TypeManager typeManager, + IProcedureRegistry procedureRegistry, PageSorter pageSorter, PageIndexerFactory pageIndexerFactory, TransactionManager transactionManager, @@ -172,6 +175,7 @@ public ConnectorManager( this.handleResolver = requireNonNull(handleResolver, "handleResolver is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null"); @@ -376,6 +380,7 @@ private Connector createConnector(ConnectorId connectorId, ConnectorFactory fact ConnectorContext context = new ConnectorContextInstance( new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), connectorId), typeManager, + procedureRegistry, metadataManager.getFunctionAndTypeManager(), new FunctionResolution(metadataManager.getFunctionAndTypeManager().getFunctionAndTypeResolver()), pageSorter, diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index e766606b0295f..be6632afe7590 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -474,6 +474,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, nodeManager, nodeInfo, metadata.getFunctionAndTypeManager(), + procedureRegistry, pageSorter, pageIndexerFactory, transactionManager, diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorContext.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorContext.java index a954291e9c822..0a056d09887bb 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorContext.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorContext.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.function.FunctionMetadataManager; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; public interface ConnectorContext @@ -36,6 +37,11 @@ default TypeManager getTypeManager() throw new UnsupportedOperationException(); } + default IProcedureRegistry getProcedureRegistry() + { + throw new UnsupportedOperationException(); + } + default FunctionMetadataManager getFunctionMetadataManager() { throw new UnsupportedOperationException();