Skip to content

Commit

Permalink
fix: Support reading partitioned parquet from S3 bucket root URI (dee…
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Nov 1, 2024
1 parent c9bb557 commit 6d0344c
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
return convertToURI(new File(uri), isDirectory);
}
String path = uri.getPath();
final boolean endsWithSlash = path.charAt(path.length() - 1) == URI_SEPARATOR_CHAR;
final boolean endsWithSlash = !path.isEmpty() && path.charAt(path.length() - 1) == URI_SEPARATOR_CHAR;
if (!isDirectory && endsWithSlash) {
throw new IllegalArgumentException("Non-directory URI should not end with a slash: " + uri);
}
Expand Down
3 changes: 3 additions & 0 deletions Base/src/test/java/io/deephaven/base/FileUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public void testConvertToS3URI() throws URISyntaxException {
// Check if multiple slashes get normalized
Assert.assertEquals("s3://bucket/key/", FileUtils.convertToURI("s3://bucket///key///", true).toString());

// Check if trailing slash gets added to bucket root
Assert.assertEquals("s3://bucket/", FileUtils.convertToURI("s3://bucket", true).toString());

try {
FileUtils.convertToURI("", false);
Assert.fail("Expected IllegalArgumentException");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,66 @@ public void testReadWriteUsingProfile() throws IOException {
Files.delete(tempCredentialsFile);
}
}

@Test
public void testReadBucketRootKeyValuePartitioned() {
    // Round-trip a key-value partitioned table written directly at the S3 bucket root,
    // then verify it can be read back whether or not the root URI has a trailing slash.
    final TableDefinition definition = TableDefinition.of(
            ColumnDefinition.ofInt("PC1").withPartitioning(),
            ColumnDefinition.ofInt("PC2").withPartitioning(),
            ColumnDefinition.ofInt("someIntColumn"),
            ColumnDefinition.ofString("someStringColumn"));
    final Table table = ((QueryTable) TableTools.emptyTable(500_000)
            .updateView("PC1 = (int)(ii%3)",
                    "PC2 = (int)(ii%2)",
                    "someIntColumn = (int) i",
                    "someStringColumn = String.valueOf(i)"))
            .withDefinitionUnsafe(definition);
    final URI bucketRoot = URI.create(String.format("s3://%s", bucket));
    final ParquetInstructions instructions = ParquetInstructions.builder()
            .setSpecialInstructions(s3Instructions(
                    S3Instructions.builder()
                            .readTimeout(Duration.ofSeconds(10)))
                    .build())
            .setTableDefinition(definition)
            .setBaseNameForPartitionedParquetData("data")
            .build();
    writeKeyValuePartitionedTable(table, bucketRoot.toString(), instructions);

    // Partition ordering on read-back is not guaranteed, so compare sorted views.
    final Table expected = table.sort("PC1", "PC2");
    // Exercise both spellings of the bucket-root URI: "s3://bucket" and "s3://bucket/".
    for (final String source : new String[] {bucketRoot.toString(), bucketRoot + "/"}) {
        final Table fromS3 = ParquetTools.readTable(source, instructions);
        assertTableEquals(expected, fromS3.sort("PC1", "PC2"));
    }
}

@Test
public void testReadBucketRootFlatPartitioned() {
    // Write several identical parquet files directly at the bucket root, then verify
    // that reading the root URI discovers all of them under both layout hints.
    final Table table = getTable(100_000);
    final ParquetInstructions instructions = ParquetInstructions.builder()
            .setSpecialInstructions(s3Instructions(
                    S3Instructions.builder()
                            .readTimeout(Duration.ofSeconds(10)))
                    .build())
            .build();
    for (int fileIdx = 0; fileIdx < 3; ++fileIdx) {
        final URI dest = uri("table" + fileIdx + ".parquet");
        ParquetTools.writeTable(table, dest.toString(), instructions);
    }

    final URI bucketRoot = URI.create(String.format("s3://%s", bucket));
    final Table expected = merge(table, table, table);
    // Both FLAT_PARTITIONED and KV_PARTITIONED should locate the same three root-level files.
    final ParquetInstructions.ParquetFileLayout[] layouts = {
            ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED,
            ParquetInstructions.ParquetFileLayout.KV_PARTITIONED};
    for (final ParquetInstructions.ParquetFileLayout layout : layouts) {
        final Table fromS3 = ParquetTools.readTable(bucketRoot.toString(), instructions.withLayout(layout));
        assertTableEquals(expected, fromS3);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Stream<URI> createStream(
{
final S3Uri s3DirectoryURI = s3AsyncClient.utilities().parseUri(directory);
bucketName = s3DirectoryURI.bucket().orElseThrow();
directoryKey = s3DirectoryURI.key().orElseThrow();
directoryKey = s3DirectoryURI.key().orElse(""); // Empty string for the bucket root
}

@Override
Expand Down Expand Up @@ -209,8 +209,10 @@ public URI next() {
private void fetchNextBatch() throws IOException {
final ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(directoryKey)
.maxKeys(MAX_KEYS_PER_BATCH);
if (!directoryKey.isEmpty()) {
requestBuilder.prefix(directoryKey);
}
if (!isRecursive) {
// Add a delimiter to the request if we don't want to fetch all files recursively
requestBuilder.delimiter("/");
Expand Down

0 comments on commit 6d0344c

Please sign in to comment.