Support Iceberg procedure rewrite_data_files
hantangwangd committed Nov 3, 2024
1 parent 06fd562 commit f756641
Showing 7 changed files with 901 additions and 2 deletions.
41 changes: 41 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -1017,6 +1017,47 @@ Examples:

CALL iceberg.system.set_table_property('schema_name', 'table_name', 'commit.retry.num-retries', '10');

Rewrite Data Files
^^^^^^^^^^^^^^^^^^

Iceberg tracks all data files under different partition specs in a table. More data files require
more metadata to be stored in manifest files, and many small data files cause an unnecessary amount of
metadata and less efficient queries because of file open costs. In addition, data files kept under different
partition specs can prevent metadata-level deletion and thorough predicate pushdown in Presto.

Use ``rewrite_data_files`` to rewrite the data files of a specified table so that they are
merged into fewer but larger files under the newest partition spec. If the table is partitioned, the data
file compaction can act separately on the selected partitions, which improves read performance by reducing
metadata overhead and runtime file open cost.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name         Required   Type            Description
===================== ========== =============== =======================================================================
``schema``            ✔️         string          Schema of the table to update.

``table_name``        ✔️         string          Name of the table to update.

``filter``                       string          Predicate used to select the files to rewrite. Currently only
                                                 rewriting whole partitions is supported, so the predicate must
                                                 reference partition columns only. The default value is ``true``.

``options``                      map             Options to be used for the data files rewrite. (to be expanded)
===================== ========== =============== =======================================================================

Examples:

* Rewrite all the data files in table ``db.sample``, moving them to the newest partition spec and combining small files into larger ones::

CALL iceberg.system.rewrite_data_files('db', 'sample');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');

* Rewrite the data files in the partitions of table ``db.sample`` selected by a filter, moving them to the newest partition spec::

CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
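
* Rewrite the data files in several partitions of table ``db.sample`` at once by using an ``IN`` predicate on a partition column::

    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key IN (1, 2)');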

SQL Support
-----------

@@ -167,6 +167,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(FastForwardBranchProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(SetCurrentSnapshotProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
@@ -0,0 +1,204 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RewriteDataFilesProcedure
implements Provider<Procedure>
{
    private final TypeManager typeManager;
    private final JsonCodec<CommitTaskData> commitTaskCodec;

@Inject
public RewriteDataFilesProcedure(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
}

@Override
public Procedure get()
{
return new DistributedProcedure(
"system",
"rewrite_data_files",
ImmutableList.of(
new Procedure.Argument(SCHEMA, VARCHAR),
new Procedure.Argument(TABLE_NAME, VARCHAR),
new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
(session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
}

private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No Iceberg table found in transaction context"));
IcebergTableHandle tableHandle = layoutHandle.getTable();

ConnectorSplitSource splitSource;
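            // A table handle without a snapshot id has no data files to rewrite, so produce an empty split source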
if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
splitSource = new FixedSplitSource(ImmutableList.of());
}
else {
TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(predicate))
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());

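                // Track every scanned data file for replacement, and collect the delete files that this rewrite fully applies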
Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
transactionContext.getScannedDataFiles().add(task.file());
if (!task.deletes().isEmpty()) {
task.deletes().forEach(deleteFile -> {
if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
!icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
!predicate.isAll()) {
// Equality files with an unpartitioned spec are applied as global deletes
// So they should not be cleaned up unless the whole table is optimized
return;
}
transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
});
}
};

splitSource = new CallDistributedProcedureSplitSource(
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
Optional.of(fileScanTaskConsumer),
getMinimumAssignedSplitWeight(session));
}
transactionContext.setConnectorSplitSource(splitSource);

return new IcebergDistributedProcedureHandle(
tableHandle.getSchemaName(),
tableHandle.getIcebergTableName(),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
icebergTable.properties());
}
}

private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
{
if (fragments.isEmpty() &&
transactionContext.getScannedDataFiles().isEmpty() &&
transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
return;
}

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
Table icebergTable = transactionContext.getTransaction().table();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

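            // Derive the partition column result types from the current spec so each task's partition data JSON can be parsed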
org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
.map(field -> field.transform().getResultType(
icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);

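            // Rebuild the DataFile metadata for every file written by the distributed procedure tasks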
Set<DataFile> newFiles = new HashSet<>();
for (CommitTaskData task : commitTasks) {
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(handle.getFileFormat().name())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
newFiles.add(builder.build());
}

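            // Replace the scanned data files (and the fully applied delete files) with the newly written files in a single rewrite commit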
RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());

// Table.snapshot method returns null if there is no matching snapshot
Snapshot snapshot = requireNonNull(
handle.getTableName()
.getSnapshotId()
.map(icebergTable::snapshot)
.orElse(null),
"snapshot is null");
if (icebergTable.currentSnapshot() != null) {
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
}
rewriteFiles.commit();
}
}
}
@@ -1667,6 +1667,64 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
}
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
{
String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
String tableName = "test_rewrite_data_files_table";
try {
            // Create an unpartitioned table, and insert some data under its initial partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported yet, because some data files still use the old partition spec
assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call the rewrite_data_files procedure without a filter to rewrite all data files
assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 5);

            // Now metadata delete on column `c` works, because all data files have been rewritten under the new partition spec
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
}
finally {
dropTable(getSession(), tableName);
}
}

@Test(dataProvider = "version_and_mode")
public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
{
String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
String tableName = "test_rewrite_data_files_table";
try {
// Create a table with partition column `a`, and insert some data under this partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);

            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);

            // Metadata delete with a filter on column `c` is not supported yet, because some data files still use the old partition spec
assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);

            // Call the rewrite_data_files procedure with a filter to rewrite the data files that remain under the prior partition spec
assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a in (1, 2)')", 2);

            // Now metadata delete on column `c` works, because all data files are under the new partition spec
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
}
finally {
dropTable(getSession(), tableName);
}
}

@DataProvider(name = "version_and_mode")
public Object[][] versionAndMode()
{
