diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java index 5e001c6cf..20af37e0c 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java @@ -78,6 +78,8 @@ public enum MetadataValue { MILLIS } + public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType"; + /** * Performs a level-order traversal of the schema and returns a list of all fields. Use this * method to get a list that includes nested fields. Use {@link InternalSchema#getFields()} when diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java index c2f1a223e..e1b1049d0 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java @@ -38,6 +38,7 @@ public enum InternalType { LIST, MAP, UNION, + UUID, FIXED, STRING, BYTES, diff --git a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java index 346dcded3..9f40d29e8 100644 --- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java @@ -132,8 +132,13 @@ private InternalSchema toInternalSchema( break; } if (schema.getType() == Schema.Type.FIXED) { - metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize()); - newDataType = InternalType.FIXED; + String xtableLogicalType = schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE); + if ("uuid".equals(xtableLogicalType)) { + newDataType = InternalType.UUID; + } else { + metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize()); + newDataType = InternalType.FIXED; + } } else { newDataType = InternalType.BYTES; } @@ -435,6 +440,11 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP Schema.createFixed( internalSchema.getName(), internalSchema.getComment(), null, fixedSize), internalSchema); + case UUID: + Schema uuidSchema = + Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16); + uuidSchema.addProp(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid"); + return finalizeSchema(uuidSchema, internalSchema); default: throw new UnsupportedSchemaTypeException( "Encountered unhandled type during InternalSchema to Avro conversion: " diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index fa425ef2e..e312761f7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.types.DecimalType; import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -73,7 +74,7 @@ public StructType fromInternalSchema(InternalSchema internalSchema) { field.getName(), convertFieldType(field), field.getSchema().isNullable(), - Metadata.empty())) + getMetaData(field.getSchema().getDataType()))) .toArray(StructField[]::new); return new StructType(fields); } @@ -90,6 +91,7 @@ private DataType convertFieldType(InternalField field) { return DataTypes.LongType; case BYTES: case FIXED: + case UUID: return DataTypes.BinaryType; case BOOLEAN: return DataTypes.BooleanType; @@ -142,12 +144,24 @@ private DataType convertFieldType(InternalField field) { } } + private Metadata getMetaData(InternalType type) { + if (type == InternalType.UUID) { + return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build(); + } else { + return Metadata.empty(); + } + } + public InternalSchema toInternalSchema(StructType structType) { - return toInternalSchema(structType, null, false, null); + return toInternalSchema(structType, null, false, null, null); } private InternalSchema toInternalSchema( - DataType dataType, String parentPath, boolean nullable, String comment) { + DataType dataType, + String parentPath, + boolean nullable, + String comment, + Metadata originalMetadata) { Map metadata = null; List fields = null; InternalType type; @@ -172,7 +186,12 @@ private InternalSchema toInternalSchema( type = InternalType.DOUBLE; break; case "binary": - type = InternalType.BYTES; + if (originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE) + && "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) { + type = InternalType.UUID; + } else { + type = InternalType.BYTES; + } break; case "long": type = InternalType.LONG; @@ -210,7 +229,8 @@ private InternalSchema toInternalSchema( field.dataType(), SchemaUtils.getFullyQualifiedPath(parentPath, field.name()), field.nullable(), - fieldComment); + fieldComment, + field.metadata()); return InternalField.builder() .name(field.name()) .fieldId(fieldId) @@ -238,6 +258,7 @@ private InternalSchema toInternalSchema( SchemaUtils.getFullyQualifiedPath( parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME), arrayType.containsNull(), + null, null); InternalField elementField = InternalField.builder() @@ -256,6 +277,7 @@ private InternalSchema toInternalSchema( SchemaUtils.getFullyQualifiedPath( parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME), false, + null, null); InternalField keyField = InternalField.builder() @@ -269,6 +291,7 @@ private InternalSchema toInternalSchema( SchemaUtils.getFullyQualifiedPath( parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME), mapType.valueContainsNull(), + null, null); InternalField valueField = InternalField.builder() diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java index 0b95e65ea..a55968d9d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java @@ -159,6 +159,7 @@ private static PartialResult parseValue( break; case FIXED: case BYTES: + case UUID: parsedValue = valueAsString.getBytes(StandardCharsets.UTF_8); break; case BOOLEAN: diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 3acd78566..4cf825d77 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -199,6 +199,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) { return Types.DecimalType.of(precision, scale); case RECORD: return Types.StructType.of(convertFields(field.getSchema(), fieldIdTracker)); + case UUID: + return Types.UUIDType.get(); case MAP: InternalField key = field.getSchema().getFields().stream() @@ -305,7 +307,7 @@ private InternalSchema fromIcebergType( InternalSchema.MetadataKey.FIXED_BYTES_SIZE, fixedType.length()); break; case UUID: - type = InternalType.FIXED; + type = InternalType.UUID; metadata = Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 16); break; case STRUCT: diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java index c6b75c316..dce0f21ab 100644 --- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java @@ -127,6 +127,22 @@ static GenericTable getInstanceWithCustomPartitionConfig( } } + static GenericTable getInstanceWithUUIDColumns( + String tableName, + Path tempDir, + SparkSession sparkSession, + JavaSparkContext jsc, + String sourceFormat, + boolean isPartitioned) { + switch (sourceFormat) { + case ICEBERG: + return TestIcebergTable.forSchemaWithUUIDColumns( + tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration()); + default: + throw new IllegalArgumentException("Unsupported source format: " + sourceFormat); + } + } + static String getTableName() { return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_"); } diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 58f0f9828..3d539766a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -38,12 +39,14 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -82,6 +85,10 @@ import org.apache.spark.sql.delta.DeltaLog; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import org.apache.xtable.conversion.ConversionConfig; @@ -100,6 +107,7 @@ public class ITConversionController { @TempDir public static Path tempDir; private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static JavaSparkContext jsc; private static SparkSession sparkSession; @@ -142,6 +150,19 @@ private static Stream generateTestParametersForFormatsSyncModesAndPar return arguments.stream(); } + private static Stream generateTestParametersForUUID() { + List arguments = new ArrayList<>(); + for (SyncMode syncMode : SyncMode.values()) { + for (boolean isPartitioned : new boolean[] {true, false}) { + // TODO: Add Hudi UUID support later (https://github.com/apache/incubator-xtable/issues/543) + // Current spark parquet reader can not handle fix-size byte array with UUID logic type + List targetTableFormats = Arrays.asList(DELTA); + arguments.add(Arguments.of(ICEBERG, targetTableFormats, syncMode, isPartitioned)); + } + } + return arguments.stream(); + } + private static Stream testCasesWithSyncModes() { return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL)); } @@ -261,6 +282,54 @@ public void testVariousOperations( } } + // The test content is the simplified version of testVariousOperations + // The difference is that the data source from Iceberg contains UUID columns + @ParameterizedTest + @MethodSource("generateTestParametersForUUID") + public void testVariousOperationsWithUUID( + String sourceTableFormat, + List targetTableFormats, + SyncMode syncMode, + boolean isPartitioned) { + String tableName = getTableName(); + ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); + String partitionConfig = null; + if (isPartitioned) { + partitionConfig = "level:VALUE"; + } + ConversionSourceProvider conversionSourceProvider = + getConversionSourceProvider(sourceTableFormat); + List insertRecords; + try (GenericTable table = + GenericTable.getInstanceWithUUIDColumns( + tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) { + insertRecords = table.insertRows(100); + + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig, + null); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100); + + // Upsert some records and sync again + table.upsertRows(insertRecords.subList(0, 20)); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100); + + table.deleteRows(insertRecords.subList(30, 50)); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 80); + checkDatasetEquivalenceWithFilter( + sourceTableFormat, table, targetTableFormats, table.getFilterQuery()); + } + } + @ParameterizedTest @MethodSource("testCasesWithPartitioningAndSyncModes") public void testConcurrentInsertWritesInSource( @@ -797,13 +866,84 @@ private void checkDatasetEquivalence( // if count is not known ahead of time, ensure datasets are non-empty assertFalse(dataset1Rows.isEmpty()); } + + if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) { + compareDatasetWithUUID(dataset1Rows, dataset2Rows); + } else { + assertEquals( + dataset1Rows, + dataset2Rows, + String.format( + "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s", + sourceFormat, format)); + } + }); + } + + /** + * Compares two datasets where dataset1Rows is for Iceberg and dataset2Rows is for other formats + * (such as Delta or Hudi). - For the "uuid_field", if present, the UUID from dataset1 (Iceberg) + * is compared with the Base64-encoded UUID from dataset2 (other formats), after decoding. - For + * all other fields, the values are compared directly. - If neither row contains the "uuid_field", + * the rows are compared as plain JSON strings. + * + * @param dataset1Rows List of JSON rows representing the dataset in Iceberg format (UUID is + * stored as a string). + * @param dataset2Rows List of JSON rows representing the dataset in other formats (UUID might be + * Base64-encoded). + */ + private void compareDatasetWithUUID(List dataset1Rows, List dataset2Rows) { + for (int i = 0; i < dataset1Rows.size(); i++) { + String row1 = dataset1Rows.get(i); + String row2 = dataset2Rows.get(i); + if (row1.contains("uuid_field") && row2.contains("uuid_field")) { + try { + JsonNode node1 = OBJECT_MAPPER.readTree(row1); + JsonNode node2 = OBJECT_MAPPER.readTree(row2); + + // check uuid field + String uuidStr1 = node1.get("uuid_field").asText(); + byte[] bytes = Base64.getDecoder().decode(node2.get("uuid_field").asText()); + ByteBuffer bb = ByteBuffer.wrap(bytes); + UUID uuid2 = new UUID(bb.getLong(), bb.getLong()); + String uuidStr2 = uuid2.toString(); assertEquals( - dataset1Rows, - dataset2Rows, + uuidStr1, + uuidStr2, String.format( "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s", - sourceFormat, format)); - }); + uuidStr1, uuidStr2)); + + // check other fields + ((ObjectNode) node1).remove("uuid_field"); + ((ObjectNode) node2).remove("uuid_field"); + assertEquals( + node1.toString(), + node2.toString(), + String.format( + "Datasets are not equivalent when comparing other fields. Source: %s, Target: %s", + node1, node2)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } else { + assertEquals( + row1, + row2, + String.format( + "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s", + row1, row2)); + } + } + } + + private boolean containsUUIDFields(List rows) { + for (String row : rows) { + if (row.contains("\"uuid_field\"")) { + return true; + } + } + return false; } private static Stream addBasicPartitionCases(Stream arguments) { diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java index bb63667a7..0c8336fef 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java @@ -88,7 +88,7 @@ public static TestIcebergTable forStandardSchemaAndPartitioning( hadoopConf, DEFAULT_RECORD_KEY_FIELD, Collections.singletonList(partitionField), - false); + TestIcebergDataHelper.SchemaType.COMMON); } public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning( @@ -99,7 +99,18 @@ public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning( hadoopConf, DEFAULT_RECORD_KEY_FIELD, Collections.singletonList(partitionField), - true); + TestIcebergDataHelper.SchemaType.COMMON_WITH_ADDITIONAL_COLUMNS); + } + + public static TestIcebergTable forSchemaWithUUIDColumns( + String tableName, String partitionField, Path tempDir, Configuration hadoopConf) { + return new TestIcebergTable( + tableName, + tempDir, + hadoopConf, + DEFAULT_RECORD_KEY_FIELD, + Collections.singletonList(partitionField), + TestIcebergDataHelper.SchemaType.COMMON_WITH_UUID_COLUMN); } public TestIcebergTable( @@ -108,12 +119,12 @@ public TestIcebergTable( Configuration hadoopConf, String recordKeyField, List partitionFields, - boolean includeAdditionalColumns) { + TestIcebergDataHelper.SchemaType schemaType) { this.tableName = tableName; this.basePath = tempDir.toUri().toString(); this.icebergDataHelper = TestIcebergDataHelper.createIcebergDataHelper( - recordKeyField, filterNullFields(partitionFields), includeAdditionalColumns); + recordKeyField, filterNullFields(partitionFields), schemaType); this.schema = icebergDataHelper.getTableSchema(); PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec(); diff --git a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java index 20d411393..0b6823a1f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java @@ -844,4 +844,49 @@ public void testIdSupport() { .build(); assertEquals(internalSchema, AvroSchemaConverter.getInstance().toInternalSchema(schemaWithIds)); } + + @Test + public void testIcebergToAvroUUIDSupport() { + String schemaName = "testRecord"; + String doc = "What's up doc"; + Schema avroRepresentation = + new Schema.Parser() + .parse( + "{\"type\":\"record\",\"name\":\"testRecord\",\"doc\":\"What's up doc\",\"fields\":[" + + "{\"name\":\"requiredUUID\",\"type\":{\"type\":\"fixed\",\"name\":\"required_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}}," + + "{\"name\":\"optionalUUID\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"optional_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}],\"default\":null}" + + "]}"); + InternalSchema internalSchema = + InternalSchema.builder() + .name(schemaName) + .comment(doc) + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("requiredUUID") + .schema( + InternalSchema.builder() + .name("required_uuid") + .dataType(InternalType.UUID) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("optionalUUID") + .schema( + InternalSchema.builder() + .name("optional_uuid") + .dataType(InternalType.UUID) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + assertEquals( + avroRepresentation, AvroSchemaConverter.getInstance().fromInternalSchema(internalSchema)); + assertEquals( + internalSchema, AvroSchemaConverter.getInstance().toInternalSchema(avroRepresentation)); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java index 45c90660d..4b0eacd06 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java @@ -25,6 +25,7 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -892,4 +893,46 @@ void generateColumnsAreNotTranslatedToInternalSchema() { Assertions.assertEquals( internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); } + + @Test + public void testIcebergToDeltaUUIDSupport() { + Metadata metadata = + new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build(); + StructType structRepresentation = + new StructType() + .add("requiredUUID", DataTypes.BinaryType, false, metadata) + .add("optionalUUID", DataTypes.BinaryType, true, metadata); + InternalSchema internalSchema = + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("requiredUUID") + .schema( + InternalSchema.builder() + .name("binary") + .dataType(InternalType.UUID) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("optionalUUID") + .schema( + InternalSchema.builder() + .name("binary") + .dataType(InternalType.UUID) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + Assertions.assertEquals( + structRepresentation, + DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema)); + Assertions.assertEquals( + internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java index cf806fd6a..1d10fe7af 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java @@ -104,12 +104,16 @@ public class TestIcebergDataHelper { Arrays.asList( NestedField.optional(31, "additional_column1", Types.StringType.get()), NestedField.optional(32, "additional_column2", Types.LongType.get())); + private static final List UUID_FIELDS = + Arrays.asList(NestedField.optional(33, "uuid_field", Types.UUIDType.get())); private static final Schema BASE_SCHEMA = new Schema(COMMON_FIELDS); private static final Schema SCHEMA_WITH_ADDITIONAL_COLUMNS = new Schema( Stream.concat(COMMON_FIELDS.stream(), ADDITIONAL_FIELDS.stream()) .collect(Collectors.toList())); - + private static final Schema SCHEMA_WITH_UUID_COLUMN = + new Schema( + Stream.concat(COMMON_FIELDS.stream(), UUID_FIELDS.stream()).collect(Collectors.toList())); private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); @@ -117,9 +121,15 @@ public class TestIcebergDataHelper { String recordKeyField; List partitionFieldNames; + public static enum SchemaType { + COMMON, + COMMON_WITH_ADDITIONAL_COLUMNS, + COMMON_WITH_UUID_COLUMN, + } + public static TestIcebergDataHelper createIcebergDataHelper( - String recordKeyField, List partitionFields, boolean includeAdditionalColumns) { - Schema tableSchema = getSchema(includeAdditionalColumns); + String recordKeyField, List partitionFields, SchemaType schemaType) { + Schema tableSchema = getSchema(schemaType); return TestIcebergDataHelper.builder() .tableSchema(tableSchema) .recordKeyField(recordKeyField) @@ -127,8 +137,17 @@ public static TestIcebergDataHelper createIcebergDataHelper( .build(); } - private static Schema getSchema(boolean includeAdditionalColumns) { - return includeAdditionalColumns ? SCHEMA_WITH_ADDITIONAL_COLUMNS : BASE_SCHEMA; + private static Schema getSchema(SchemaType schemaType) { + switch (schemaType) { + case COMMON: + return BASE_SCHEMA; + case COMMON_WITH_ADDITIONAL_COLUMNS: + return SCHEMA_WITH_ADDITIONAL_COLUMNS; + case COMMON_WITH_UUID_COLUMN: + return SCHEMA_WITH_UUID_COLUMN; + default: + throw new IllegalArgumentException("Unknown schema type: " + schemaType); + } } public List generateInsertRecords(int numRecords) { @@ -299,7 +318,11 @@ private Object generateRandomValueForType( case STRUCT: return generateInsertRecord(timeLowerBound, timeUpperBound, fieldType.asStructType()); case UUID: - return UUID.randomUUID().toString(); + UUID uuid = UUID.randomUUID(); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(uuid.getMostSignificantBits()); + byteBuffer.putLong(uuid.getLeastSignificantBits()); + return byteBuffer.array(); case LIST: Types.ListType listType = (Types.ListType) fieldType; int listSize = RANDOM.nextInt(5) + 1; diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java index 7559a5e82..287765413 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java @@ -372,7 +372,7 @@ public void testUuids() { .schema( InternalSchema.builder() .name("uuid") - .dataType(InternalType.FIXED) + .dataType(InternalType.UUID) .isNullable(false) .metadata(fixedMetadata) .build()) @@ -383,7 +383,7 @@ public void testUuids() { .schema( InternalSchema.builder() .name("uuid") - .dataType(InternalType.FIXED) + .dataType(InternalType.UUID) .isNullable(true) .metadata(fixedMetadata) .build()) @@ -391,6 +391,7 @@ public void testUuids() { .build())) .build(); assertEquals(expectedSchema, (SCHEMA_EXTRACTOR.fromIceberg(inputSchema))); + assertTrue(inputSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(expectedSchema))); } @Test