From 3d1560a1dc4bce22c8a63f6e74bf8d702480003f Mon Sep 17 00:00:00 2001 From: wangd Date: Sun, 5 May 2024 21:57:07 +0800 Subject: [PATCH] Refactor Iceberg connector to support call distributed procedure --- .../CallDistributedProcedureSplitSource.java | 135 ++++++++++++++++++ .../iceberg/IcebergAbstractMetadata.java | 92 ++++++++++-- .../IcebergDistributedProcedureHandle.java | 112 +++++++++++++++ .../presto/iceberg/IcebergHandleResolver.java | 7 + .../presto/iceberg/IcebergHiveMetadata.java | 6 +- .../iceberg/IcebergHiveMetadataFactory.java | 6 +- .../presto/iceberg/IcebergNativeMetadata.java | 10 +- .../iceberg/IcebergNativeMetadataFactory.java | 6 +- .../iceberg/IcebergPageSinkProvider.java | 16 +++ .../presto/iceberg/IcebergSplitManager.java | 12 ++ .../iceberg/IcebergTransactionContext.java | 91 ++++++++++++ .../InternalIcebergConnectorFactory.java | 2 + 12 files changed, 474 insertions(+), 21 deletions(-) create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java new file mode 100644 index 0000000000000..72799f30d4948 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CallDistributedProcedureSplitSource.java @@ -0,0 +1,135 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.iceberg.delete.DeleteFile; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.SplitWeight; +import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closer; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; +import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; +import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; +import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; +import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterators.limit; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class CallDistributedProcedureSplitSource + implements ConnectorSplitSource +{ + private CloseableIterator fileScanTaskIterator; + private Optional> fileScanTaskConsumer; + + private final TableScan tableScan; + private final Closer closer = Closer.create(); + private final double minimumAssignedSplitWeight; + private final ConnectorSession session; + + public CallDistributedProcedureSplitSource( + ConnectorSession session, + TableScan tableScan, + CloseableIterable fileScanTaskIterable, + Optional> fileScanTaskConsumer, + double minimumAssignedSplitWeight) + { + this.session = requireNonNull(session, "session is null"); + this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.fileScanTaskIterator = fileScanTaskIterable.iterator(); + this.fileScanTaskConsumer = requireNonNull(fileScanTaskConsumer, "fileScanTaskConsumer is null"); + this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + closer.register(fileScanTaskIterator); + } + + @Override + public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) + { + // TODO: move this to a background thread + List splits = new ArrayList<>(); + Iterator iterator = limit(fileScanTaskIterator, maxSize); + while (iterator.hasNext()) { + FileScanTask task = iterator.next(); + fileScanTaskConsumer.ifPresent(consumer -> consumer.accept(task)); + splits.add(toIcebergSplit(task)); + } + return completedFuture(new ConnectorSplitBatch(splits, isFinished())); + } + + @Override + public boolean isFinished() + { + return !fileScanTaskIterator.hasNext(); + } + + @Override + public void close() + { + try { + closer.close(); + // TODO: remove this after org.apache.iceberg.io.CloseableIterator'withClose + // correct release resources holds by iterator. + fileScanTaskIterator = CloseableIterator.empty(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private ConnectorSplit toIcebergSplit(FileScanTask task) + { + PartitionSpec spec = task.spec(); + Optional partitionData = partitionDataFromStructLike(spec, task.file().partition()); + + // TODO: We should leverage residual expression and convert that to TupleDomain. + // The predicate here is used by readers for predicate push down at reader level, + // so when we do not use residual expression, we are just wasting CPU cycles + // on reader side evaluating a condition that we know will always be true. + + return new IcebergSplit( + task.file().path().toString(), + task.start(), + task.length(), + fromIcebergFileFormat(task.file().format()), + ImmutableList.of(), + getPartitionKeys(task), + PartitionSpecParser.toJson(spec), + partitionData.map(PartitionData::toJson), + getNodeSelectionStrategy(session), + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), + task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()), + Optional.empty(), + getDataSequenceNumber(task.file())); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index b95ce83575abf..1822891b15c6c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -15,6 +15,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.QualifiedObjectName; import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.BigintType; @@ -27,10 +28,13 @@ import com.facebook.presto.iceberg.changelog.ChangelogUtil; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableLayout; import com.facebook.presto.spi.ConnectorTableLayoutHandle; @@ -47,6 +51,9 @@ import com.facebook.presto.spi.connector.ConnectorOutputMetadata; import com.facebook.presto.spi.connector.ConnectorTableVersion; import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.procedure.DistributedProcedure; +import com.facebook.presto.spi.procedure.IProcedureRegistry; +import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; @@ -70,6 +77,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -161,10 +169,11 @@ public abstract class IcebergAbstractMetadata implements ConnectorMetadata { protected final TypeManager typeManager; + protected final IProcedureRegistry procedureRegistry; protected final JsonCodec commitTaskCodec; protected final NodeVersion nodeVersion; protected final RowExpressionService rowExpressionService; - protected Transaction transaction; + protected IcebergTransactionContext transactionContext; private final StandardFunctionResolution functionResolution; private final ConcurrentMap icebergTables = new ConcurrentHashMap<>(); @@ -172,12 +181,14 @@ public abstract class IcebergAbstractMetadata public IcebergAbstractMetadata( TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, NodeVersion nodeVersion) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); @@ -199,6 +210,11 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName public abstract void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName); + public Optional getSplitSourceInCurrentCallProcedureTransaction() + { + return transactionContext == null ? Optional.empty() : transactionContext.getConnectorSplitSource(); + } + /** * This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker */ @@ -428,7 +444,7 @@ public Optional finishCreateTable(ConnectorSession sess protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle table, Table icebergTable) { - transaction = icebergTable.newTransaction(); + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); return new IcebergWritableTableHandle( table.getSchemaName(), @@ -445,12 +461,13 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { if (fragments.isEmpty()) { - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); return Optional.empty(); } IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle; - Table icebergTable = transaction.table(); + Table icebergTable = transactionContext.getTransaction().table(); List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) @@ -461,7 +478,7 @@ public Optional finishInsert(ConnectorSession session, icebergTable.schema().findType(field.sourceId()))) .toArray(Type[]::new); - AppendFiles appendFiles = transaction.newFastAppend(); + AppendFiles appendFiles = transactionContext.getTransaction().newFastAppend(); for (CommitTaskData task : commitTasks) { DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) .withPath(task.getPath()) @@ -479,7 +496,8 @@ public Optional finishInsert(ConnectorSession session, } appendFiles.commit(); - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); return Optional.of(new HiveWrittenPartitions(commitTasks.stream() .map(CommitTaskData::getPath) @@ -770,6 +788,44 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa } } + @Override + public ConnectorDistributedProcedureHandle beginCallDistributedProcedure( + ConnectorSession session, + QualifiedObjectName procedureName, + ConnectorTableLayoutHandle tableLayoutHandle, + Object[] arguments) + { + IcebergTableHandle handle = ((IcebergTableLayoutHandle) tableLayoutHandle).getTable(); + Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + + if (handle.isSnapshotSpecified()) { + throw new PrestoException(NOT_SUPPORTED, "This connector do not allow table execute at specified snapshot"); + } + + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); + Procedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + return ((DistributedProcedure) procedure).getBeginCallDistributedProcedure().begin(session, transactionContext, tableLayoutHandle, arguments); + } + + @Override + public void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle tableHandle, QualifiedObjectName procedureName, Collection fragments) + { + Procedure procedure = procedureRegistry.resolve( + new ConnectorId(procedureName.getCatalogName()), + new SchemaTableName( + procedureName.getSchemaName(), + procedureName.getObjectName())); + verify(procedure instanceof DistributedProcedure, "procedure must be DistributedProcedure"); + ((DistributedProcedure) procedure).getFinishCallDistributedProcedure().finish(transactionContext, tableHandle, fragments); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); + } + @Override public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -790,7 +846,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable } validateTableMode(session, icebergTable); - transaction = icebergTable.newTransaction(); + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); return handle; } @@ -801,7 +857,7 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan IcebergTableHandle handle = (IcebergTableHandle) tableHandle; Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - RowDelta rowDelta = transaction.newRowDelta(); + RowDelta rowDelta = transactionContext.getTransaction().newRowDelta(); List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) @@ -838,7 +894,8 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan } rowDelta.commit(); - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); } @Override @@ -872,7 +929,15 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); boolean supportsMetadataDelete = true; - for (PartitionSpec spec : icebergTable.specs().values()) { + // Get partition specs that really need to be checked + Set partitionSpecs = handle.getIcebergTableName().getSnapshotId().map( + snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream() + .map(ManifestFile::partitionSpecId) + .map(icebergTable.specs()::get) + .collect(toImmutableSet())) + .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().values())); // No snapshot, so no data. This case doesn't matter. + + for (PartitionSpec spec : partitionSpecs) { // Currently we do not support delete when any partition columns in predicate is not transform by identity() Set partitionColumnSourceIds = spec.fields().stream() .filter(field -> field.transform().isIdentity()) @@ -928,15 +993,16 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl */ private long removeScanFiles(Table icebergTable, Iterable scan) { - transaction = icebergTable.newTransaction(); - DeleteFiles deletes = transaction.newDelete(); + transactionContext = new IcebergTransactionContext(Optional.of(icebergTable), icebergTable.newTransaction()); + DeleteFiles deletes = transactionContext.getTransaction().newDelete(); AtomicLong rowsDeleted = new AtomicLong(0L); scan.forEach(t -> { deletes.deleteFile(t.file()); rowsDeleted.addAndGet(t.estimatedRowsCount()); }); deletes.commit(); - transaction.commitTransaction(); + transactionContext.getTransaction().commitTransaction(); + transactionContext.destroy(); return rowsDeleted.get(); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java new file mode 100644 index 0000000000000..ee02893b2747a --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDistributedProcedureHandle.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class IcebergDistributedProcedureHandle + implements ConnectorDistributedProcedureHandle +{ + private final String schemaName; + private final IcebergTableName tableName; + private final String schemaAsJson; + private final String partitionSpecAsJson; + private final List inputColumns; + private final String outputPath; + private final FileFormat fileFormat; + private final Map storageProperties; + + @JsonCreator + public IcebergDistributedProcedureHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") IcebergTableName tableName, + @JsonProperty("schemaAsJson") String schemaAsJson, + @JsonProperty("partitionSpecAsJson") String partitionSpecAsJson, + @JsonProperty("inputColumns") List inputColumns, + @JsonProperty("outputPath") String outputPath, + @JsonProperty("fileFormat") FileFormat fileFormat, + @JsonProperty("storageProperties") Map storageProperties) + { + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null"); + this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null"); + this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null")); + this.outputPath = requireNonNull(outputPath, "filePrefix is null"); + this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); + this.storageProperties = requireNonNull(storageProperties, "storageProperties is null"); + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public IcebergTableName getTableName() + { + return tableName; + } + + @JsonProperty + public String getSchemaAsJson() + { + return schemaAsJson; + } + + @JsonProperty + public String getPartitionSpecAsJson() + { + return partitionSpecAsJson; + } + + @JsonProperty + public List getInputColumns() + { + return inputColumns; + } + + @JsonProperty + public String getOutputPath() + { + return outputPath; + } + + @JsonProperty + public FileFormat getFileFormat() + { + return fileFormat; + } + + @JsonProperty + public Map getStorageProperties() + { + return storageProperties; + } + + @Override + public String toString() + { + return schemaName + "." + tableName; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java index 312c94d8cb0aa..1bd81045411b7 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java @@ -15,6 +15,7 @@ import com.facebook.presto.hive.HiveTransactionHandle; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -62,6 +63,12 @@ public Class getInsertTableHandleClass() return IcebergWritableTableHandle.class; } + @Override + public Class getDistributedProcedureHandleClass() + { + return IcebergDistributedProcedureHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 35f0107a7aa61..d3c99ea64516f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -51,6 +51,7 @@ import com.facebook.presto.spi.ViewNotFoundException; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.relation.VariableReferenceExpression; @@ -156,13 +157,14 @@ public IcebergHiveMetadata( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, NodeVersion nodeVersion, FilterStatsCalculatorService filterStatsCalculatorService) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); @@ -302,7 +304,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat)); - transaction = createTableTransaction(tableName, operations, metadata); + transactionContext = new IcebergTransactionContext(Optional.empty(), createTableTransaction(tableName, operations, metadata)); return new IcebergWritableTableHandle( schemaName, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java index d69e9447772fd..e73c89a7626d7 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import javax.inject.Inject; @@ -33,6 +34,7 @@ public class IcebergHiveMetadataFactory final ExtendedHiveMetastore metastore; final HdfsEnvironment hdfsEnvironment; final TypeManager typeManager; + final IProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final StandardFunctionResolution functionResolution; final RowExpressionService rowExpressionService; @@ -44,6 +46,7 @@ public IcebergHiveMetadataFactory( ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -53,6 +56,7 @@ public IcebergHiveMetadataFactory( this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -62,6 +66,6 @@ public IcebergHiveMetadataFactory( public ConnectorMetadata create() { - return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService); + return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index ed39adbe34fdc..ae8104b3bea52 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -27,6 +27,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import org.apache.hadoop.fs.Path; import org.apache.iceberg.PartitionSpec; @@ -74,13 +75,14 @@ public class IcebergNativeMetadata public IcebergNativeMetadata( IcebergResourceFactory resourceFactory, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, CatalogType catalogType, NodeVersion nodeVersion) { - super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); + super(typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion); this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null"); this.catalogType = requireNonNull(catalogType, "catalogType is null"); } @@ -168,14 +170,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); try { - transaction = resourceFactory.getCatalog(session).newCreateTableTransaction( - toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat)); + transactionContext = new IcebergTransactionContext(Optional.empty(), resourceFactory.getCatalog(session).newCreateTableTransaction( + toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat))); } catch (AlreadyExistsException e) { throw new TableAlreadyExistsException(schemaTableName); } - Table icebergTable = transaction.table(); + Table icebergTable = transactionContext.getTransaction().table(); return new IcebergWritableTableHandle( schemaName, new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()), diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java index 35ee1ebe6af11..7692373559928 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java @@ -18,6 +18,7 @@ import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.relation.RowExpressionService; import javax.inject.Inject; @@ -28,6 +29,7 @@ public class IcebergNativeMetadataFactory implements IcebergMetadataFactory { final TypeManager typeManager; + final IProcedureRegistry procedureRegistry; final JsonCodec commitTaskCodec; final IcebergResourceFactory resourceFactory; final CatalogType catalogType; @@ -40,6 +42,7 @@ public IcebergNativeMetadataFactory( IcebergConfig config, IcebergResourceFactory resourceFactory, TypeManager typeManager, + IProcedureRegistry procedureRegistry, StandardFunctionResolution functionResolution, RowExpressionService rowExpressionService, JsonCodec commitTaskCodec, @@ -47,6 +50,7 @@ public IcebergNativeMetadataFactory( { this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.procedureRegistry = requireNonNull(procedureRegistry, "procedureRegistry is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); @@ -57,6 +61,6 @@ public IcebergNativeMetadataFactory( public ConnectorMetadata create() { - return new IcebergNativeMetadata(resourceFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion); + return new IcebergNativeMetadata(resourceFactory, typeManager, procedureRegistry, functionResolution, rowExpressionService, commitTaskCodec, catalogType, nodeVersion); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java index 7e91ebf4a7028..d43572c3dc9e6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java @@ -16,6 +16,7 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.spi.ConnectorDistributedProcedureHandle; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -73,6 +74,21 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa return createPageSink(session, (IcebergWritableTableHandle) insertTableHandle); } + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, PageSinkContext pageSinkContext) + { + IcebergDistributedProcedureHandle distributedProcedureHandle = (IcebergDistributedProcedureHandle) procedureHandle; + return createPageSink(session, new IcebergWritableTableHandle( + distributedProcedureHandle.getSchemaName(), + distributedProcedureHandle.getTableName(), + distributedProcedureHandle.getSchemaAsJson(), + distributedProcedureHandle.getPartitionSpecAsJson(), + distributedProcedureHandle.getInputColumns(), + distributedProcedureHandle.getOutputPath(), + distributedProcedureHandle.getFileFormat(), + distributedProcedureHandle.getStorageProperties())); + } + private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritableTableHandle tableHandle) { HdfsContext hdfsContext = new HdfsContext(session, tableHandle.getSchemaName(), tableHandle.getTableName().getTableName()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index e9ed7f0454de3..e8fb49d83884a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; @@ -34,6 +35,8 @@ import javax.inject.Inject; +import java.util.Optional; + import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; @@ -79,6 +82,15 @@ public ConnectorSplitSource getSplits( .transform(layoutHandle.getPredicateColumns()::get)) : table.getPredicate(); + ConnectorMetadata connectorMetadata = transactionManager.get(transaction); + if (connectorMetadata != null) { + IcebergAbstractMetadata icebergMetadata = (IcebergAbstractMetadata) connectorMetadata; + Optional connectorSplitSource = icebergMetadata.getSplitSourceInCurrentCallProcedureTransaction(); + if (connectorSplitSource.isPresent()) { + return connectorSplitSource.get(); + } + } + Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName()); if (table.getIcebergTableName().getTableType() == CHANGELOG) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java new file mode 100644 index 0000000000000..7dce34bb999fe --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTransactionContext.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.connector.ConnectorTransactionContext; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +public class IcebergTransactionContext + implements ConnectorTransactionContext +{ + final Optional table; + final Transaction transaction; + final Set scannedDataFiles = new HashSet<>(); + final Set fullyAppliedDeleteFiles = new HashSet<>(); + final Map relevantData = new HashMap<>(); + Optional connectorSplitSource = Optional.empty(); + + public IcebergTransactionContext(Optional
table, Transaction transaction) + { + this.table = table; + this.transaction = transaction; + } + + public Optional
getTable() + { + return table; + } + + public Transaction getTransaction() + { + return transaction; + } + + public void setConnectorSplitSource(ConnectorSplitSource connectorSplitSource) + { + requireNonNull(connectorSplitSource, "connectorSplitSource is null"); + this.connectorSplitSource = Optional.of(connectorSplitSource); + } + + public Optional getConnectorSplitSource() + { + return this.connectorSplitSource; + } + + public Set getScannedDataFiles() + { + return scannedDataFiles; + } + + public Set getFullyAppliedDeleteFiles() + { + return fullyAppliedDeleteFiles; + } + + public Map getRelevantData() + { + return relevantData; + } + + public void destroy() + { + this.relevantData.clear(); + this.scannedDataFiles.clear(); + this.fullyAppliedDeleteFiles.clear(); + this.connectorSplitSource.ifPresent(ConnectorSplitSource::close); + this.connectorSplitSource = null; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java index d2c2f4c82eed2..a03b7232fd6df 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java @@ -45,6 +45,7 @@ import com.facebook.presto.spi.function.FunctionMetadataManager; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.procedure.IProcedureRegistry; import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.relation.RowExpressionService; import com.facebook.presto.spi.session.PropertyMetadata; @@ -94,6 +95,7 @@ public static Connector createConnector(String catalogName, Map binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(IProcedureRegistry.class).toInstance(context.getProcedureRegistry()); binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());