-
Notifications
You must be signed in to change notification settings - Fork 149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[112] Add a UUID Type #518
[112] Add a UUID Type #518
Conversation
// Convert to fixed 16-byte array since Avro does not support UUID natively | ||
return finalizeSchema( | ||
Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16), | ||
internalSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This converter is currently used exclusively for Hudi target conversions. Are there any other components or processes that need to handle the UUID internal type for Hudi (like HudiPartitionValuesExtractor.java)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing else is coming to mind. For partitions, we only support the year/month/day/hour and identity transforms from Iceberg current so those partition functions would just pass through the value if I'm remembering correctly.
Do we need to handle UUID here for Hudi partition value? https://github.com/apache/incubator-xtable/blob/main/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java#L160-L162 Like:
|
TODO: Probably we need some tests for round trip conversions, like Iceberg -> Delta Lake -> Iceberg to verify that UUIDs remain consistent throughout the conversions |
@@ -435,6 +435,11 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP | |||
Schema.createFixed( | |||
internalSchema.getName(), internalSchema.getComment(), null, fixedSize), | |||
internalSchema); | |||
case UUID: | |||
// Convert to fixed 16-byte array since Avro does not support UUID natively |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
technically there is a logical type for UUID in recent versions of avro https://avro.apache.org/docs/1.10.0/spec.html#UUID, but this is mentioning it is really just an annotation on a string field.
case UUID: | ||
// Convert to fixed 16-byte array since Avro does not support UUID natively | ||
return finalizeSchema( | ||
Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Schema
exposes an addProp
method. We could try to add a prop here that allows us to determine that a fixed byte array should be treated as an UUID in XTable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
technically there is a logical type for UUID in recent versions of avro https://avro.apache.org/docs/1.10.0/spec.html#UUID, but this is mentioning it is really just an annotation on a string field.
Schema
exposes anaddProp
method. We could try to add a prop here that allows us to determine that a fixed byte array should be treated as an UUID in XTable
Already added corresponding Prop in latest commit, thanks for providing this context Tim!
@danielhumanmod yes, let's add that |
@danielhumanmod we don't have any full round trip testing yet but I would like to see some added to our integration tests in the future. For this PR we can start by adding a UUID to the schema used by the Iceberg table generator that is used in ITConversionController |
@@ -99,7 +99,9 @@ public class TestIcebergDataHelper { | |||
NestedField.optional( | |||
28, "timestamp_micros_nullable_field", Types.TimestampType.withZone()), | |||
NestedField.optional( | |||
30, "timestamp_local_micros_nullable_field", Types.TimestampType.withoutZone())); | |||
30, "timestamp_local_micros_nullable_field", Types.TimestampType.withoutZone()), | |||
NestedField.optional(33, "uuid_field", Types.UUIDType.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @the-other-tim-brown ,After adding the new line to COMMON_FIELDS
, I encountered an error when converting from Iceberg to Hudi during the ITConversionController
test. I’m wondering if this issue might be due to a missing configuration (like need to updated the generated datasource somewhere?) or if there is something specific I should be aware of in this context.
If you could provide some guidance or context on this, it would be greatly appreciated.
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY (UUID).
at org.apache.spark.sql.errors.QueryCompilationErrors$.illegalParquetTypeError(QueryCompilationErrors.scala:1762)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:206)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertPrimitiveField$2(ParquetSchemaConverter.scala:310)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:224)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:187)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:147)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertInternal(ParquetSchemaConverter.scala:117)
at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:87)
at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:31)
at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark34LegacyHoodieParquetFileFormat.scala:254)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found relevant context in Spark sunchao/spark@feec791.
Based on the code in the convertPrimitiveField method, it seems that the Spark cannot read FIXED_LEN_BYTE_ARRAY fields with a UUID logical type annotation in Parquet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't work as well:
NestedField.optional(33, "uuid_field", Types.FixedType.ofLength(16))
But if we add this field in this way, it works:
NestedField.optional(33, "uuid_field", Types.BinaryType.get())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Iceberg to Delta work as expected? If only Hudi is broken, we can list that as a limitation and make a test case for Iceberg to Delta for now.
Thank you for digging in and providing the context up front of what you have tested and where the issue is, less homework for me :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Iceberg to Delta work as expected? If only Hudi is broken, we can list that as a limitation and make a test case for Iceberg to Delta for now.
The conversion from Iceberg to Delta success, but there is a diff in the result:
- Iceberg: "uuid_field":"8135e4c4-ea58-4ace-a819-4f835cb2b721"
- Delta: "uuid_field":"gTXkxOpYSs6oGU+DXLK3IQ=="
In fact these two data are the same(8135e4c4-ea58-4ace-a819-4f835cb2b721
equals 8135e4c4-ea58-4ace-a819-4f835cb2b721
), but assertion fails in this diff because the data format in two table is different - Iceberg supports UUIDs directly, while Delta represents them as binary.
Should we consider adding special comparison logic in checkDatasetEquivalence
for these specific cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for digging in and providing the context up front of what you have tested and where the issue is, less homework for me :)
My pleasure :) @the-other-tim-brown , this is a great opportunity for me to learn more about Spark and Parquet. Additionally, if we determine that this issue is indeed caused by incomplete support for UUIDs in the Spark Parquet converter, we could potentially contribute some code to the Spark community.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add in a special case on the assertion for these UUIDs. Are you thinking of just adding a transform on the field in the dataset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Iceberg to Delta work as expected? If only Hudi is broken, we can list that as a limitation and make a test case for Iceberg to Delta for now.
I think we can add in a special case on the assertion for these UUIDs. Are you thinking of just adding a transform on the field in the dataset?
Thanks for the feedback @the-other-tim-brown , in the latest commit, I have completed testing for UUID:
- Add a UUID test case for Iceberg -> Delta for now
- Add special case on assertion by processing binary UUID into string before comparison
- Make "xtableLogicalType" into a constant
case UUID: | ||
Schema uuidSchema = | ||
Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16); | ||
uuidSchema.addProp("logicalType", "uuid"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be confusing to someone without knowledge of XTable being involved since logicalType
is an existing concept and there is a logical type with a string base type in avro. We should make it prefixed with xtable or something like that so someone looking at the schema in Hudi commit is aware that this is some special context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be confusing to someone without knowledge of XTable being involved since
logicalType
is an existing concept and there is a logical type with a string base type in avro. We should make it prefixed with xtable or something like that so someone looking at the schema in Hudi commit is aware that this is some special context
Thanks for the feedback! Already replace the prop with "xtableLogicalType" in the latest commit!=
@@ -90,6 +90,7 @@ private DataType convertFieldType(InternalField field) { | |||
return DataTypes.LongType; | |||
case BYTES: | |||
case FIXED: | |||
case UUID: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some metadata you an add to a field if I am remember correctly, we will want to pack some context about the fact that this is a UUID if possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some metadata you an add to a field if I am remember correctly, we will want to pack some context about the fact that this is a UUID if possible
That's a good catch, I added "xtableLogicalType" in metadata in latest commit
xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
Show resolved
Hide resolved
@@ -132,8 +132,13 @@ private InternalSchema toInternalSchema( | |||
break; | |||
} | |||
if (schema.getType() == Schema.Type.FIXED) { | |||
metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize()); | |||
newDataType = InternalType.FIXED; | |||
String xtableLogicalType = schema.getProp("xtableLogicalType"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you put xtableLogicalType
into a constant that we can reference in the other files as well?
@@ -172,7 +186,11 @@ private InternalSchema toInternalSchema( | |||
type = InternalType.DOUBLE; | |||
break; | |||
case "binary": | |||
type = InternalType.BYTES; | |||
if ("uuid".equals(originalMetadata.getString("xtableLogicalType"))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the getString
is failing when I run locally. I think there needs to be a contains
check first or else you get this java.util.NoSuchElementException: key not found: xtableLogicalType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the
getString
is failing when I run locally. I think there needs to be acontains
check first or else you get thisjava.util.NoSuchElementException: key not found: xtableLogicalType
Sorry, I have committed this changes locally but forget to push, already fixed in latest commit
xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
Show resolved
Hide resolved
@danielhumanmod apologies for the delay on this review. I was out of office for the last 10 days. Can you resolve the conflict that GitHub is flagging? I am also going to spend a little time trying to resolve the hudi side of the reader today in parallel to see if we just need some property set on the reader. |
Thanks Tim! Sure, I have solved the conflict in latest commit. And appreciate your time to help on the Hudi issue! |
List<Arguments> arguments = new ArrayList<>(); | ||
for (SyncMode syncMode : SyncMode.values()) { | ||
for (boolean isPartitioned : new boolean[] {true, false}) { | ||
// TODO: Add Hudi UUID support later |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you create an issue for this in the project so we don't forget?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you create an issue for this in the project so we don't forget?
Sure, I have created an issue for this problem: #543
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you reference the issue in the comment as well?
@@ -261,6 +281,94 @@ public void testVariousOperations( | |||
} | |||
} | |||
|
|||
// The test content is the same as testVariousOperations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is only testing end to end UUID support, we can limit the amount of commits and keep this test smaller
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is only testing end to end UUID support, we can limit the amount of commits and keep this test smaller
Thanks for the suggestion, have simplified it in latest commit :)
sourceTableFormat, table, targetTableFormats, table.getFilterQuery()); | ||
} | ||
|
||
try (GenericTable tableWithUpdatedSchema = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can drop this as well. In the other testing, we the second try-with-resources is due to a change in schema in the source table so we test schema evolution but that is not being tested here. Let me know if I am missing something
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can drop this as well. In the other testing, we the second try-with-resources is due to a change in schema in the source table so we test schema evolution but that is not being tested here. Let me know if I am missing something
Thanks for sharing this context, Tim. Yeah this part can be removed.
Gson gson = new Gson(); | ||
JsonParser parser = new JsonParser(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use jackson in other places for json, can we use that here to avoid requiring another library in case our dependencies change in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use jackson in other places for json, can we use that here to avoid requiring another library in case our dependencies change in the future?
Sure, thanks for the context! Already replaced that in latest commit.
JsonElement value = jsonObject.get(key); | ||
// Check if the key is "uuid_field" and if the value is Base64 encoded | ||
if (key.equals("uuid_field") | ||
&& value.getAsString().matches("^[A-Za-z0-9+/]+={0,2}$")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
matches
will keep recompiling the regex here. We can compile the pattern and define it as a constant for reuse to be slightly more efficient
jsonObject.addProperty(key, uuid.toString()); | ||
} | ||
} | ||
return gson.toJson(jsonObject); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to sanity check that this will maintain the field ordering. If not, we could get some flakey tests in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@danielhumanmod After addressing these last minor issues, can you squash down to a single commit for me so it is ready to merge after the test complete?
// converting Base64-encoded UUID fields from Delta to their UUID string representations for | ||
// testing | ||
private List<String> processUUID(List<String> rows) { | ||
ObjectMapper objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The instance is thread safe so you can define a static constant for this in the test class
@@ -100,6 +108,8 @@ public class ITConversionController { | |||
@TempDir public static Path tempDir; | |||
private static final DateTimeFormatter DATE_FORMAT = | |||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); | |||
// Compile the Base64 regex pattern once for reuse | |||
private static final Pattern BASE64_PATTERN = Pattern.compile("^[A-Za-z0-9+/]+={0,2}$"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add UUID
to the name so it's clear what it is the base 64 version of?
4977ab9
to
04b2ce2
Compare
5c4f3dd
to
79ba61c
Compare
Hi @the-other-tim-brown , I’ve addressed all the issues mentioned in the previous comments and have squashed all the commits into one. Additionally, I’ve refined the comparison logic in Thank you so much for your detailed review and suggestions—they were incredibly helpful! |
String row1 = dataset1Rows.get(i); | ||
String row2 = dataset2Rows.get(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're no longer asserting that the other fields are also equivalent. Is there a way we can do that as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're no longer asserting that the other fields are also equivalent. Is there a way we can do that as well?
Apologies for missing that earlier. You’re right—we do need to compare the other fields as well. This has already been fixed in the latest commit.
@@ -103,6 +109,7 @@ public class ITConversionController { | |||
|
|||
private static JavaSparkContext jsc; | |||
private static SparkSession sparkSession; | |||
private static final ObjectMapper objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use UPPER_UNDERSCORE
when defining constants and move this up to line 109 to group with the DATE_FORMAT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use
UPPER_UNDERSCORE
when defining constants and move this up to line 109 to group with theDATE_FORMAT
Done! Thanks for the suggestion
79ba61c
to
a8664fd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your contribution!
Thank you so much for your detailed review and approval, @the-other-tim-brown ! Your feedback helped me improve the changes, and I learned a lot from the process. I’m really looking forward to contributing more to the project and continuing to collaborate with you in the future. |
Important Read
Aim to address #112
What is the purpose of the pull request
Create a UUID type in OneType and handle it appropriately in all converters. Only Iceberg has a concept of a UUID and it is a fixed size byte array in the underlying parquet files. We should aim to preserve the metadata that this byte array came from a UUID for round trip conversions.
Brief change log
UUID
type in InternalTypeVerify this pull request
This pull request is already covered by existing tests, all existing tests should pass