[112] Add a UUID Type #518

Merged

Changes from all commits
@@ -78,6 +78,8 @@ public enum MetadataValue {
MILLIS
}

public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";

/**
* Performs a level-order traversal of the schema and returns a list of all fields. Use this
* method to get a list that includes nested fields. Use {@link InternalSchema#getFields()} when
@@ -38,6 +38,7 @@ public enum InternalType {
LIST,
MAP,
UNION,
UUID,
FIXED,
STRING,
BYTES,
@@ -132,8 +132,13 @@ private InternalSchema toInternalSchema(
break;
}
if (schema.getType() == Schema.Type.FIXED) {
metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
newDataType = InternalType.FIXED;
String xtableLogicalType = schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE);
if ("uuid".equals(xtableLogicalType)) {
newDataType = InternalType.UUID;
} else {
metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
newDataType = InternalType.FIXED;
}
} else {
newDataType = InternalType.BYTES;
}
@@ -435,6 +440,11 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
Schema.createFixed(
internalSchema.getName(), internalSchema.getComment(), null, fixedSize),
internalSchema);
case UUID:
Schema uuidSchema =
Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16);
uuidSchema.addProp(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
return finalizeSchema(uuidSchema, internalSchema);
default:
throw new UnsupportedSchemaTypeException(
"Encountered unhandled type during InternalSchema to Avro conversion: "
@@ -33,6 +33,7 @@
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@@ -73,7 +74,7 @@ public StructType fromInternalSchema(InternalSchema internalSchema) {
field.getName(),
convertFieldType(field),
field.getSchema().isNullable(),
Metadata.empty()))
getMetaData(field.getSchema().getDataType())))
.toArray(StructField[]::new);
return new StructType(fields);
}
@@ -90,6 +91,7 @@ private DataType convertFieldType(InternalField field) {
return DataTypes.LongType;
case BYTES:
case FIXED:
case UUID:
Contributor:
There is some metadata you can add to a field, if I remember correctly; we will want to pack some context about the fact that this is a UUID if possible.

Contributor Author:
> There is some metadata you can add to a field, if I remember correctly; we will want to pack some context about the fact that this is a UUID if possible.

That's a good catch. I added "xtableLogicalType" to the metadata in the latest commit.
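For reference, a minimal sketch of the idea discussed in this thread, assuming the "xtableLogicalType" key and "uuid" value used elsewhere in this PR (the class and field names below are hypothetical, not part of the diff):

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;

public class UuidFieldMetadataSketch {
  public static void main(String[] args) {
    // Tag a binary Spark field with the xtableLogicalType marker so it can be
    // recognized as a UUID when converting back to the internal schema.
    Metadata uuidMetadata =
        new MetadataBuilder().putString("xtableLogicalType", "uuid").build();
    StructField uuidField =
        new StructField("uuid_field", DataTypes.BinaryType, false, uuidMetadata);

    // Reading the marker back, mirroring the check added to toInternalSchema below.
    boolean isUuid =
        uuidField.metadata().contains("xtableLogicalType")
            && "uuid".equals(uuidField.metadata().getString("xtableLogicalType"));
    System.out.println(isUuid); // prints: true
  }
}
```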

return DataTypes.BinaryType;
case BOOLEAN:
return DataTypes.BooleanType;
@@ -142,12 +144,24 @@ private DataType convertFieldType(InternalField field) {
}
}

private Metadata getMetaData(InternalType type) {
if (type == InternalType.UUID) {
return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
} else {
return Metadata.empty();
}
}

public InternalSchema toInternalSchema(StructType structType) {
return toInternalSchema(structType, null, false, null);
return toInternalSchema(structType, null, false, null, null);
}

private InternalSchema toInternalSchema(
DataType dataType, String parentPath, boolean nullable, String comment) {
DataType dataType,
String parentPath,
boolean nullable,
String comment,
Metadata originalMetadata) {
Map<InternalSchema.MetadataKey, Object> metadata = null;
List<InternalField> fields = null;
InternalType type;
@@ -172,7 +186,12 @@ private InternalSchema toInternalSchema(
type = InternalType.DOUBLE;
break;
case "binary":
type = InternalType.BYTES;
if (originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)
&& "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) {
type = InternalType.UUID;
} else {
type = InternalType.BYTES;
}
break;
case "long":
type = InternalType.LONG;
@@ -210,7 +229,8 @@ private InternalSchema toInternalSchema(
field.dataType(),
SchemaUtils.getFullyQualifiedPath(parentPath, field.name()),
field.nullable(),
fieldComment);
fieldComment,
field.metadata());
return InternalField.builder()
.name(field.name())
.fieldId(fieldId)
@@ -238,6 +258,7 @@ private InternalSchema toInternalSchema(
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
arrayType.containsNull(),
null,
null);
InternalField elementField =
InternalField.builder()
@@ -256,6 +277,7 @@ private InternalSchema toInternalSchema(
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
false,
null,
null);
InternalField keyField =
InternalField.builder()
@@ -269,6 +291,7 @@ private InternalSchema toInternalSchema(
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
mapType.valueContainsNull(),
null,
null);
InternalField valueField =
InternalField.builder()
@@ -159,6 +159,7 @@ private static PartialResult parseValue(
break;
case FIXED:
case BYTES:
case UUID:
parsedValue = valueAsString.getBytes(StandardCharsets.UTF_8);
break;
case BOOLEAN:
@@ -199,6 +199,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
return Types.DecimalType.of(precision, scale);
case RECORD:
return Types.StructType.of(convertFields(field.getSchema(), fieldIdTracker));
case UUID:
return Types.UUIDType.get();
case MAP:
InternalField key =
field.getSchema().getFields().stream()
@@ -305,7 +307,7 @@ private InternalSchema fromIcebergType(
InternalSchema.MetadataKey.FIXED_BYTES_SIZE, fixedType.length());
break;
case UUID:
type = InternalType.FIXED;
type = InternalType.UUID;
metadata = Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 16);
break;
case STRUCT:
16 changes: 16 additions & 0 deletions xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -127,6 +127,22 @@ static GenericTable getInstanceWithCustomPartitionConfig(
}
}

static GenericTable getInstanceWithUUIDColumns(
String tableName,
Path tempDir,
SparkSession sparkSession,
JavaSparkContext jsc,
String sourceFormat,
boolean isPartitioned) {
switch (sourceFormat) {
case ICEBERG:
return TestIcebergTable.forSchemaWithUUIDColumns(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
}

static String getTableName() {
return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_");
}
@@ -28,6 +28,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -38,12 +39,14 @@
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -82,6 +85,10 @@

import org.apache.spark.sql.delta.DeltaLog;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;

import org.apache.xtable.conversion.ConversionConfig;
@@ -100,6 +107,7 @@ public class ITConversionController {
@TempDir public static Path tempDir;
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static JavaSparkContext jsc;
private static SparkSession sparkSession;
@@ -142,6 +150,19 @@ private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPar
return arguments.stream();
}

private static Stream<Arguments> generateTestParametersForUUID() {
List<Arguments> arguments = new ArrayList<>();
for (SyncMode syncMode : SyncMode.values()) {
for (boolean isPartitioned : new boolean[] {true, false}) {
// TODO: Add Hudi UUID support later (https://github.com/apache/incubator-xtable/issues/543)
// The current Spark Parquet reader cannot handle a fixed-size byte array with the UUID logical type
List<String> targetTableFormats = Arrays.asList(DELTA);
arguments.add(Arguments.of(ICEBERG, targetTableFormats, syncMode, isPartitioned));
}
}
return arguments.stream();
}

private static Stream<Arguments> testCasesWithSyncModes() {
return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL));
}
@@ -261,6 +282,54 @@ public void testVariousOperations(
}
}

// This test is a simplified version of testVariousOperations;
// the difference is that the Iceberg data source contains UUID columns
@ParameterizedTest
@MethodSource("generateTestParametersForUUID")
public void testVariousOperationsWithUUID(
String sourceTableFormat,
List<String> targetTableFormats,
SyncMode syncMode,
boolean isPartitioned) {
String tableName = getTableName();
ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration());
String partitionConfig = null;
if (isPartitioned) {
partitionConfig = "level:VALUE";
}
ConversionSourceProvider<?> conversionSourceProvider =
getConversionSourceProvider(sourceTableFormat);
List<?> insertRecords;
try (GenericTable table =
GenericTable.getInstanceWithUUIDColumns(
tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
insertRecords = table.insertRows(100);

ConversionConfig conversionConfig =
getTableSyncConfig(
sourceTableFormat,
syncMode,
tableName,
table,
targetTableFormats,
partitionConfig,
null);
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);

// Upsert some records and sync again
table.upsertRows(insertRecords.subList(0, 20));
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);

table.deleteRows(insertRecords.subList(30, 50));
conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 80);
checkDatasetEquivalenceWithFilter(
sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
}
}

@ParameterizedTest
@MethodSource("testCasesWithPartitioningAndSyncModes")
public void testConcurrentInsertWritesInSource(
@@ -797,13 +866,84 @@ private void checkDatasetEquivalence(
// if count is not known ahead of time, ensure datasets are non-empty
assertFalse(dataset1Rows.isEmpty());
}

if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
compareDatasetWithUUID(dataset1Rows, dataset2Rows);
} else {
assertEquals(
dataset1Rows,
dataset2Rows,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
sourceFormat, format));
}
});
}

/**
* Compares two datasets where dataset1Rows is for Iceberg and dataset2Rows is for other formats
* (such as Delta or Hudi). - For the "uuid_field", if present, the UUID from dataset1 (Iceberg)
* is compared with the Base64-encoded UUID from dataset2 (other formats), after decoding. - For
* all other fields, the values are compared directly. - If neither row contains the "uuid_field",
* the rows are compared as plain JSON strings.
*
* @param dataset1Rows List of JSON rows representing the dataset in Iceberg format (UUID is
* stored as a string).
* @param dataset2Rows List of JSON rows representing the dataset in other formats (UUID might be
* Base64-encoded).
*/
private void compareDatasetWithUUID(List<String> dataset1Rows, List<String> dataset2Rows) {
for (int i = 0; i < dataset1Rows.size(); i++) {
String row1 = dataset1Rows.get(i);
String row2 = dataset2Rows.get(i);
Comment on lines +897 to +898

Contributor:
We're no longer asserting that the other fields are also equivalent. Is there a way we can do that as well?

Contributor Author:
> We're no longer asserting that the other fields are also equivalent. Is there a way we can do that as well?

Apologies for missing that earlier. You're right, we do need to compare the other fields as well. This has already been fixed in the latest commit.
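As a small, self-contained illustration of the decoding path used in the code below (a sketch for review purposes, not part of the diff): the Base64 value surfaced by the non-Iceberg formats can be rebuilt into a java.util.UUID and then compared with the string form read from Iceberg. The class name and sample UUID are made up for the example.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class UuidCompareSketch {
  public static void main(String[] args) {
    UUID original = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

    // Simulate how non-Iceberg formats surface the value in JSON:
    // 16 raw bytes, Base64-encoded.
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.putLong(original.getMostSignificantBits());
    buffer.putLong(original.getLeastSignificantBits());
    String base64Value = Base64.getEncoder().encodeToString(buffer.array());

    // Decode and rebuild the UUID, mirroring compareDatasetWithUUID.
    ByteBuffer bb = ByteBuffer.wrap(Base64.getDecoder().decode(base64Value));
    UUID decoded = new UUID(bb.getLong(), bb.getLong());
    System.out.println(original.equals(decoded)); // prints: true
  }
}
```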

if (row1.contains("uuid_field") && row2.contains("uuid_field")) {
try {
JsonNode node1 = OBJECT_MAPPER.readTree(row1);
JsonNode node2 = OBJECT_MAPPER.readTree(row2);

// check uuid field
String uuidStr1 = node1.get("uuid_field").asText();
byte[] bytes = Base64.getDecoder().decode(node2.get("uuid_field").asText());
ByteBuffer bb = ByteBuffer.wrap(bytes);
UUID uuid2 = new UUID(bb.getLong(), bb.getLong());
String uuidStr2 = uuid2.toString();
assertEquals(
uuidStr1,
uuidStr2,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
uuidStr1, uuidStr2));

// check other fields
((ObjectNode) node1).remove("uuid_field");
((ObjectNode) node2).remove("uuid_field");
assertEquals(
node1.toString(),
node2.toString(),
String.format(
"Datasets are not equivalent when comparing other fields. Source: %s, Target: %s",
node1, node2));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} else {
assertEquals(
row1,
row2,
String.format(
"Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
row1, row2));
}
}
}

private boolean containsUUIDFields(List<String> rows) {
for (String row : rows) {
if (row.contains("\"uuid_field\"")) {
return true;
}
}
return false;
}

private static Stream<Arguments> addBasicPartitionCases(Stream<Arguments> arguments) {