From 6599d70bd6ded77dea7f2bc36b3a6f05c7f82309 Mon Sep 17 00:00:00 2001 From: Vamshi Gudavarthi Date: Sat, 18 Nov 2023 20:53:06 -0800 Subject: [PATCH] Changes' --- .../test/java/io/onetable/GenericTable.java | 33 ++++ .../java/io/onetable/ITOneTableClient.java | 142 ++++++++++++++++++ .../io/onetable/TestAbstractHudiTable.java | 2 +- .../io/onetable/TestCommonTableHelper.java | 70 +++++++++ .../java/io/onetable/TestIcebergTable.java | 28 +++- .../java/io/onetable/TestJavaHudiTable.java | 43 ++---- .../java/io/onetable/TestSparkDeltaTable.java | 27 +++- .../java/io/onetable/TestSparkHudiTable.java | 33 ++-- .../onetable/delta/ITDeltaSourceClient.java | 39 ++++- .../io/onetable/delta/TestDeltaHelper.java | 55 ++++--- .../delta/TestDeltaPartitionExtractor.java | 29 ++-- .../io/onetable/hudi/ITHudiTargetClient.java | 2 +- .../iceberg/TestIcebergDataHelper.java | 18 ++- .../TestIcebergPartitionValueConverter.java | 4 +- .../test/resources/schemas/common_schema.avsc | 62 ++++++++ 15 files changed, 474 insertions(+), 113 deletions(-) create mode 100644 core/src/test/java/io/onetable/TestCommonTableHelper.java create mode 100644 core/src/test/resources/schemas/common_schema.avsc diff --git a/core/src/test/java/io/onetable/GenericTable.java b/core/src/test/java/io/onetable/GenericTable.java index 3eda7c48..0e7fa56b 100644 --- a/core/src/test/java/io/onetable/GenericTable.java +++ b/core/src/test/java/io/onetable/GenericTable.java @@ -125,6 +125,39 @@ static GenericTable getInstanceWithCustomPartitionConfig( } } + static GenericTable getInstanceWithCommonSchema( + String tableName, + Path tempDir, + SparkSession sparkSession, + JavaSparkContext jsc, + TableFormat sourceFormat) { + switch (sourceFormat) { + case HUDI: + return TestSparkHudiTable.forGivenSchemaAndPartitioning( + tableName, + tempDir, + jsc, + TestCommonTableHelper.getCommonSchemaInAvroFormat(), + TestCommonTableHelper.getHudiPartitionConfig()); + case DELTA: + return TestSparkDeltaTable.forGivenSchemaAndPartitioning( + tableName, + tempDir, + sparkSession, + TestCommonTableHelper.getCommonSchemaInStructFormat(), + TestCommonTableHelper.getDeltaPartitionField()); + case ICEBERG: + return TestIcebergTable.forGivenSchemaAndPartitioning( + tableName, + tempDir, + jsc.hadoopConfiguration(), + TestCommonTableHelper.getCommonSchemaInIcebergFormat(), + TestCommonTableHelper.getIcebergPartitionField()); + default: + throw new IllegalArgumentException("Unsupported source format: " + sourceFormat); + } + } + static String getTableName() { return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_"); } diff --git a/core/src/test/java/io/onetable/ITOneTableClient.java b/core/src/test/java/io/onetable/ITOneTableClient.java index 7609cf85..0ca9d976 100644 --- a/core/src/test/java/io/onetable/ITOneTableClient.java +++ b/core/src/test/java/io/onetable/ITOneTableClient.java @@ -670,6 +670,148 @@ public void testMetadataRetention() throws Exception { } } + @Test + public void roundTripTesting() { + SourceClientProvider hudiSourceClientProvider = getSourceClientProvider(TableFormat.HUDI); + SourceClientProvider icebergSourceClientProvider = + getSourceClientProvider(TableFormat.ICEBERG); + SourceClientProvider deltaSourceClientProvider = getSourceClientProvider(TableFormat.DELTA); + String onetablePartitionConfig = "level:VALUE"; + SyncMode syncMode = SyncMode.INCREMENTAL; + + // Create table in hudi and sync to iceberg and delta. 
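+    // Round trip: Hudi -> (Iceberg, Delta), then the same table is reopened as
+    // Delta, then as Iceberg, then as Hudi again. Each phase inserts rows and
+    // upserts 20 existing keys, so the expected counts asserted below grow
+    // 100 -> 200 -> ... -> 800, and upserts never change the totals.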
+ String tableName = getTableName(); + OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration()); + List insertRecords; + try (GenericTable table = + GenericTable.getInstanceWithCommonSchema( + tableName, tempDir, sparkSession, jsc, TableFormat.HUDI)) { + insertRecords = table.insertRows(100); + + PerTableConfig perTableConfig = + PerTableConfig.builder() + .tableName(tableName) + .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA)) + .tableBasePath(table.getBasePath()) + .tableDataPath(table.getDataPath()) + .hudiSourceConfig( + HudiSourceConfig.builder() + .partitionFieldSpecConfig(onetablePartitionConfig) + .build()) + .syncMode(syncMode) + .build(); + oneTableClient.sync(perTableConfig, hudiSourceClientProvider); + checkDatasetEquivalence( + TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 100); + + // make multiple commits and then sync + table.insertRows(100); + table.upsertRows(insertRecords.subList(0, 20)); + oneTableClient.sync(perTableConfig, hudiSourceClientProvider); + checkDatasetEquivalence( + TableFormat.HUDI, table, Arrays.asList(TableFormat.DELTA, TableFormat.ICEBERG), 200); + } + + // Now Get the table as Delta and do Sync to Hudi and Iceberg. + try (GenericTable table = + GenericTable.getInstanceWithCommonSchema( + tableName, tempDir, sparkSession, jsc, TableFormat.DELTA)) { + PerTableConfig perTableConfig = + PerTableConfig.builder() + .tableName(tableName) + .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + .tableBasePath(table.getBasePath()) + .tableDataPath(table.getDataPath()) + .hudiSourceConfig( + HudiSourceConfig.builder() + .partitionFieldSpecConfig(onetablePartitionConfig) + .build()) + .syncMode(syncMode) + .build(); + + table.insertRows(100); + oneTableClient.sync(perTableConfig, deltaSourceClientProvider); + checkDatasetEquivalence( + TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 300); + + table.insertRows(100); + oneTableClient.sync(perTableConfig, deltaSourceClientProvider); + checkDatasetEquivalence( + TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 400); + + table.upsertRows(insertRecords.subList(0, 20)); + oneTableClient.sync(perTableConfig, deltaSourceClientProvider); + checkDatasetEquivalence( + TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 400); + } + + // Get table as Iceberg and do Sync to Hudi and Delta. 
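+    // By this point the targets hold 400 rows; the Iceberg phase below adds two
+    // batches of 100 (checked at 500, then 600) and finishes with an upsert of
+    // 20 existing keys, which must not change the count.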
+ try (GenericTable table = + GenericTable.getInstanceWithCommonSchema( + tableName, tempDir, sparkSession, jsc, TableFormat.ICEBERG)) { + PerTableConfig perTableConfig = + PerTableConfig.builder() + .tableName(tableName) + .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) + .tableBasePath(table.getBasePath()) + .tableDataPath(table.getDataPath()) + .hudiSourceConfig( + HudiSourceConfig.builder() + .partitionFieldSpecConfig(onetablePartitionConfig) + .build()) + .syncMode(syncMode) + .build(); + + table.insertRows(100); + oneTableClient.sync(perTableConfig, icebergSourceClientProvider); + checkDatasetEquivalence( + TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 500); + + table.insertRows(100); + oneTableClient.sync(perTableConfig, icebergSourceClientProvider); + checkDatasetEquivalence( + TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 600); + + table.upsertRows(insertRecords.subList(0, 20)); + oneTableClient.sync(perTableConfig, icebergSourceClientProvider); + checkDatasetEquivalence( + TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 600); + } + + // Get one last time, table as Hudi and do Sync to Iceberg and Delta. + try (GenericTable table = + GenericTable.getInstanceWithCommonSchema( + tableName, tempDir, sparkSession, jsc, TableFormat.HUDI)) { + PerTableConfig perTableConfig = + PerTableConfig.builder() + .tableName(tableName) + .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA)) + .tableBasePath(table.getBasePath()) + .tableDataPath(table.getDataPath()) + .hudiSourceConfig( + HudiSourceConfig.builder() + .partitionFieldSpecConfig(onetablePartitionConfig) + .build()) + .syncMode(syncMode) + .build(); + + table.insertRows(100); + oneTableClient.sync(perTableConfig, hudiSourceClientProvider); + checkDatasetEquivalence( + TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 700); + + table.insertRows(100); + oneTableClient.sync(perTableConfig, hudiSourceClientProvider); + checkDatasetEquivalence( + TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 800); + + table.upsertRows(insertRecords.subList(0, 20)); + oneTableClient.sync(perTableConfig, hudiSourceClientProvider); + checkDatasetEquivalence( + TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 800); + } + } + private Map getTimeTravelOption(TableFormat tableFormat, Instant time) { Map options = new HashMap<>(); switch (tableFormat) { diff --git a/core/src/test/java/io/onetable/TestAbstractHudiTable.java b/core/src/test/java/io/onetable/TestAbstractHudiTable.java index 8fe5c1be..7e2fce69 100644 --- a/core/src/test/java/io/onetable/TestAbstractHudiTable.java +++ b/core/src/test/java/io/onetable/TestAbstractHudiTable.java @@ -761,6 +761,6 @@ public String getFilterQuery() { @Override public String getOrderByColumn() { - return "_hoodie_record_key"; + return RECORD_KEY_FIELD_NAME; } } diff --git a/core/src/test/java/io/onetable/TestCommonTableHelper.java b/core/src/test/java/io/onetable/TestCommonTableHelper.java new file mode 100644 index 00000000..beb76a91 --- /dev/null +++ b/core/src/test/java/io/onetable/TestCommonTableHelper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.onetable; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; + +import org.apache.avro.Schema; +import org.apache.spark.sql.types.StructType; + +import io.onetable.avro.AvroSchemaConverter; +import io.onetable.delta.DeltaSchemaExtractor; +import io.onetable.iceberg.IcebergSchemaExtractor; +import io.onetable.model.schema.OneSchema; + +public class TestCommonTableHelper { + private static Schema COMMON_SCHEMA; + private static OneSchema COMMON_ONE_SCHEMA; + + static { + try (InputStream inputStream = + GenericTable.class.getClassLoader().getResourceAsStream("schemas/common_schema.avsc")) { + COMMON_SCHEMA = new Schema.Parser().parse(inputStream); + COMMON_ONE_SCHEMA = AvroSchemaConverter.getInstance().toOneSchema(COMMON_SCHEMA); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + public static Schema getCommonSchemaInAvroFormat() { + return COMMON_SCHEMA; + } + + public static StructType getCommonSchemaInStructFormat() { + return DeltaSchemaExtractor.getInstance().fromOneSchema(COMMON_ONE_SCHEMA); + } + + public static org.apache.iceberg.Schema getCommonSchemaInIcebergFormat() { + return IcebergSchemaExtractor.getInstance().toIceberg(COMMON_ONE_SCHEMA); + } + + public static String getHudiPartitionConfig() { + return "level:SIMPLE"; + } + + public static String getDeltaPartitionField() { + return "level"; + } + + public static String getIcebergPartitionField() { + return "level"; + } +} diff --git a/core/src/test/java/io/onetable/TestIcebergTable.java b/core/src/test/java/io/onetable/TestIcebergTable.java index ada0880d..3d5bf0c4 100644 --- a/core/src/test/java/io/onetable/TestIcebergTable.java +++ b/core/src/test/java/io/onetable/TestIcebergTable.java @@ -18,6 +18,7 @@ package io.onetable; +import static io.onetable.iceberg.TestIcebergDataHelper.DEFAULT_RECORD_KEY_FIELD; import static io.onetable.iceberg.TestIcebergDataHelper.createIcebergDataHelper; import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP; import static org.junit.jupiter.api.Assertions.*; @@ -31,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -70,7 +72,6 @@ @Getter public class TestIcebergTable implements GenericTable { - private static final String DEFAULT_RECORD_KEY_FIELD = "id"; private static final List DEFAULT_PARTITION_FIELDS = Collections.singletonList("level"); private final String tableName; @@ -87,7 +88,7 @@ public static TestIcebergTable forStandardSchemaAndPartitioning( tableName, tempDir, hadoopConf, - DEFAULT_RECORD_KEY_FIELD, + Optional.empty(), Collections.singletonList(partitionField), false); } @@ -98,23 +99,38 @@ public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning( tableName, tempDir, hadoopConf, - DEFAULT_RECORD_KEY_FIELD, + Optional.empty(), 
Collections.singletonList(partitionField), true); } + public static TestIcebergTable forGivenSchemaAndPartitioning( + String tableName, + Path tempDir, + Configuration hadoopConf, + Schema tableSchema, + String partitionField) { + return new TestIcebergTable( + tableName, + tempDir, + hadoopConf, + Optional.of(tableSchema), + Collections.singletonList(partitionField), + false); + } + public TestIcebergTable( String tableName, Path tempDir, Configuration hadoopConf, - String recordKeyField, + Optional tableSchema, List partitionFields, boolean includeAdditionalColumns) { this.tableName = tableName; this.basePath = tempDir.toUri().toString(); this.icebergDataHelper = createIcebergDataHelper( - recordKeyField, filterNullFields(partitionFields), includeAdditionalColumns); + tableSchema, filterNullFields(partitionFields), includeAdditionalColumns); this.schema = icebergDataHelper.getTableSchema(); PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec(); @@ -294,7 +310,7 @@ public List getColumnsToSelect() { @Override public String getFilterQuery() { - return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField()); + return String.format("%s > 'aaa'", DEFAULT_RECORD_KEY_FIELD); } public Long getLastCommitTimestamp() { diff --git a/core/src/test/java/io/onetable/TestJavaHudiTable.java b/core/src/test/java/io/onetable/TestJavaHudiTable.java index 01307870..8ae179ba 100644 --- a/core/src/test/java/io/onetable/TestJavaHudiTable.java +++ b/core/src/test/java/io/onetable/TestJavaHudiTable.java @@ -36,6 +36,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.HoodieJavaWriteClient; @@ -96,39 +97,23 @@ public static TestJavaHudiTable forStandardSchema( } /** - * Create a test table instance with a schema that has more fields than an instance returned by - * {@link #forStandardSchema(String, Path, String, HoodieTableType)}. Specifically this instance - * will add a top level field, nested field, field within a list, and field within a map to ensure - * schema evolution is properly handled. + * Create a test table instance for general testing with given schema and partitioning(if + * enabled). * - * @param tableName name of the table used in the test, should be unique per test within a shared - * directory - * @param tempDir directory where table will be written, typically a temporary directory that will - * be cleaned up after the tests. - * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field` for the - * {@link CustomKeyGenerator}. If null, {@link NonpartitionedKeyGenerator} will be used. 
- * @param tableType the table type to use (MoR or CoW) - * @return an instance of the class with this configuration + * @param tableName + * @param tempDir + * @param jsc + * @param partitionConfig + * @return */ - public static TestJavaHudiTable withAdditionalColumns( - String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) { - return new TestJavaHudiTable( - tableName, - addSchemaEvolutionFieldsToBase(BASIC_SCHEMA), - tempDir, - partitionConfig, - tableType, - null); - } - - public static TestJavaHudiTable withAdditionalTopLevelField( + public static TestJavaHudiTable forGivenSchemaAndPartitioning( String tableName, Path tempDir, - String partitionConfig, - HoodieTableType tableType, - Schema previousSchema) { - return new TestJavaHudiTable( - tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null); + JavaSparkContext jsc, + Schema tableSchema, + String partitionConfig) { + return withSchema( + tableName, tempDir, partitionConfig, HoodieTableType.COPY_ON_WRITE, tableSchema); } public static TestJavaHudiTable withSchema( diff --git a/core/src/test/java/io/onetable/TestSparkDeltaTable.java b/core/src/test/java/io/onetable/TestSparkDeltaTable.java index 89bc9f2d..793d6647 100644 --- a/core/src/test/java/io/onetable/TestSparkDeltaTable.java +++ b/core/src/test/java/io/onetable/TestSparkDeltaTable.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import lombok.Getter; @@ -39,6 +40,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.delta.DeltaLog; @@ -63,18 +65,31 @@ public class TestSparkDeltaTable implements GenericTable, Closeable public static TestSparkDeltaTable forStandardSchemaAndPartitioning( String tableName, Path tempDir, SparkSession sparkSession, String partitionField) { - return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, false); + return new TestSparkDeltaTable( + tableName, tempDir, sparkSession, Optional.empty(), partitionField, false); } public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning( String tableName, Path tempDir, SparkSession sparkSession, String partitionField) { - return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true); + return new TestSparkDeltaTable( + tableName, tempDir, sparkSession, Optional.empty(), partitionField, true); + } + + public static TestSparkDeltaTable forGivenSchemaAndPartitioning( + String tableName, + Path tempDir, + SparkSession sparkSession, + StructType tableSchema, + String partitionField) { + return new TestSparkDeltaTable( + tableName, tempDir, sparkSession, Optional.of(tableSchema), partitionField, false); } public TestSparkDeltaTable( String name, Path tempDir, SparkSession sparkSession, + Optional tableSchema, String partitionField, boolean includeAdditionalColumns) { try { @@ -83,7 +98,8 @@ public TestSparkDeltaTable( this.sparkSession = sparkSession; this.partitionField = partitionField; this.includeAdditionalColumns = includeAdditionalColumns; - this.testDeltaHelper = createTestDataHelper(partitionField, includeAdditionalColumns); + this.testDeltaHelper = + createTestDataHelper(tableSchema, partitionField, includeAdditionalColumns); testDeltaHelper.createTable(sparkSession, tableName, basePath); this.deltaLog = DeltaLog.forTable(sparkSession, 
basePath); this.deltaTable = DeltaTable.forPath(sparkSession, basePath); @@ -118,7 +134,7 @@ public List insertRecordsForSpecialPartition(int numRows) { @Override public String getOrderByColumn() { - return "id"; + return "key"; } @SneakyThrows @@ -199,7 +215,8 @@ public void runVacuum() { } private String initBasePath(Path tempDir, String tableName) throws IOException { - Path basePath = tempDir.resolve(tableName); + // To decouple table name and base path. + Path basePath = tempDir.resolve(tableName + "_v1"); Files.createDirectories(basePath); return basePath.toUri().toString(); } diff --git a/core/src/test/java/io/onetable/TestSparkHudiTable.java b/core/src/test/java/io/onetable/TestSparkHudiTable.java index 205d54c6..4e3e45f5 100644 --- a/core/src/test/java/io/onetable/TestSparkHudiTable.java +++ b/core/src/test/java/io/onetable/TestSparkHudiTable.java @@ -115,34 +115,23 @@ public static TestSparkHudiTable forStandardSchema( } /** - * Create a test table instance with a schema that has more fields than an instance returned by - * {@link #forStandardSchema(String, Path, JavaSparkContext, String, HoodieTableType)}. - * Specifically this instance will add a top level field, nested field, field within a list, and - * field within a map to ensure schema evolution is properly handled. + * Create a test table instance for general testing with given schema and partitioning(if + * enabled). * - * @param tableName name of the table used in the test, should be unique per test within a shared - * directory - * @param tempDir directory where table will be written, typically a temporary directory that will - * be cleaned up after the tests. - * @param jsc the {@link JavaSparkContext} to use when writing data with Hudi - * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field` for the - * {@link CustomKeyGenerator}. If null, {@link NonpartitionedKeyGenerator} will be used. - * @param tableType the table type to use (MoR or CoW) - * @return an instance of the class with this configuration + * @param tableName + * @param tempDir + * @param jsc + * @param partitionConfig + * @return */ - public static TestSparkHudiTable withAdditionalColumns( + public static TestSparkHudiTable forGivenSchemaAndPartitioning( String tableName, Path tempDir, JavaSparkContext jsc, - String partitionConfig, - HoodieTableType tableType) { + Schema tableSchema, + String partitionConfig) { return new TestSparkHudiTable( - tableName, - addSchemaEvolutionFieldsToBase(BASIC_SCHEMA), - tempDir, - jsc, - partitionConfig, - tableType); + tableName, tableSchema, tempDir, jsc, partitionConfig, HoodieTableType.COPY_ON_WRITE); } private TestSparkHudiTable( diff --git a/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java b/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java index c50ddd66..149468c2 100644 --- a/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java +++ b/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -323,7 +324,12 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) { String tableName = getTableName(); TestSparkDeltaTable testSparkDeltaTable = new TestSparkDeltaTable( - tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false); + tableName, + tempDir, + sparkSession, + Optional.empty(), + isPartitioned ? 
"yearOfBirth" : null, + false); List> allActiveFiles = new ArrayList<>(); List allTableChanges = new ArrayList<>(); List rows = testSparkDeltaTable.insertRows(50); @@ -378,7 +384,12 @@ public void testsShowingVacuumHasNoEffectOnIncrementalSync() { String tableName = getTableName(); TestSparkDeltaTable testSparkDeltaTable = new TestSparkDeltaTable( - tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false); + tableName, + tempDir, + sparkSession, + Optional.empty(), + isPartitioned ? "yearOfBirth" : null, + false); // Insert 50 rows to 2018 partition. List commit1Rows = testSparkDeltaTable.insertRowsForPartition(50, 2018); Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); @@ -428,7 +439,12 @@ public void testVacuum(boolean isPartitioned) { String tableName = getTableName(); TestSparkDeltaTable testSparkDeltaTable = new TestSparkDeltaTable( - tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false); + tableName, + tempDir, + sparkSession, + Optional.empty(), + isPartitioned ? "yearOfBirth" : null, + false); List> allActiveFiles = new ArrayList<>(); List allTableChanges = new ArrayList<>(); List rows = testSparkDeltaTable.insertRows(50); @@ -482,7 +498,12 @@ public void testAddColumns(boolean isPartitioned) { String tableName = getTableName(); TestSparkDeltaTable testSparkDeltaTable = new TestSparkDeltaTable( - tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, true); + tableName, + tempDir, + sparkSession, + Optional.empty(), + isPartitioned ? "yearOfBirth" : null, + true); List> allActiveFiles = new ArrayList<>(); List allTableChanges = new ArrayList<>(); List rows = testSparkDeltaTable.insertRows(50); @@ -526,7 +547,8 @@ public void testAddColumns(boolean isPartitioned) { public void testDropPartition() { String tableName = getTableName(); TestSparkDeltaTable testSparkDeltaTable = - new TestSparkDeltaTable(tableName, tempDir, sparkSession, "yearOfBirth", false); + new TestSparkDeltaTable( + tableName, tempDir, sparkSession, Optional.empty(), "yearOfBirth", false); List> allActiveFiles = new ArrayList<>(); List allTableChanges = new ArrayList<>(); @@ -583,7 +605,12 @@ public void testOptimizeAndClustering(boolean isPartitioned) { String tableName = getTableName(); TestSparkDeltaTable testSparkDeltaTable = new TestSparkDeltaTable( - tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false); + tableName, + tempDir, + sparkSession, + Optional.empty(), + isPartitioned ? 
"yearOfBirth" : null, + false); List> allActiveFiles = new ArrayList<>(); List allTableChanges = new ArrayList<>(); List rows = testSparkDeltaTable.insertRows(50); diff --git a/core/src/test/java/io/onetable/delta/TestDeltaHelper.java b/core/src/test/java/io/onetable/delta/TestDeltaHelper.java index a1d1ad16..934552a9 100644 --- a/core/src/test/java/io/onetable/delta/TestDeltaHelper.java +++ b/core/src/test/java/io/onetable/delta/TestDeltaHelper.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -76,7 +77,7 @@ public class TestDeltaHelper { private static final StructField[] COMMON_FIELDS = new StructField[] { - new StructField("id", IntegerType, false, Metadata.empty()), + new StructField("key", StringType, false, Metadata.empty()), new StructField("firstName", StringType, true, Metadata.empty()), new StructField("lastName", StringType, true, Metadata.empty()), new StructField("gender", StringType, true, Metadata.empty()), @@ -108,7 +109,13 @@ public class TestDeltaHelper { private static final StructField[] ADDITIONAL_FIELDS = new StructField[] {new StructField("street", StringType, true, Metadata.empty())}; private static final StructField[] DATE_PARTITIONED_FIELDS = - new StructField[] {new StructField("yearOfBirth", IntegerType, true, Metadata.empty())}; + new StructField[] { + new StructField( + "yearOfBirth", + IntegerType, + true, + Metadata.fromJson("{\"delta.generationExpression\": \"YEAR(birthDate)\"}")) + }; private static final Random RANDOM = new Random(); private static final String[] GENDERS = {"Male", "Female"}; @@ -118,8 +125,11 @@ public class TestDeltaHelper { boolean includeAdditionalColumns; public static TestDeltaHelper createTestDataHelper( - String partitionField, boolean includeAdditionalColumns) { - StructType tableSchema = generateDynamicSchema(partitionField, includeAdditionalColumns); + Optional tblInputSchema, + String partitionField, + boolean includeAdditionalColumns) { + StructType tableSchema = + tblInputSchema.orElse(generateDynamicSchema(partitionField, includeAdditionalColumns)); return TestDeltaHelper.builder() .tableStructSchema(tableSchema) .partitionField(partitionField) @@ -142,26 +152,31 @@ private static StructType generateDynamicSchema( public void createTable(SparkSession sparkSession, String tableName, String basePath) { DeltaTableBuilder tableBuilder = DeltaTable.createIfNotExists(sparkSession).tableName(tableName).location(basePath); - Arrays.stream(COMMON_FIELDS).forEach(tableBuilder::addColumn); - if ("yearOfBirth".equals(partitionField)) { - tableBuilder - .addColumn( - DeltaTable.columnBuilder("yearOfBirth") - .dataType(IntegerType) - .generatedAlwaysAs("YEAR(birthDate)") - .build()) - .partitionedBy("yearOfBirth"); - } else if ("level".equals(partitionField)) { - tableBuilder.partitionedBy(partitionField); + for (StructField sf : tableStructSchema.fields()) { + tableBuilder = addFieldToTableBuilder(tableBuilder, sf); + } + if ("yearOfBirth".equals(partitionField) || "level".equals(partitionField)) { + tableBuilder = tableBuilder.partitionedBy(partitionField); } else if (partitionField != null) { throw new IllegalArgumentException("Unexpected partition field: " + partitionField); } - if (includeAdditionalColumns) { - tableBuilder.addColumn("street", StringType); - } tableBuilder.execute(); } + private DeltaTableBuilder 
addFieldToTableBuilder(DeltaTableBuilder tableBuilder, StructField sf) { + if (sf.metadata().contains("delta.generationExpression")) { + String generatedExpression = sf.metadata().getString("delta.generationExpression"); + if (generatedExpression != null) { + return tableBuilder.addColumn( + DeltaTable.columnBuilder(sf.name()) + .dataType(sf.dataType()) + .generatedAlwaysAs(generatedExpression) + .build()); + } + } + return tableBuilder.addColumn(sf.name(), sf.dataType()); + } + public Row generateRandomRow() { int year = 2013 + RANDOM.nextInt(11); String levelValue = LEVEL_VALUES.get(RANDOM.nextInt(LEVEL_VALUES.size())); @@ -186,8 +201,8 @@ private Object[] generateRandomValuesForGivenYearAndLevel(int yearValue, String private Object generateValueForField(StructField field, int yearValue, String levelValue) { switch (field.name()) { - case "id": - return ID_GENERATOR.incrementAndGet(); + case "key": + return generateRandomString(); case "gender": return GENDERS[RANDOM.nextInt(GENDERS.length)]; case "birthDate": diff --git a/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java b/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java index 8e9da45e..5379e3c3 100644 --- a/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java +++ b/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java @@ -47,7 +47,7 @@ public class TestDeltaPartitionExtractor { private static final Map STRUCT_FIELD_MAP = new HashMap() { { - put("id", DataTypes.createStructField("id", DataTypes.IntegerType, false)); + put("key", DataTypes.createStructField("key", DataTypes.IntegerType, false)); put("firstName", DataTypes.createStructField("firstName", DataTypes.StringType, false)); put("gender", DataTypes.createStructField("gender", DataTypes.StringType, false)); put( @@ -115,7 +115,7 @@ public class TestDeltaPartitionExtractor { @Test public void testUnpartitionedTable() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate")); + getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List onePartitionFields = deltaPartitionExtractor.convertFromDeltaPartitionFormat(oneSchema, new StructType()); @@ -125,7 +125,7 @@ public void testUnpartitionedTable() { @Test public void testSimplePartitionedTable() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate")); + getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate")); StructType partitionSchema = getSchemaWithFields(Arrays.asList("gender")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List expectedOnePartitionFields = @@ -146,7 +146,8 @@ public void testSimplePartitionedTable() { @Test public void testDatePartitionedGeneratedColumnsTable() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateOfBirth")); + getSchemaWithFields( + Arrays.asList("key", "firstName", "gender", "birthDate", "dateOfBirth")); StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateOfBirth")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List expectedOnePartitionFields = @@ -164,7 +165,7 @@ public void testDatePartitionedGeneratedColumnsTable() { @Test public void testDateFormatPartitionedGeneratedColumnsTable() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", 
"dateFmt")); + getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate", "dateFmt")); StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateFmt")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List expectedOnePartitionFields = @@ -182,7 +183,8 @@ public void testDateFormatPartitionedGeneratedColumnsTable() { @Test public void yearPartitionedGeneratedColumnsTable() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "yearOfBirth")); + getSchemaWithFields( + Arrays.asList("key", "firstName", "gender", "birthDate", "yearOfBirth")); StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List expectedOnePartitionFields = @@ -200,8 +202,9 @@ public void yearPartitionedGeneratedColumnsTable() { @Test public void yearAndSimpleCombinedPartitionedGeneratedColumnsTable() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "yearOfBirth")); - StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth", "id")); + getSchemaWithFields( + Arrays.asList("key", "firstName", "gender", "birthDate", "yearOfBirth")); + StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth", "key")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List expectedOnePartitionFields = Arrays.asList( @@ -213,7 +216,7 @@ public void yearAndSimpleCombinedPartitionedGeneratedColumnsTable() { OnePartitionField.builder() .sourceField( OneField.builder() - .name("id") + .name("key") .schema(OneSchema.builder().name("integer").dataType(OneType.INT).build()) .build()) .transformType(PartitionTransformType.VALUE) @@ -228,7 +231,7 @@ public void yearMonthDayHourPartitionedGeneratedColumnsTable() { StructType tableSchema = getSchemaWithFields( Arrays.asList( - "id", + "key", "firstName", "gender", "birthDate", @@ -257,16 +260,16 @@ public void yearMonthDayHourPartitionedGeneratedColumnsTable() { @Test public void testCombinationOfPlainAndGeneratedColumns() { StructType tableSchema = - getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateFmt")); + getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate", "dateFmt")); StructType partitionSchema = - getSchemaWithFields(Arrays.asList("id", "dateFmt", "gender", "dateOfBirth")); + getSchemaWithFields(Arrays.asList("key", "dateFmt", "gender", "dateOfBirth")); OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema); List expectedOnePartitionFields = Arrays.asList( OnePartitionField.builder() .sourceField( OneField.builder() - .name("id") + .name("key") .schema(OneSchema.builder().name("integer").dataType(OneType.INT).build()) .build()) .transformType(PartitionTransformType.VALUE) diff --git a/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java b/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java index c2fd7396..84c6afe8 100644 --- a/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java +++ b/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java @@ -99,7 +99,7 @@ public class ITHudiTargetClient { private static final HoodieEngineContext CONTEXT = new HoodieJavaEngineContext(CONFIGURATION); private static final String TABLE_NAME = "test_table"; - private static final String KEY_FIELD_NAME = "id"; + private static final String KEY_FIELD_NAME = "key"; private static final String PARTITION_FIELD_NAME = 
"partition_field"; private static final String OTHER_FIELD_NAME = "content"; private static final long FILE_SIZE = 100L; diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java index f2ad0c61..174a2507 100644 --- a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java +++ b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; @@ -55,10 +56,12 @@ @Builder @Value public class TestIcebergDataHelper { + public static final String DEFAULT_RECORD_KEY_FIELD = "key"; + private static final Random RANDOM = new Random(); private static final List COMMON_FIELDS = Arrays.asList( - NestedField.optional(1, "id", Types.StringType.get()), + NestedField.optional(1, "key", Types.StringType.get()), NestedField.optional(2, "ts", Types.LongType.get()), NestedField.optional(3, "level", Types.StringType.get()), NestedField.optional(4, "severity", Types.IntegerType.get()), @@ -114,15 +117,14 @@ public class TestIcebergDataHelper { private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); Schema tableSchema; - String recordKeyField; List partitionFieldNames; public static TestIcebergDataHelper createIcebergDataHelper( - String recordKeyField, List partitionFields, boolean includeAdditionalColumns) { - Schema tableSchema = getSchema(includeAdditionalColumns); + Optional tblSchema, List partitionFields, boolean includeAdditionalColumns) { + + Schema schema = tblSchema.orElse(getSchema(includeAdditionalColumns)); return TestIcebergDataHelper.builder() - .tableSchema(tableSchema) - .recordKeyField(recordKeyField) + .tableSchema(schema) .partitionFieldNames(partitionFields) .build(); } @@ -193,7 +195,7 @@ private Record generateUpsertRecord( String fieldName = field.name(); Object value; - if (fieldName.equals(recordKeyField) + if (fieldName.equals(DEFAULT_RECORD_KEY_FIELD) || (partitionFieldNames != null && partitionFieldNames.contains(fieldName))) { value = existingRecord.getField(fieldName); } else { @@ -237,7 +239,7 @@ private Object generateValueForField( Type fieldType = field.type(); if (partitionValue != null && partitionFieldNames.contains(fieldName)) { return partitionValue; - } else if (fieldName.equals(recordKeyField)) { + } else if (fieldName.equals(DEFAULT_RECORD_KEY_FIELD)) { return keyValue; } else if (fieldName.equals("ts")) { return System.currentTimeMillis(); diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java b/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java index 7a3c8848..2499f078 100644 --- a/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java +++ b/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java @@ -45,12 +45,12 @@ public class TestIcebergPartitionValueConverter { IcebergPartitionValueConverter.getInstance(); private static final Schema SCHEMA = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(1, "key", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get()), Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone())); private static final Schema SCHEMA_WITH_PARTITION = new Schema( - Types.NestedField.optional(1, "id", Types.IntegerType.get()), + 
Types.NestedField.optional(1, "key", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get()), Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone()), Types.NestedField.optional(4, "birthDate_year", Types.IntegerType.get())); diff --git a/core/src/test/resources/schemas/common_schema.avsc b/core/src/test/resources/schemas/common_schema.avsc new file mode 100644 index 00000000..8659f3f0 --- /dev/null +++ b/core/src/test/resources/schemas/common_schema.avsc @@ -0,0 +1,62 @@ +{ + "type": "record", + "name": "CommonSchema", + "namespace": "test", + "fields": [ + { + "name": "key", + "type": "string" + }, + { + "name": "ts", + "type": "long" + }, + { + "name": "level", + "type": "string" + }, + { + "name": "severity", + "type": ["null", "int"], + "default": null + }, + { + "name": "double_field", + "type": "double", + "default": 0.0 + }, + { + "name": "float_field", + "type": "float", + "default": 0.0 + }, + { + "name": "int_field", + "type": "int", + "default": 0 + }, + { + "name": "long_field", + "type": "long", + "default": 0 + }, + { + "name": "boolean_field", + "type": "boolean", + "default": false + }, + { + "name": "string_field", + "type": { + "type": "string", + "avro.java.string": "String" + }, + "default": "" + }, + { + "name": "bytes_field", + "type": "bytes", + "default": "" + } + ] +} \ No newline at end of file
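
Note for reviewers: the schema above is the single source of truth for the cross-format tests. Below is a condensed, illustrative sketch (not an additional file in this patch; exception handling omitted) of how TestCommonTableHelper derives the per-format schemas from it:

    // Parse the shared Avro schema once, convert it to the canonical OneSchema,
    // and derive the Delta and Iceberg representations from that single model.
    Schema avroSchema =
        new Schema.Parser()
            .parse(
                GenericTable.class
                    .getClassLoader()
                    .getResourceAsStream("schemas/common_schema.avsc"));
    OneSchema oneSchema = AvroSchemaConverter.getInstance().toOneSchema(avroSchema);
    StructType deltaSchema = DeltaSchemaExtractor.getInstance().fromOneSchema(oneSchema);
    org.apache.iceberg.Schema icebergSchema =
        IcebergSchemaExtractor.getInstance().toIceberg(oneSchema);
    // All three formats partition on the shared "level" column:
    // "level:SIMPLE" for Hudi, plain "level" for Delta and Iceberg.

Going through OneSchema keeps the three test schemas in lockstep, which is what allows the round-trip test in ITOneTableClient to reuse the same data as the table moves between formats.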