Support Iceberg procedure rewrite_data_files
hantangwangd committed Jun 11, 2024
1 parent 66c6d86 commit 0dc3dbb
Showing 7 changed files with 897 additions and 2 deletions.
40 changes: 40 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -772,6 +772,46 @@ Examples:

CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Rewrite Data Files
^^^^^^^^^^^^^^^^^^

In Iceberg, all data files of a table are tracked, potentially under different partition specs. More data files lead to
more metadata stored in manifest files, and small data files cause an unnecessary amount of metadata and
less efficient queries due to file open costs. In addition, data files under different partition specs might
prevent metadata-level deletion or thorough predicate pushdown in the Presto engine.

The procedure ``rewrite_data_files`` rewrites the data files of the specified table so that they are
merged into fewer but larger files under the newest partition spec. If the table is partitioned, the data
file compaction can act separately on the selected partitions. This can improve read performance by reducing
metadata overhead and runtime file open costs.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         Required   Type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update

``table_name``        ✔️         string          Name of the table to update

``filter``                       string          Predicate as a string used to filter the files to rewrite. Note that
                                                 currently only whole partitions can be rewritten, so the filter should
                                                 only reference partition columns (defaults to ``TRUE``)

``options``                      map             Options to be used for the data files rewrite (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table ``db.sample`` under the newest partition spec, combining small files into larger ones::

CALL iceberg.system.rewrite_data_files('db', 'sample');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in the partitions of table ``db.sample`` selected by a filter, placing them under the newest partition spec::

CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
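
* As a rough verification (assuming the connector's ``$files`` metadata table is available for the table), the number
  and total size of data files can be compared before and after the rewrite to confirm that small files were compacted::

      SELECT count(*) AS data_file_count, sum(file_size_in_bytes) AS total_size
      FROM iceberg.db."sample$files";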

SQL Support
-----------
@@ -154,6 +154,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
@@ -0,0 +1,202 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
implements Provider<Procedure>
{
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;

@Inject
public RewriteDataFilesProcedure(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
}

@Override
public Procedure get()
{
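// Register a distributed procedure: the begin callback plans the splits whose data files will be
// rewritten, and the finish callback commits the rewritten files back to the table.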
return new DistributedProcedure(
"system",
"rewrite_data_files",
ImmutableList.of(
new Procedure.Argument(SCHEMA, VARCHAR),
new Procedure.Argument(TABLE_NAME, VARCHAR),
new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
(session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
}

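// Plans an Iceberg table scan over the data files selected by the filter predicate, exposes the
// resulting splits through the transaction context, and returns a handle describing the target table.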
private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No Iceberg table in transaction context"));
IcebergTableHandle tableHandle = layoutHandle.getTable();

ConnectorSplitSource splitSource;
if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
splitSource = new FixedSplitSource(ImmutableList.of());
}
else {
TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(predicate))
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

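// Record every data file touched by the scan, plus any delete files that become fully applied,
// so they can be replaced or removed when the rewrite commits.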
Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
transactionContext.getScannedDataFiles().add(task.file());
if (!task.deletes().isEmpty()) {
task.deletes().forEach(deleteFile -> {
if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
!icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
!predicate.isAll()) {
// Equality files with an unpartitioned spec are applied as global deletes
// So they should not be cleaned up unless the whole table is optimized
return;
}
transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
});
}
};

splitSource = new CallDistributedProcedureSplitSource(
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
Optional.of(fileScanTaskConsumer),
getMinimumAssignedSplitWeight(session));
}
transactionContext.setConnectorSplitSource(splitSource);

return new IcebergDistributedProcedureHandle(
tableHandle.getSchemaName(),
tableHandle.getIcebergTableName(),
SchemaParser.toJson(icebergTable.schema()),
PartitionSpecParser.toJson(icebergTable.spec()),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
icebergTable.properties());
}
}

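// Rebuilds the files written by the workers from the serialized commit tasks, then commits a
// RewriteFiles operation replacing the scanned data files (and fully applied delete files) with them.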
private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
{
if (fragments.isEmpty() &&
transactionContext.getScannedDataFiles().isEmpty() &&
transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
return;
}

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
Table icebergTable = transactionContext.getTransaction().table();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);

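// Build an Iceberg DataFile for each commit task, attaching partition data for partitioned tables.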
Set<DataFile> newFiles = new HashSet<>();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(handle.getFileFormat().name())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
newFiles.add(builder.build());
}

RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

// Table.snapshot method returns null if there is no matching snapshot
Snapshot snapshot = requireNonNull(
handle.getTableName()
.getSnapshotId()
.map(icebergTable::snapshot)
.orElse(null),
"snapshot is null");
if (icebergTable.currentSnapshot() != null) {
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
}
rewriteFiles.commit();
}
}
}
@@ -1641,6 +1641,64 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
}
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
{
String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
String tableName = "test_rewrite_data_files_table";
try {
// Create an unpartitioned table and insert some data under its initial partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

// Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

// Metadata delete with a filter on column `c` is not supported yet, because some data files still use the old partition spec
assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

// Call procedure rewrite_data_files without filter to rewrite all data files
assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 5);

// Now metadata delete on column `c` works, because all data files have been rewritten under the new partition spec
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
}
finally {
dropTable(getSession(), tableName);
}
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
{
String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
String tableName = "test_rewrite_data_files_table";
try {
// Create a table with partition column `a`, and insert some data under this partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

// Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

// Metadata delete with a filter on column `c` is not supported yet, because some data files still use the old partition spec
assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

// Call procedure rewrite_data_files with filter to rewrite data files under the prior partition spec
assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a in (1, 2)')", 2);

// Now metadata delete on column `c` works, because all data files are now under the new partition spec
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
}
finally {
dropTable(getSession(), tableName);
}
}

@DataProvider(name = "version_and_mode")
public Object[][] versionAndMode()
{