From 0dc3dbbdaa1ecd533eea65459a8ccab4e42a727e Mon Sep 17 00:00:00 2001
From: wangd
Date: Wed, 5 Jun 2024 00:11:32 +0800
Subject: [PATCH] Support Iceberg procedure `rewrite_data_files`

---
 .../src/main/sphinx/connector/iceberg.rst     |  40 ++
 .../presto/iceberg/IcebergCommonModule.java   |   1 +
 .../iceberg/RewriteDataFilesProcedure.java    | 202 +++++++
 .../IcebergDistributedSmokeTestBase.java      |  58 ++
 .../iceberg/IcebergDistributedTestBase.java   |  61 ++-
 .../TestRewriteDataFilesProcedure.java        | 509 ++++++++++++++++++
 .../iceberg/rest/TestIcebergSmokeRest.java    |  28 +
 7 files changed, 897 insertions(+), 2 deletions(-)
 create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java
 create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java

diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index 2ae48f4507673..35f600add0cdc 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -772,6 +772,46 @@ Examples:
 
     CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);
 
+Rewrite Data Files
+^^^^^^^^^^^^^^^^^^
+
+Iceberg tracks all of a table's data files across its different partition specs. More data files means
+more metadata stored in manifest files, and many small data files cause an unnecessary amount of metadata and
+less efficient queries due to file open costs. In addition, data files spread across different partition specs
+can prevent metadata-level deletes or thorough predicate pushdown in the Presto engine.
+
+The procedure `rewrite_data_files` rewrites the data files of the specified table so that they are
+merged into fewer but larger files under the newest partition spec. If the table is partitioned, the
+compaction can act separately on the selected partitions. This can improve read performance by reducing
+metadata overhead and runtime file open costs.
+
+The following arguments are available:
+
+===================== ========== =============== =======================================================================
+Argument Name         Required   Type            Description
+===================== ========== =============== =======================================================================
+``schema``            ✔️         string          Schema of the table to update
+
+``table_name``        ✔️         string          Name of the table to update
+
+``filter``                       string          Predicate as a string used for filtering the files.
+                                                 Note that currently only whole partitions can be rewritten,
+                                                 that is, the filter should be on partition columns
+                                                 (defaults to `true`)
+
+``options``                      map             Options to be used for the data files rewrite (to be expanded)
+===================== ========== =============== =======================================================================
+
+Examples:
+
+* Rewrite all the data files in table `db.sample` to the newest partition spec and combine small files into larger ones::
+
+    CALL iceberg.system.rewrite_data_files('db', 'sample');
+    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
+
+* Rewrite the data files in partitions specified by a filter in table `db.sample` to the newest partition spec::
+
+    CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
+    CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
+
 SQL Support
 -----------
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
index ba6db6eec5dfa..7f23e2b9d8529 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
@@ -154,6 +154,7 @@ public void setup(Binder binder)
         procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
 
         // for orc
         binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java
new file mode 100644
index 0000000000000..c9a146506a8cd
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/RewriteDataFilesProcedure.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ConnectorDistributedProcedureHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.procedure.DistributedProcedure;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.slice.Slice;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.TableScanUtil;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
+import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
+import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
+import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
+import static com.facebook.presto.spi.procedure.DistributedProcedure.SCHEMA;
+import static com.facebook.presto.spi.procedure.DistributedProcedure.TABLE_NAME;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
+
+public class RewriteDataFilesProcedure
+        implements Provider<Procedure>
+{
+    private final TypeManager typeManager;
+    private final JsonCodec<CommitTaskData> commitTaskCodec;
+
+    @Inject
+    public RewriteDataFilesProcedure(
+            TypeManager typeManager,
+            JsonCodec<CommitTaskData> commitTaskCodec)
+    {
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+        this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
+    }
+
+    @Override
+    public Procedure get()
+    {
+        return new DistributedProcedure(
+                "system",
+                "rewrite_data_files",
+                ImmutableList.of(
+                        new Procedure.Argument(SCHEMA, VARCHAR),
+                        new Procedure.Argument(TABLE_NAME, VARCHAR),
+                        new Procedure.Argument("filter", VARCHAR, false, "TRUE"),
+                        new Procedure.Argument("options", "map(varchar, varchar)", false, null)),
+                (session, transactionContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergTransactionContext) transactionContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
+                ((transactionContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergTransactionContext) transactionContext, tableHandle, fragments)));
+    }
+
+    private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergTransactionContext transactionContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
+    {
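+        // Plan a scan of the target snapshot under the given filter; the split source
+        // drives the distributed rewrite, while the consumer below records the scanned
+        // data files and the delete files that this rewrite will fully apply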
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+            Table icebergTable = transactionContext.getTable().orElseThrow(() -> new VerifyException("No table in transaction context"));
+            IcebergTableHandle tableHandle = layoutHandle.getTable();
+
+            ConnectorSplitSource splitSource;
+            if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent()) {
+                splitSource = new FixedSplitSource(ImmutableList.of());
+            }
+            else {
+                TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
+                TableScan tableScan = icebergTable.newScan()
+                        .filter(toIcebergExpression(predicate))
+                        .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get());
+
+                Consumer<FileScanTask> fileScanTaskConsumer = (task) -> {
+                    transactionContext.getScannedDataFiles().add(task.file());
+                    if (!task.deletes().isEmpty()) {
+                        task.deletes().forEach(deleteFile -> {
+                            if (deleteFile.content() == FileContent.EQUALITY_DELETES &&
+                                    !icebergTable.specs().get(deleteFile.specId()).isPartitioned() &&
+                                    !predicate.isAll()) {
+                                // Equality delete files with an unpartitioned spec are applied as global deletes,
+                                // so they should not be cleaned up unless the whole table is optimized
+                                return;
+                            }
+                            transactionContext.getFullyAppliedDeleteFiles().add(deleteFile);
+                        });
+                    }
+                };
+
+                splitSource = new CallDistributedProcedureSplitSource(
+                        session,
+                        tableScan,
+                        TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
+                        Optional.of(fileScanTaskConsumer),
+                        getMinimumAssignedSplitWeight(session));
+            }
+            transactionContext.setConnectorSplitSource(splitSource);
+
+            return new IcebergDistributedProcedureHandle(
+                    tableHandle.getSchemaName(),
+                    tableHandle.getIcebergTableName(),
+                    SchemaParser.toJson(icebergTable.schema()),
+                    PartitionSpecParser.toJson(icebergTable.spec()),
+                    getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
+                    icebergTable.location(),
+                    getFileFormat(icebergTable),
+                    icebergTable.properties());
+        }
+    }
+
+    private void finishCallDistributedProcedure(IcebergTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
+    {
+        if (fragments.isEmpty() &&
+                transactionContext.getScannedDataFiles().isEmpty() &&
+                transactionContext.getFullyAppliedDeleteFiles().isEmpty()) {
+            return;
+        }
+
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+            IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
+            Table icebergTable = transactionContext.getTransaction().table();
+
+            List<CommitTaskData> commitTasks = fragments.stream()
+                    .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
+                    .collect(toImmutableList());
+
+            org.apache.iceberg.types.Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
+                    .map(field -> field.transform().getResultType(
+                            icebergTable.schema().findType(field.sourceId())))
+                    .toArray(Type[]::new);
+
+            Set<DataFile> newFiles = new HashSet<>();
+            for (CommitTaskData task : commitTasks) {
+                DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
+                        .withPath(task.getPath())
+                        .withFileSizeInBytes(task.getFileSizeInBytes())
+                        .withFormat(handle.getFileFormat().name())
+                        .withMetrics(task.getMetrics().metrics());
+
+                if (!icebergTable.spec().fields().isEmpty()) {
+                    String partitionDataJson = task.getPartitionDataJson()
+                            .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+                    builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+                }
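+                // Collect the rewritten file so the rewrite commit below can swap the new
+                // files in for the scanned data files and fully applied delete files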
+                newFiles.add(builder.build());
+            }
+
+            RewriteFiles rewriteFiles = transactionContext.getTransaction().newRewrite();
+            Set<DataFile> scannedDataFiles = transactionContext.getScannedDataFiles();
+            Set<DeleteFile> fullyAppliedDeleteFiles = transactionContext.getFullyAppliedDeleteFiles();
+            rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());
+
+            // Table.snapshot method returns null if there is no matching snapshot
+            Snapshot snapshot = requireNonNull(
+                    handle.getTableName()
+                            .getSnapshotId()
+                            .map(icebergTable::snapshot)
+                            .orElse(null),
+                    "snapshot is null");
+            if (icebergTable.currentSnapshot() != null) {
+                rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
+            }
+            rewriteFiles.commit();
+        }
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
index 3249a6e1cb16f..21600cc2d8bb9 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
@@ -1641,6 +1641,64 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
         }
     }
 
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
+    {
+        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
+        String tableName = "test_rewrite_data_files_table";
+        try {
+            // Create an unpartitioned table, and insert some data under the initial partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            // Metadata delete with a filter on column `c` is not supported, because there is still data under the old partition spec
+            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);
+
+            // Call procedure rewrite_data_files without a filter to rewrite all data files
+            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 5);
+
+            // Now metadata delete on column `c` works, because all data files have been rewritten under the new partition spec
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
+        }
+        finally {
+            dropTable(getSession(), tableName);
+        }
+    }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
+    {
+        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
+        String tableName = "test_rewrite_data_files_table";
+        try {
+            // Create a table with partition column `a`, and insert some data under this partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            // Metadata delete with a filter on column `c` is not supported, because there is still data under the old partition spec
+            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);
+
+            // Call procedure rewrite_data_files with a filter to rewrite the data files under the prior partition spec
+            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a in (1, 2)')", 2);
+
+            // Now metadata delete on column `c` works, because all data files are under the new partition spec
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3)");
+        }
+        finally {
+            dropTable(getSession(), tableName);
+        }
+    }
+
     @DataProvider(name = "version_and_mode")
     public Object[][] versionAndMode()
     {
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index 08229fc536b75..c94a20d07fd98 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -1575,6 +1575,63 @@ public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDelete
         }
     }
 
+    @Test
+    public void testMetadataDeleteOnV2MorTableWithRewriteDataFiles()
+    {
+        String tableName = "test_rewrite_data_files_table";
+        try {
+            // Create an unpartitioned table, and insert some data under the initial partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+            assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002')");
+
+            Table icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 1);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
+
+            // Evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 4);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
+
+            // Execute a row-level delete with a filter on column `b`
+            assertUpdate("DELETE FROM " + tableName + " WHERE b = '1004'", 1);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3), (5, '1005', 5)");
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 4);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
+
+            assertQueryFails("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'a > 3')", ".*");
+            assertQueryFails("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'c > 3')", ".*");
+
+            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch')", 3);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3), (5, '1005', 5)");
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 3);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+
+            // Metadata delete on column `c` works now, because all data files are under the newest partition spec
+            assertUpdate("DELETE FROM " + tableName + " WHERE c = 5", 1);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3)");
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 2);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+
+            assertUpdate("call system.rewrite_data_files(table_name => '" + tableName + "', schema => 'tpch', filter => 'c > 2')", 1);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', NULL), (3, '1003', 3)");
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 2);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
     private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
     {
         // check delete file list
@@ -1718,14 +1775,14 @@ private void testWithAllFileFormats(Session session, BiConsumer<Session, FileFormat> test)
         }
     }
 
-    private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
+    protected void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
     {
         Map<String, String> map = snapshot.summary();
         int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP));
         assertEquals(totalDataFiles, dataFilesCount);
     }
 
-    private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
+    protected void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
     {
         Map<String, String> map = snapshot.summary();
         int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP));
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java
new file mode 100644
index 0000000000000..c98af75b1184e
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestRewriteDataFilesProcedure.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.presto.testing.QueryRunner;
+import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterator;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
+import static com.facebook.presto.iceberg.FileFormat.PARQUET;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
+import static java.lang.String.format;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
+import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
+import static org.testng.Assert.assertEquals;
+
+public class TestRewriteDataFilesProcedure
+        extends AbstractTestQueryFramework
+{
+    public static final String ICEBERG_CATALOG = "test_hive";
+    public static final String TEST_SCHEMA = "tpch";
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return createIcebergQueryRunner(
+                ImmutableMap.of(),
+                ImmutableMap.of("iceberg.catalog.type", HADOOP.name()),
+                PARQUET,
+                false,
+                false,
+                OptionalInt.of(1),
+                Optional.empty());
+    }
+
+    public void dropTable(String tableName)
+    {
+        assertQuerySucceeds("DROP TABLE IF EXISTS iceberg." + TEST_SCHEMA + "." + tableName);
+    }
+
+    @Test
+    public void testRewriteDataFilesInEmptyTable()
+    {
+        String tableName = "default_empty_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)");
+            assertUpdate(format("CALL system.rewrite_data_files('%s', '%s')", TEST_SCHEMA, tableName), 0);
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesOnPartitionTable()
+    {
+        String tableName = "example_partition_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c2'])");
+
+            // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 10, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 10);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 10, 0);
+
+            assertUpdate("DELETE from " + tableName + " WHERE c1 = 7", 1);
+            assertUpdate("DELETE from " + tableName + " WHERE c1 in (8, 10)", 2);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 7);
+            // The number of data files is 10, and the number of delete files is 3
+            assertHasDataFiles(table.currentSnapshot(), 10);
+            assertHasDeleteFiles(table.currentSnapshot(), 3);
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(9, 'foo')");
+
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 7);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 8);
+            // The number of data files is 2, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 2);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .filter(alwaysTrue())
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 2, 0);
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(9, 'foo')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesOnNonPartitionTable()
+    {
+        String tableName = "example_non_partition_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)");
+
+            // create 5 files
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 5, and the number of delete files is 0
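+            // (each of the five INSERTs above wrote a single data file)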
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            assertUpdate("DELETE from " + tableName + " WHERE c1 = 7", 1);
+            assertUpdate("DELETE from " + tableName + " WHERE c1 in (9, 10)", 2);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 7);
+            // The number of data files is 5, and the number of delete files is 2
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 2);
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(8, 'bar')");
+
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 7);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 8);
+            // The number of data files is 1, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .filter(alwaysTrue())
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 1, 0);
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(8, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithFilter()
+    {
+        String tableName = "example_partition_filter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c2'])");
+
+            // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 10, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 10);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 10, 0);
+
+            // rewriting files filtered by non-identity columns is not supported
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, TEST_SCHEMA), ".*");
+
+            // select 5 files to rewrite
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 = ''bar''')", tableName, TEST_SCHEMA), 5);
+            table.refresh();
+            assertHasSize(table.snapshots(), 6);
+            // The number of data files is 6, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 6);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 6, 0);
+
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(7, 'foo'), (8, 'bar'), " +
+                            "(9, 'foo'), (10, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithDeterministicTrueFilter()
+    {
+        String tableName = "example_non_partition_true_filter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)");
+
+            // create 5 files
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 5, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            // rewriting files filtered by non-identity columns is not supported
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, TEST_SCHEMA), ".*");
+
+            // a filter of `1 = 1` is always true, so all files are selected for rewrite
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1 = 1')", tableName, TEST_SCHEMA), 10);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 6);
+            // The number of data files is 1, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 1, 0);
+
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(7, 'foo'), (8, 'bar'), " +
+                            "(9, 'foo'), (10, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithDeterministicFalseFilter()
+    {
+        String tableName = "example_non_partition_false_filter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar)");
+
+            // create 5 files
+            assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2);
+            assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2);
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 5, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            // a filter of `1 = 0` is always false, so no files are selected for rewrite
+            assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1 = 0')", tableName, TEST_SCHEMA), 0);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is still 5, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 5);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 5, 0);
+
+            assertQuery("select * from " + tableName,
+                    "values(1, 'foo'), (2, 'bar'), " +
+                            "(3, 'foo'), (4, 'bar'), " +
+                            "(5, 'foo'), (6, 'bar'), " +
+                            "(7, 'foo'), (8, 'bar'), " +
+                            "(9, 'foo'), (10, 'bar')");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testRewriteDataFilesWithDeleteAndPartitionEvolution()
+    {
+        String tableName = "example_partition_evolution_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar)");
+            assertUpdate("INSERT INTO " + tableName + " values(1, '1001'), (2, '1002')", 2);
+            assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1);
+            assertQuery("select * from " + tableName, "values(2, '1002')");
+
+            Table table = loadTable(tableName);
+            assertHasSize(table.snapshots(), 2);
+            // The number of data files is 1, and the number of delete files is 1
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 1);
+
+            assertUpdate("alter table " + tableName + " add column c int with (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " values(5, '1005', 5), (6, '1006', 6), (7, '1007', 7)", 3);
+            assertUpdate("DELETE FROM " + tableName + " WHERE b = '1006'", 1);
+            assertQuery("select * from " + tableName, "values(2, '1002', NULL), (5, '1005', 5), (7, '1007', 7)");
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 4);
+            // The number of data files is 4, and the number of delete files is 2
+            assertHasDataFiles(table.currentSnapshot(), 4);
+            assertHasDeleteFiles(table.currentSnapshot(), 2);
+
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'a > 3')", tableName, TEST_SCHEMA), ".*");
+            assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c > 3')", tableName, TEST_SCHEMA), ".*");
+
+            assertUpdate(format("call system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, TEST_SCHEMA), 3);
+            table.refresh();
+            assertHasSize(table.snapshots(), 5);
+            // The number of data files is 3, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 3);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            CloseableIterator<FileScanTask> fileScanTasks = table.newScan()
+                    .useSnapshot(table.currentSnapshot().snapshotId())
+                    .planFiles().iterator();
+            assertFilesPlan(fileScanTasks, 3, 0);
+            assertQuery("select * from " + tableName, "values(2, '1002', NULL), (5, '1005', 5), (7, '1007', 7)");
+
+            assertUpdate("delete from " + tableName + " where b = '1002'", 1);
+            table.refresh();
+            assertHasSize(table.snapshots(), 6);
+            // The number of data files is 3, and the number of delete files is 1
+            assertHasDataFiles(table.currentSnapshot(), 3);
+            assertHasDeleteFiles(table.currentSnapshot(), 1);
+            assertUpdate(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c is null')", tableName, TEST_SCHEMA), 0);
+
+            table.refresh();
+            assertHasSize(table.snapshots(), 7);
+            // The number of data files is 2, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 2);
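+            // the rewrite dropped the fully deleted file in the `c is null` partition together with its delete file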
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            assertQuery("select * from " + tableName, "values(5, '1005', 5), (7, '1007', 7)");
+
+            // This is a metadata delete
+            assertUpdate("delete from " + tableName + " where c = 7", 1);
+            table.refresh();
+            assertHasSize(table.snapshots(), 8);
+            // The number of data files is 1, and the number of delete files is 0
+            assertHasDataFiles(table.currentSnapshot(), 1);
+            assertHasDeleteFiles(table.currentSnapshot(), 0);
+            assertQuery("select * from " + tableName, "values(5, '1005', 5)");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    @Test
+    public void testInvalidParameterCases()
+    {
+        String tableName = "invalid_parameter_table";
+        try {
+            assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar, c int)");
+            assertQueryFails("CALL system.rewrite_data_files('n', table_name => 't')", ".*Named and positional arguments cannot be mixed");
+            assertQueryFails("CALL custom.rewrite_data_files('n', 't')", "Procedure not registered: custom.rewrite_data_files");
+            assertQueryFails("CALL system.rewrite_data_files()", ".*Required procedure argument 'schema' is missing");
+            assertQueryFails("CALL system.rewrite_data_files('s', 'n')", "Schema s does not exist");
+            assertQueryFails("CALL system.rewrite_data_files('', '')", "Table name is empty");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '''hello''')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type varchar\\(5\\)");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => '1001')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type integer");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'a')", tableName, TEST_SCHEMA), ".*WHERE clause must evaluate to a boolean: actual type integer");
+            assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'n')", tableName, TEST_SCHEMA), ".*Column 'n' cannot be resolved");
+        }
+        finally {
+            dropTable(tableName);
+        }
+    }
+
+    private Table loadTable(String tableName)
+    {
+        Catalog catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), ICEBERG_CATALOG, getProperties(), new Configuration());
+        return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName));
+    }
+
+    private Map<String, String> getProperties()
+    {
+        File metastoreDir = getCatalogDirectory();
+        return ImmutableMap.of("warehouse", metastoreDir.toString());
+    }
+
+    private File getCatalogDirectory()
+    {
+        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        return dataDirectory.toFile();
+    }
+
+    private void assertHasSize(Iterable<?> iterable, int size)
+    {
+        AtomicInteger count = new AtomicInteger(0);
+        iterable.forEach(obj -> count.incrementAndGet());
+        assertEquals(count.get(), size);
+    }
+
+    private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
+    {
+        Map<String, String> map = snapshot.summary();
+        int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP));
+        assertEquals(totalDataFiles, dataFilesCount);
+    }
+
+    private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
+    {
+        Map<String, String> map = snapshot.summary();
+        int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP));
+        assertEquals(totalDeleteFiles, deleteFilesCount);
+    }
+
+    private void assertFilesPlan(CloseableIterator<FileScanTask> iterator, int dataFileCount, int deleteFileCount)
+    {
+        AtomicInteger dataCount = new AtomicInteger(0);
+        AtomicInteger deleteCount = new AtomicInteger(0);
+        while (iterator.hasNext()) {
+            FileScanTask fileScanTask = iterator.next();
+            dataCount.incrementAndGet();
+            deleteCount.addAndGet(fileScanTask.deletes().size());
+        }
+        assertEquals(dataCount.get(), dataFileCount);
+        assertEquals(deleteCount.get(), deleteFileCount);
+
+        try {
+            iterator.close();
+            iterator = CloseableIterator.empty();
+        }
+        catch (Exception e) {
+            // do nothing
+        }
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
index 65c34bc31bbee..620ddd92bd1d2 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
@@ -192,4 +192,32 @@ public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(Str
             super.testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(version, mode);
         }
     }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableAfterWholeRewriteDataFiles(String version, String mode)
+    {
+        if (version.equals("1")) {
+            // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756)
+            assertThatThrownBy(() -> super.testMetadataDeleteOnTableAfterWholeRewriteDataFiles(version, mode))
+                    .isInstanceOf(RuntimeException.class);
+        }
+        else {
+            // v2 succeeds
+            super.testMetadataDeleteOnTableAfterWholeRewriteDataFiles(version, mode);
+        }
+    }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableAfterPartialRewriteDataFiles(String version, String mode)
+    {
+        if (version.equals("1")) {
+            // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756)
+            assertThatThrownBy(() -> super.testMetadataDeleteOnTableAfterPartialRewriteDataFiles(version, mode))
+                    .isInstanceOf(RuntimeException.class);
+        }
+        else {
+            // v2 succeeds
+            super.testMetadataDeleteOnTableAfterPartialRewriteDataFiles(version, mode);
+        }
+    }
 }