Support Iceberg procedure rewrite_data_files
hantangwangd committed May 12, 2024
1 parent 3d1560a commit f89dc40
Showing 3 changed files with 721 additions and 0 deletions.
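For context, the new procedure is invoked from Presto SQL. A minimal sketch of a call, assuming a catalog named iceberg (the schema, table, and filter below are hypothetical; filter defaults to TRUE, i.e. rewrite the whole table):

    CALL iceberg.system.rewrite_data_files(
        schema => 'tpch',
        table_name => 'orders',
        filter => 'ds = ''2024-05-01''');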
@@ -151,6 +151,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
@@ -0,0 +1,211 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
implements Provider<Procedure>
{
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;

@Inject
public RewriteDataFilesProcedure(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
}

@Override
public Procedure get()
{
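// Registered as system.rewrite_data_files(schema, table_name, filter, options); "filter" defaults to TRUE, i.e. the whole table.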
return new DistributedProcedure(
"system",
"rewrite_data_files",
ImmutableList.of(
new Procedure.Argument(SCHEMA, VARCHAR),
new Procedure.Argument(TABLE_NAME, VARCHAR),
new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
(session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
(transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments));
}

private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No Iceberg table in transaction context"));
IcebergTableHandle tableHandle = layoutHandle.getTable();

ConnectorSplitSource splitSource;
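// A table handle without a snapshot id means the table has no data yet; plan no splits.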
if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
splitSource = new FixedSplitSource(ImmutableList.of());
}
else {
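// With filter pushdown enabled, intersect the layout's partition-column predicate with its domain predicate restricted to entire columns; otherwise fall back to the predicate on the table handle.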
TupleDomain<IcebergColumnHandle> predicate = isPushdownFilterEnabled(session) ?
layoutHandle.getPartitionColumnPredicate()
.transform(IcebergColumnHandle.class::cast)
.intersect(layoutHandle.getDomainPredicate()
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
.transform(layoutHandle.getPredicateColumns()::get)) :
tableHandle.getPredicate();
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(predicate))
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

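// Record each scanned data file for replacement; collect delete files that this rewrite fully applies so they can be dropped in the same commit.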
Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
transactionContext.getScannedDataFiles().add(task.file());
if (!task.deletes().isEmpty()) {
task.deletes().forEach(deleteFile -> {
if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
!icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
!predicate.isAll()) {
// Equality files with an unpartitioned spec are applied as global deletes
// So they should not be cleaned up unless the whole table is optimized
return;
}
transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
});
}
};

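// Plan splits over the filtered scan; the consumer above is invoked as each file scan task becomes a split.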
splitSource = new CallDistributedProcedureSplitSource(
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
Optional.of(fileScanTaskConsumer),
getMinimumAssignedSplitWeight(session));
}
transactionContext.setConnectorSplitSource(splitSource);

return new IcebergDistributedProcedureHandle(
tableHandle.getSchemaName(),
tableHandle.getIcebergTableName(),
SchemaParser.toJson(icebergTable.schema()),
PartitionSpecParser.toJson(icebergTable.spec()),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
icebergTable.properties());
}
}

private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
{
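// Nothing was scanned and no fragments were produced, so skip committing an empty rewrite.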
if (fragments.isEmpty() &&
transactionContext.getScannedDataFiles().isEmpty() &&
transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
return;
}

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
Table icebergTable = transactionContext.getTransaction().table();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

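// Resolve each partition field's transform result type so the serialized partition data can be decoded below.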
Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);

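// Rebuild DataFile metadata for every file the workers wrote, attaching partition data when the spec is partitioned.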
Set<DataFile> newFiles = new HashSet<>();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(handle.getFileFormat().name())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
newFiles.add(builder.build());
}

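// Replace the scanned data files and fully applied delete files with the rewritten files in a single commit.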
RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

// Table.snapshot method returns null if there is no matching snapshot
Snapshot snapshot = requireNonNull(
handle.getTableName()
.getSnapshotId()
.map(icebergTable::snapshot)
.orElse(null),
"snapshot is null");
if (icebergTable.currentSnapshot() != null) {
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
}
rewriteFiles.commit();
}
}
}