diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java index 507bf46b286..84c2872c108 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java @@ -17,7 +17,6 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; -import java.nio.file.Path; import java.util.*; import java.util.stream.Stream; @@ -111,15 +110,9 @@ public InputStream getInputStream(final SeekableByteChannel channel, final int s } @Override - public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append) throws IOException { - final String pathKey = path.toAbsolutePath().toString(); - final ChannelType channelType = append ? ChannelType.WriteAppend : ChannelType.Write; - final KeyedObjectHashMap channelPool = channelPools.get(channelType); - final CachedChannel result = tryGetPooledChannel(pathKey, channelPool); - return result == null - ? new CachedChannel(wrappedProvider.getWriteChannel(path, append), channelType, pathKey) - : result.position(append ? result.size() : 0); // The seek isn't really necessary for append; will be at - // end no matter what. + public final CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint) + throws IOException { + return wrappedProvider.getOutputStream(uri, bufferSizeHint); } @Override diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CompletableOutputStream.java b/Util/channel/src/main/java/io/deephaven/util/channel/CompletableOutputStream.java new file mode 100644 index 00000000000..28f3b8af129 --- /dev/null +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CompletableOutputStream.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.util.channel; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * An {@link OutputStream} that can be marked as done, completed, or rolled back. + *

+ * {@link #done()} flushes all buffered data to the underlying storage, {@link #complete()} finalizes the write + * operation, and {@link #rollback()} cancels the write. Closing this output stream without calling + * {@link #complete()} will not flush data to the underlying storage. + *

+ * A typical usage pattern looks like this: + * + * <pre>

+ * try (final CompletableOutputStream outputStream = createCompletableOutputStream()) {
+ *     try {
+ *         IOUtils.copy(inputStream, outputStream);
+ *         outputStream.done(); // Optional; use this to flush buffered data without completing the stream
+ *         outputStream.complete();
+ *     } catch (IOException e) {
+ *         outputStream.rollback();
+ *     }
+ * }
+ * </pre>
+ */ +public abstract class CompletableOutputStream extends OutputStream { + + /** + * Flush all buffered data to the underlying storage. This is optional and should be called after the user is done + * writing to the output stream. All writes to the output stream after calling this method will lead to an + * {@link IOException}. + */ + public abstract void done() throws IOException; + + /** + * Flush all buffered data and save all written data to the underlying storage. This method should be called after + * the user is done writing to the output stream. All writes to the output stream after calling this method will + * lead to an {@link IOException}. + */ + public abstract void complete() throws IOException; + + /** + * Try to roll back any data written to the underlying storage, reverting back to the original state before opening + * this stream. This is an optional operation, as some implementations may not be able to support it. + */ + public abstract void rollback() throws IOException; +} diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java deleted file mode 100644 index 48083b074c3..00000000000 --- a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java +++ /dev/null @@ -1,86 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.util.channel; - -import io.deephaven.base.FileUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.stream.Stream; - -public class LocalFSChannelProvider implements SeekableChannelsProvider { - private static final int MAX_READ_BUFFER_SIZE = 1 << 16; // 64 KiB - - @Override - public SeekableChannelContext makeContext() { - // No additional context required for local FS - return SeekableChannelContext.NULL; - } - - @Override - public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelContext) { - // Context is not used, hence always compatible - return true; - } - - @Override - public boolean exists(@NotNull final URI uri) { - return Files.exists(Path.of(uri)); - } - - @Override - public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext, - @NotNull final URI uri) - throws IOException { - // context is unused here - return FileChannel.open(Path.of(uri), StandardOpenOption.READ); - } - - @Override - public InputStream getInputStream(final SeekableByteChannel channel, final int sizeHint) { - // FileChannel is not buffered, need to buffer - final int bufferSize = Math.min(sizeHint, MAX_READ_BUFFER_SIZE); - return new BufferedInputStream(Channels.newInputStreamNoClose(channel), bufferSize); - } - - @Override - public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final boolean append) throws IOException { - final FileChannel result = FileChannel.open(filePath, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - append ? 
StandardOpenOption.APPEND : StandardOpenOption.TRUNCATE_EXISTING); - if (append) { - result.position(result.size()); - } else { - result.position(0); - } - return result; - } - - @Override - public final Stream list(@NotNull final URI directory) throws IOException { - // Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in - // the processor. - return Files.list(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); - } - - @Override - public final Stream walk(@NotNull final URI directory) throws IOException { - // Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in - // the processor. - return Files.walk(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); - } - - @Override - public void close() {} -} diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java index 951224b7d8f..c2adf817e29 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java @@ -10,8 +10,6 @@ import java.io.InputStream; import java.net.URI; import java.nio.channels.SeekableByteChannel; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.stream.Stream; import static io.deephaven.base.FileUtils.convertToURI; @@ -92,11 +90,17 @@ SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContex */ InputStream getInputStream(SeekableByteChannel channel, int sizeHint) throws IOException; - default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException { - return getWriteChannel(Paths.get(path), append); - } + /** + * Creates a {@link CompletableOutputStream} to write to the given URI. + * + * @param uri the URI to write to + * @param bufferSizeHint the number of bytes the caller expects to buffer before flushing + * @return the output stream + * @throws IOException if an IO exception occurs + * @see CompletableOutputStream + */ + CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) throws IOException; - SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException; /** * Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive. 
diff --git a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java index 0f23fab7d39..5fa45196075 100644 --- a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java +++ b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java @@ -13,7 +13,6 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; -import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -54,50 +53,17 @@ public void testSimpleRead() throws IOException { } @Test - public void testSimpleReadWrite() throws IOException { + public void testSimplePooledReadChannelClose() throws IOException { SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); for (int i = 0; i < 1000; i++) { - SeekableByteChannel rc = - ((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i) - : cachedChannelProvider.getWriteChannel("w" + i, false)); + SeekableByteChannel rc = cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i); rc.close(); } assertEquals(900, closed.size()); assertTrue(closed.get(0).endsWith("r0")); } - @Test - public void testSimpleWrite() throws IOException { - SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); - for (int i = 0; i < 1000; i++) { - SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false); - // Call write to hit the assertions inside the mock channel - final ByteBuffer buffer = ByteBuffer.allocate(1); - rc.write(buffer); - rc.close(); - } - assertEquals(900, closed.size()); - for (int i = 0; i < 900; i++) { - assertTrue(closed.get(i).endsWith("w" + (i))); - } - } - - @Test - public void testSimpleAppend() throws IOException { - SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); - for (int i = 0; i < 1000; i++) { - SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true); - rc.close(); - } - assertEquals(900, closed.size()); - for (int i = 0; i < 900; i++) { - assertTrue(closed.get(i).endsWith("a" + (i))); - } - } - @Test public void testCloseOrder() throws IOException { SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); @@ -147,33 +113,6 @@ public void testReuse() throws IOException { assertEquals(0, closed.size()); } - @Test - public void testReuse10() throws IOException { - final SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); - final SeekableByteChannel[] someResult = new SeekableByteChannel[100]; - for (int pi = 0; pi < 10; ++pi) { - for (int ci = 0; ci < 10; ++ci) { - someResult[pi * 10 + ci] = cachedChannelProvider.getWriteChannel("w" + pi % 10, false); - } - for (int ci = 0; ci < 10; ++ci) { - someResult[pi * 10 + 9 - ci].close(); - } - } - for (int step = 0; step < 10; ++step) { - final SeekableByteChannel[] reused = new SeekableByteChannel[100]; - for (int ri = 0; ri < 100; ++ri) { - SeekableByteChannel rc = 
cachedChannelProvider.getWriteChannel("w" + (ri / 10) % 10, false); - assertSame(rc, someResult[ri % 100]); - reused[ri] = rc; - } - for (int ri = 0; ri < 100; ++ri) { - reused[99 - ri].close(); - } - } - assertEquals(0, closed.size()); - } - @Test void testRewrapCachedChannelProvider() { final SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); @@ -231,13 +170,8 @@ public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channe } @Override - public SeekableByteChannel getWriteChannel(@NotNull String path, boolean append) { - return new TestMockChannel(count.getAndIncrement(), path); - } - - @Override - public SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) { - return new TestMockChannel(count.getAndIncrement(), path.toString()); + public CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) { + throw new UnsupportedOperationException("getOutputStream"); } @Override diff --git a/extensions/parquet/base/build.gradle b/extensions/parquet/base/build.gradle index b6edd934c89..76bcfd2f7ed 100644 --- a/extensions/parquet/base/build.gradle +++ b/extensions/parquet/base/build.gradle @@ -19,6 +19,7 @@ dependencies { implementation project(':Configuration') implementation project(':DataStructures') implementation libs.commons.io + implementation libs.guava compileOnly libs.jetbrains.annotations testImplementation libs.junit4 diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index 7072ab0120f..8d3b523af01 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.base; +import com.google.common.io.CountingOutputStream; import org.apache.parquet.format.converter.ParquetMetadataConverter; import io.deephaven.parquet.compress.CompressorAdapter; import io.deephaven.util.QueryConstants; @@ -40,7 +41,7 @@ final class ColumnWriterImpl implements ColumnWriter { private static final int MIN_SLAB_SIZE = 64; - private final PositionedBufferedOutputStream bufferedOutput; + private final CountingOutputStream countingOutput; private final ColumnDescriptor column; private final RowGroupWriterImpl owner; private final CompressorAdapter compressorAdapter; @@ -68,12 +69,12 @@ final class ColumnWriterImpl implements ColumnWriter { ColumnWriterImpl( final RowGroupWriterImpl owner, - final PositionedBufferedOutputStream bufferedOutput, + final CountingOutputStream countingOutput, final ColumnDescriptor column, final CompressorAdapter compressorAdapter, final int targetPageSize, final ByteBufferAllocator allocator) { - this.bufferedOutput = bufferedOutput; + this.countingOutput = countingOutput; this.column = column; this.compressorAdapter = compressorAdapter; this.targetPageSize = targetPageSize; @@ -132,7 +133,7 @@ public void addDictionaryPage(@NotNull final Object dictionaryValues, final int // noinspection unchecked dictionaryWriter.writeBulk(dictionaryValues, valuesCount, NullStatistics.INSTANCE); - dictionaryOffset = bufferedOutput.position(); + dictionaryOffset = countingOutput.getCount(); writeDictionaryPage(dictionaryWriter.getByteBufferView(), valuesCount); pageCount++; hasDictionary = true; @@ -140,7 +141,7 @@ public void addDictionaryPage(@NotNull final Object dictionaryValues, final int } private void 
writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int valuesCount) throws IOException { - final long currentChunkDictionaryPageOffset = bufferedOutput.position(); + final long currentChunkDictionaryPageOffset = countingOutput.getCount(); final int uncompressedSize = dictionaryBuffer.remaining(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -157,11 +158,11 @@ private void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int va compressedPageSize, valuesCount, Encoding.PLAIN, - bufferedOutput); - final long headerSize = bufferedOutput.position() - currentChunkDictionaryPageOffset; + countingOutput); + final long headerSize = countingOutput.getCount() - currentChunkDictionaryPageOffset; this.uncompressedLength += uncompressedSize + headerSize; this.compressedLength += compressedPageSize + headerSize; - compressedBytes.writeAllTo(bufferedOutput); + compressedBytes.writeAllTo(countingOutput); encodings.add(Encoding.PLAIN); } @@ -294,7 +295,7 @@ public void writePageV2( final BytesInput compressedData = BytesInput.from(baos); final int compressedSize = (int) (compressedData.size() + repetitionLevels.size() + definitionLevels.size()); - final long initialOffset = bufferedOutput.position(); + final long initialOffset = countingOutput.getCount(); if (firstDataPageOffset == -1) { firstDataPageOffset = initialOffset; } @@ -303,20 +304,20 @@ public void writePageV2( valueCount, nullCount, rowCount, rlByteLength, dlByteLength, - bufferedOutput); - final long headerSize = bufferedOutput.position() - initialOffset; + countingOutput); + final long headerSize = countingOutput.getCount() - initialOffset; this.uncompressedLength += (uncompressedSize + headerSize); this.compressedLength += (compressedSize + headerSize); this.totalValueCount += valueCount; this.pageCount += 1; - definitionLevels.writeAllTo(bufferedOutput); - compressedData.writeAllTo(bufferedOutput); + definitionLevels.writeAllTo(countingOutput); + compressedData.writeAllTo(countingOutput); } private void writePage(final BytesInput bytes, final int valueCount, final long rowCount, final Encoding valuesEncoding) throws IOException { - final long initialOffset = bufferedOutput.position(); + final long initialOffset = countingOutput.getCount(); if (firstDataPageOffset == -1) { firstDataPageOffset = initialOffset; } @@ -346,15 +347,15 @@ private void writePage(final BytesInput bytes, final int valueCount, final long (int) compressedSize, valueCount, valuesEncoding, - bufferedOutput); - final long headerSize = bufferedOutput.position() - initialOffset; + countingOutput); + final long headerSize = countingOutput.getCount() - initialOffset; this.uncompressedLength += (uncompressedSize + headerSize); this.compressedLength += (compressedSize + headerSize); this.totalValueCount += valueCount; this.pageCount += 1; - compressedBytes.writeAllTo(bufferedOutput); - offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), rowCount); + compressedBytes.writeAllTo(countingOutput); + offsetIndexBuilder.add((int) (countingOutput.getCount() - initialOffset), rowCount); encodings.add(valuesEncoding); encodingStatsBuilder.addDataEncoding(valuesEncoding); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullParquetMetadataFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullParquetMetadataFileWriter.java index bb9be1350e6..e0878476b09 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullParquetMetadataFileWriter.java 
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullParquetMetadataFileWriter.java @@ -3,8 +3,11 @@ // package io.deephaven.parquet.base; +import io.deephaven.util.channel.CompletableOutputStream; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import java.net.URI; + /** * A no-op implementation of MetadataFileWriterBase when we don't want to write metadata files for Parquet files. */ @@ -13,11 +16,10 @@ public enum NullParquetMetadataFileWriter implements ParquetMetadataFileWriter { INSTANCE; @Override - public void addParquetFileMetadata(final String parquetFilePath, final ParquetMetadata metadata) {} - - @Override - public void writeMetadataFiles(final String metadataFilePath, final String commonMetadataFilePath) {} + public void addParquetFileMetadata(final URI parquetFileURI, final ParquetMetadata metadata) {} @Override - public void clear() {} + public void writeMetadataFiles( + final CompletableOutputStream metadataOutputStream, + final CompletableOutputStream commonMetadataOutputStream) {} } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java index 81dc13a4430..8cf51a65e7e 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java @@ -3,13 +3,12 @@ // package io.deephaven.parquet.base; +import com.google.common.io.CountingOutputStream; import org.apache.parquet.format.converter.ParquetMetadataConverter; -import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.parquet.compress.CompressorAdapter; import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory; import org.apache.parquet.Version; import org.apache.parquet.bytes.ByteBufferAllocator; -import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.format.Util; import org.apache.parquet.hadoop.metadata.*; @@ -19,20 +18,21 @@ import org.jetbrains.annotations.NotNull; import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import static io.deephaven.parquet.base.ParquetUtils.MAGIC; -import static io.deephaven.parquet.base.ParquetUtils.PARQUET_OUTPUT_BUFFER_SIZE; import static org.apache.parquet.format.Util.writeFileMetaData; -public final class ParquetFileWriter { +public final class ParquetFileWriter implements AutoCloseable { private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); private static final int VERSION = 1; - private final PositionedBufferedOutputStream bufferedOutput; + private final CountingOutputStream countingOutput; private final MessageType type; private final int targetPageSize; private final ByteBufferAllocator allocator; @@ -40,13 +40,12 @@ public final class ParquetFileWriter { private final Map extraMetaData; private final List blocks = new ArrayList<>(); private final List> offsetIndexes = new ArrayList<>(); - private final String destFilePathForMetadata; + private final URI dest; private final ParquetMetadataFileWriter metadataFileWriter; public ParquetFileWriter( - final String destFilePath, - final String destFilePathForMetadata, - final SeekableChannelsProvider channelsProvider, + final URI dest, + final OutputStream destOutputStream, final int targetPageSize, final ByteBufferAllocator allocator, 
final MessageType type, @@ -56,44 +55,52 @@ public ParquetFileWriter( this.targetPageSize = targetPageSize; this.allocator = allocator; this.extraMetaData = new HashMap<>(extraMetaData); - bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(destFilePath, false), - PARQUET_OUTPUT_BUFFER_SIZE); - bufferedOutput.write(MAGIC); + this.countingOutput = new CountingOutputStream(destOutputStream); + countingOutput.write(MAGIC); this.type = type; this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName); - this.destFilePathForMetadata = destFilePathForMetadata; + this.dest = dest; this.metadataFileWriter = metadataFileWriter; } public RowGroupWriter addRowGroup(final long size) { final RowGroupWriterImpl rowGroupWriter = - new RowGroupWriterImpl(bufferedOutput, type, targetPageSize, allocator, compressorAdapter); + new RowGroupWriterImpl(countingOutput, type, targetPageSize, allocator, compressorAdapter); rowGroupWriter.getBlock().setRowCount(size); blocks.add(rowGroupWriter.getBlock()); offsetIndexes.add(rowGroupWriter.offsetIndexes()); return rowGroupWriter; } + @Override public void close() throws IOException { serializeOffsetIndexes(); final ParquetMetadata footer = new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, bufferedOutput); - metadataFileWriter.addParquetFileMetadata(destFilePathForMetadata, footer); - // Flush any buffered data and close the channel - bufferedOutput.close(); + serializeFooter(footer, countingOutput); + metadataFileWriter.addParquetFileMetadata(dest, footer); + // Flush any buffered data, do not close the stream since it is managed by the layer above + countingOutput.flush(); compressorAdapter.close(); } - public static void serializeFooter(final ParquetMetadata footer, - final PositionedBufferedOutputStream bufferedOutput) + public static void serializeFooter(final ParquetMetadata footer, final CountingOutputStream countingOutput) throws IOException { - final long footerIndex = bufferedOutput.position(); + final long footerIndex = countingOutput.getCount(); final org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(VERSION, footer); - writeFileMetaData(parquetMetadata, bufferedOutput); - BytesUtils.writeIntLittleEndian(bufferedOutput, (int) (bufferedOutput.position() - footerIndex)); - bufferedOutput.write(MAGIC); + writeFileMetaData(parquetMetadata, countingOutput); + countingOutput.write(intToLittleEndian((int) (countingOutput.getCount() - footerIndex))); + countingOutput.write(MAGIC); + } + + private static byte[] intToLittleEndian(final int value) { + return new byte[] { + (byte) (value & 0xFF), + (byte) ((value >> 8) & 0xFF), + (byte) ((value >> 16) & 0xFF), + (byte) ((value >> 24) & 0xFF) + }; } private void serializeOffsetIndexes() throws IOException { @@ -106,9 +113,10 @@ private void serializeOffsetIndexes() throws IOException { continue; } final ColumnChunkMetaData column = columns.get(cIndex); - final long offset = bufferedOutput.position(); - Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), bufferedOutput); - column.setOffsetIndexReference(new IndexReference(offset, (int) (bufferedOutput.position() - offset))); + final long offset = countingOutput.getCount(); + Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), countingOutput); + column.setOffsetIndexReference( + new IndexReference(offset, (int) (countingOutput.getCount() 
- offset))); } } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetMetadataFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetMetadataFileWriter.java index 42a93dc24e1..0a8c9c4ef42 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetMetadataFileWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetMetadataFileWriter.java @@ -3,9 +3,11 @@ // package io.deephaven.parquet.base; +import io.deephaven.util.channel.CompletableOutputStream; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import java.io.IOException; +import java.net.URI; /** * Used to write {@value ParquetUtils#METADATA_FILE_NAME} and {@value ParquetUtils#COMMON_METADATA_FILE_NAME} files for @@ -17,21 +19,19 @@ public interface ParquetMetadataFileWriter { * Add the parquet metadata for the provided parquet file to the list of metadata to be written to combined metadata * files. * - * @param parquetFilePath The parquet file destination path + * @param parquetFileURI The parquet file destination URI * @param metadata The parquet metadata corresponding to the parquet file */ - void addParquetFileMetadata(String parquetFilePath, ParquetMetadata metadata); + void addParquetFileMetadata(URI parquetFileURI, ParquetMetadata metadata); /** - * Write the combined metadata files for all metadata accumulated so far and clear the list. + * Write the combined metadata to the provided streams and clear the metadata accumulated so far. The output streams + * should be marked as {@link CompletableOutputStream#done()} after writing is finished. * - * @param metadataFilePath The destination path for the {@value ParquetUtils#METADATA_FILE_NAME} file - * @param commonMetadataFilePath The destination path for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file + * @param metadataOutputStream The output stream for the {@value ParquetUtils#METADATA_FILE_NAME} file + * @param commonMetadataOutputStream The output stream for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file */ - void writeMetadataFiles(String metadataFilePath, String commonMetadataFilePath) throws IOException; - - /** - * Clear the list of metadata accumulated so far. - */ - void clear(); + void writeMetadataFiles( + CompletableOutputStream metadataOutputStream, + CompletableOutputStream commonMetadataOutputStream) throws IOException; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java index 70f83f9adfc..467c7b22d8a 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java @@ -90,7 +90,9 @@ public static boolean isVisibleParquetFile(@NotNull final Path rootDir, @NotNull } /** - * Resolve a relative path against a base URI. The path can be from Windows or Unix systems. + * Resolve a relative path against a base URI. The path can be from Windows or Unix systems. 
This method should be + * used if we expect the relative path to contain file separators or special characters, otherwise use + * {@code base.resolve(relativePath)} */ public static URI resolve(final URI base, final String relativePath) { final URI relativeURI; diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PositionedBufferedOutputStream.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PositionedBufferedOutputStream.java deleted file mode 100644 index 3d26162f806..00000000000 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PositionedBufferedOutputStream.java +++ /dev/null @@ -1,29 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.parquet.base; - -import org.jetbrains.annotations.NotNull; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.SeekableByteChannel; - -public final class PositionedBufferedOutputStream extends BufferedOutputStream { - - private final SeekableByteChannel writeChannel; - - public PositionedBufferedOutputStream(@NotNull final SeekableByteChannel writeChannel, final int size) { - super(Channels.newOutputStream(writeChannel), size); - this.writeChannel = writeChannel; - } - - /** - * Get the total number of bytes written to this stream - */ - long position() throws IOException { - // Number of bytes buffered in the stream + bytes written to the underlying channel - return this.count + writeChannel.position(); - } -} diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java index c873f15d495..6d387228866 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.base; +import com.google.common.io.CountingOutputStream; import io.deephaven.parquet.compress.CompressorAdapter; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -17,7 +18,7 @@ import java.util.List; final class RowGroupWriterImpl implements RowGroupWriter { - private final PositionedBufferedOutputStream bufferedOutput; + private final CountingOutputStream countingOutput; private final MessageType type; private final int targetPageSize; private final ByteBufferAllocator allocator; @@ -26,22 +27,22 @@ final class RowGroupWriterImpl implements RowGroupWriter { private final List currentOffsetIndexes = new ArrayList<>(); private final CompressorAdapter compressorAdapter; - RowGroupWriterImpl(PositionedBufferedOutputStream bufferedOutput, + RowGroupWriterImpl(CountingOutputStream countingOutput, MessageType type, int targetPageSize, ByteBufferAllocator allocator, CompressorAdapter compressorAdapter) { - this(bufferedOutput, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter); + this(countingOutput, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter); } - private RowGroupWriterImpl(PositionedBufferedOutputStream bufferedOutput, + private RowGroupWriterImpl(CountingOutputStream countingOutput, MessageType type, int targetPageSize, ByteBufferAllocator allocator, BlockMetaData blockMetaData, CompressorAdapter compressorAdapter) { - this.bufferedOutput = bufferedOutput; + this.countingOutput = 
countingOutput; this.type = type; this.targetPageSize = targetPageSize; this.allocator = allocator; @@ -72,7 +73,7 @@ public ColumnWriter addColumn(String columnName) { + " need to close that before opening a writer for " + columnName); } activeWriter = new ColumnWriterImpl(this, - bufferedOutput, + countingOutput, type.getColumnDescription(getPrimitivePath(columnName)), compressorAdapter, targetPageSize, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java index 90da731eb0c..c0ea3a6f43c 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java @@ -3,15 +3,14 @@ // package io.deephaven.parquet.table; +import com.google.common.io.CountingOutputStream; import io.deephaven.UncheckedDeephavenException; import io.deephaven.parquet.base.ParquetFileWriter; import io.deephaven.parquet.base.ParquetMetadataFileWriter; import io.deephaven.parquet.base.ParquetUtils; -import io.deephaven.parquet.base.PositionedBufferedOutputStream; import io.deephaven.parquet.table.metadata.ColumnTypeInfo; import io.deephaven.parquet.table.metadata.TableInfo; -import io.deephaven.util.channel.SeekableChannelsProvider; -import io.deephaven.util.channel.SeekableChannelsProviderLoader; +import io.deephaven.util.channel.CompletableOutputStream; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -19,9 +18,9 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.File; import java.io.IOException; -import java.nio.file.Path; +import java.io.OutputStream; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -29,7 +28,6 @@ import java.util.List; import java.util.Map; -import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.parquet.base.ParquetUtils.MAGIC; import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY; import static io.deephaven.parquet.base.ParquetUtils.getPerFileMetadataKey; @@ -45,18 +43,17 @@ final class ParquetMetadataFileWriterImpl implements ParquetMetadataFileWriter { * A class to hold the parquet file and its metadata. */ private static class ParquetFileMetadata { - final String filePath; + final URI uri; final ParquetMetadata metadata; - ParquetFileMetadata(final String filePath, final ParquetMetadata metadata) { - this.filePath = filePath; + ParquetFileMetadata(final URI uri, final ParquetMetadata metadata) { + this.uri = uri; this.metadata = metadata; } } - private final Path metadataRootDirAbsPath; + private final URI metadataRootDir; private final List parquetFileMetadataList; - private final SeekableChannelsProvider channelsProvider; private final MessageType partitioningColumnsSchema; // The following fields are used to accumulate metadata for all parquet files @@ -76,23 +73,22 @@ private static class ParquetFileMetadata { * @param partitioningColumnsSchema The common schema for partitioning columns to be included in the * {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file, can be null if there are no partitioning columns. 
*/ - ParquetMetadataFileWriterImpl(@NotNull final File metadataRootDir, @NotNull final File[] destinations, + ParquetMetadataFileWriterImpl( + @NotNull final URI metadataRootDir, + @NotNull final URI[] destinations, @Nullable final MessageType partitioningColumnsSchema) { if (destinations.length == 0) { throw new IllegalArgumentException("No destinations provided"); } - this.metadataRootDirAbsPath = metadataRootDir.getAbsoluteFile().toPath(); - final String metadataRootDirAbsPathString = metadataRootDirAbsPath.toString(); - for (final File destination : destinations) { - if (!destination.getAbsolutePath().startsWith(metadataRootDirAbsPathString)) { + this.metadataRootDir = metadataRootDir; + final String metadataRootDirStr = metadataRootDir.toString(); + for (final URI destination : destinations) { + if (!destination.toString().startsWith(metadataRootDirStr)) { throw new UncheckedDeephavenException("All destinations must be nested under the provided metadata root" - + " directory, provided destination " + destination.getAbsolutePath() + " is not under " + - metadataRootDirAbsPathString); + + " directory, provided destination " + destination + " is not under " + metadataRootDir); } } this.parquetFileMetadataList = new ArrayList<>(destinations.length); - this.channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader( - convertToURI(metadataRootDirAbsPathString, true), null); this.partitioningColumnsSchema = partitioningColumnsSchema; this.mergedSchema = null; @@ -106,28 +102,31 @@ private static class ParquetFileMetadata { /** * Add parquet metadata for the provided parquet file to the combined metadata file. * - * @param parquetFilePath The parquet file destination path + * @param parquetFileURI The parquet file destination URI * @param metadata The parquet metadata */ - public void addParquetFileMetadata(final String parquetFilePath, final ParquetMetadata metadata) { - parquetFileMetadataList.add(new ParquetFileMetadata(parquetFilePath, metadata)); + public void addParquetFileMetadata(final URI parquetFileURI, final ParquetMetadata metadata) { + parquetFileMetadataList.add(new ParquetFileMetadata(parquetFileURI, metadata)); } /** - * Write the accumulated metadata to the provided files and clear the metadata accumulated so far. + * Write the combined metadata to the provided streams and clear the metadata accumulated so far. The output streams + * are marked as {@link CompletableOutputStream#done()} after writing is finished. 
* - * @param metadataFilePath The destination path for the {@value ParquetUtils#METADATA_FILE_NAME} file - * @param commonMetadataFilePath The destination path for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file + * @param metadataOutputStream The output stream for the {@value ParquetUtils#METADATA_FILE_NAME} file + * @param commonMetadataOutputStream The output stream for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file */ - public void writeMetadataFiles(final String metadataFilePath, final String commonMetadataFilePath) - throws IOException { + public void writeMetadataFiles( + final CompletableOutputStream metadataOutputStream, + final CompletableOutputStream commonMetadataOutputStream) throws IOException { if (parquetFileMetadataList.isEmpty()) { throw new UncheckedDeephavenException("No parquet files to write metadata for"); } mergeMetadata(); final ParquetMetadata metadataFooter = new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString), mergedBlocks); - writeMetadataFile(metadataFooter, metadataFilePath); + writeMetadataFile(metadataFooter, metadataOutputStream); + metadataOutputStream.done(); // Skip the blocks data and merge schema with partitioning columns' schema to write the common metadata file. // The ordering of arguments in method call is important because we want to keep partitioning columns in the @@ -136,7 +135,8 @@ public void writeMetadataFiles(final String metadataFilePath, final String commo final ParquetMetadata commonMetadataFooter = new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString), new ArrayList<>()); - writeMetadataFile(commonMetadataFooter, commonMetadataFilePath); + writeMetadataFile(commonMetadataFooter, commonMetadataOutputStream); + commonMetadataOutputStream.done(); // Clear the accumulated metadata clear(); @@ -150,7 +150,7 @@ private void mergeMetadata() throws IOException { for (final ParquetFileMetadata parquetFileMetadata : parquetFileMetadataList) { final FileMetaData fileMetaData = parquetFileMetadata.metadata.getFileMetaData(); mergedSchema = mergeSchemaInto(fileMetaData.getSchema(), mergedSchema); - final String relativePath = getRelativePath(parquetFileMetadata.filePath, metadataRootDirAbsPath); + final String relativePath = metadataRootDir.relativize(parquetFileMetadata.uri).getPath(); mergeKeyValueMetaData(parquetFileMetadata, relativePath); mergeBlocksInto(parquetFileMetadata, relativePath, mergedBlocks); mergedCreatedBy.add(fileMetaData.getCreatedBy()); @@ -218,7 +218,7 @@ private void mergeKeyValueMetaData(@NotNull final ParquetFileMetadata parquetFil // Assuming the keys are unique for each file because file names are unique, verified in the constructor if (mergedKeyValueMetaData.containsKey(fileKey)) { throw new IllegalStateException("Could not merge metadata for file " + - parquetFileMetadata.filePath + " because it has conflicting file key: " + fileKey); + parquetFileMetadata.uri + " because it has conflicting file key: " + fileKey); } mergedKeyValueMetaData.put(fileKey, entry.getValue()); @@ -253,21 +253,18 @@ private static void mergeBlocksInto(final ParquetFileMetadata parquetFileMetadat } } - private static String getRelativePath(final String parquetFilePath, final Path metadataRootDirAbsPath) { - final Path parquetFileAbsPath = new File(parquetFilePath).getAbsoluteFile().toPath(); - return metadataRootDirAbsPath.relativize(parquetFileAbsPath).toString(); - } - - private void writeMetadataFile(final ParquetMetadata 
metadataFooter, final String outputPath) throws IOException { - final PositionedBufferedOutputStream metadataOutputStream = - new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(outputPath, false), - ParquetUtils.PARQUET_OUTPUT_BUFFER_SIZE); - metadataOutputStream.write(MAGIC); - ParquetFileWriter.serializeFooter(metadataFooter, metadataOutputStream); - metadataOutputStream.close(); + private static void writeMetadataFile(final ParquetMetadata metadataFooter, final OutputStream outputStream) + throws IOException { + final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + countingOutputStream.write(MAGIC); + ParquetFileWriter.serializeFooter(metadataFooter, countingOutputStream); + countingOutputStream.flush(); } - public void clear() { + /** + * Clear the list of metadata accumulated so far. + */ + private void clear() { parquetFileMetadataList.clear(); mergedKeyValueMetaData.clear(); mergedBlocks.clear(); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index a0c63614a82..ebb1d17571d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -27,8 +27,7 @@ import io.deephaven.stringset.StringSet; import io.deephaven.util.QueryConstants; import io.deephaven.util.SafeCloseable; -import io.deephaven.util.annotations.VisibleForTesting; -import io.deephaven.util.channel.SeekableChannelsProviderLoader; +import io.deephaven.util.channel.CompletableOutputStream; import io.deephaven.vector.Vector; import org.apache.commons.lang3.tuple.Pair; import org.apache.parquet.bytes.HeapByteBufferAllocator; @@ -39,14 +38,13 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.File; import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; import java.nio.IntBuffer; -import java.nio.file.Path; import java.util.*; import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY; -import static io.deephaven.base.FileUtils.convertToURI; /** * API for writing DH tables in parquet format @@ -72,25 +70,23 @@ static class IndexWritingInfo { */ final String[] parquetColumnNames; /** - * File path to be added in the index metadata of the main parquet file + * Destination for writing the index file */ - final File destFileForMetadata; + final URI dest; /** - * Destination path for writing the index file. The two filenames can differ because we write index files to - * shadow file paths first and then place them at the final path once the write is complete. The metadata should - * always hold the accurate path. 
+ * Output stream to write the index file */ - final File destFile; + final CompletableOutputStream destOutputStream; IndexWritingInfo( final List indexColumnNames, final String[] parquetColumnNames, - final File destFileForMetadata, - final File destFile) { + final URI dest, + final CompletableOutputStream destOutputStream) { this.indexColumnNames = indexColumnNames; this.parquetColumnNames = parquetColumnNames; - this.destFileForMetadata = destFileForMetadata.getAbsoluteFile(); - this.destFile = destFile.getAbsoluteFile(); + this.dest = dest; + this.destOutputStream = destOutputStream; } } @@ -100,10 +96,9 @@ static class IndexWritingInfo { * @param t The table to write * @param definition Table definition * @param writeInstructions Write instructions for customizations while writing - * @param destFilePath The destination path - * @param destFilePathForMetadata The destination path to store in the metadata files. This can be different from - * {@code destFilePath} if we are writing the parquet file to a shadow location first since the metadata - * should always hold the accurate path. + * @param dest The destination URI to write to + * @param destOutputStream The output stream to write to dest, should be marked as + * {@link CompletableOutputStream#done()} once writing is finished * @param incomingMeta A map of metadata values to be stores in the file footer * @param indexInfoList Arrays containing the column names for indexes to persist as sidecar tables. Indexes that * are specified but missing will be computed on demand. @@ -120,8 +115,8 @@ static void write( @NotNull final Table t, @NotNull final TableDefinition definition, @NotNull final ParquetInstructions writeInstructions, - @NotNull final String destFilePath, - @NotNull final String destFilePathForMetadata, + @NotNull final URI dest, + @NotNull final CompletableOutputStream destOutputStream, @NotNull final Map incomingMeta, @Nullable final List indexInfoList, @NotNull final ParquetMetadataFileWriter metadataFileWriter, @@ -137,66 +132,49 @@ static void write( } final TableInfo.Builder tableInfoBuilder = TableInfo.builder(); - List cleanupFiles = null; - try { - if (indexInfoList != null) { - cleanupFiles = new ArrayList<>(indexInfoList.size()); - final Path destDirPath = new File(destFilePath).getAbsoluteFile().getParentFile().toPath(); - for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) { - try (final SafeCloseable ignored = t.isRefreshing() ? LivenessScopeStack.open() : null) { - // This will retrieve an existing index if one exists, or create a new one if not - final BasicDataIndex dataIndex = Optional - .ofNullable(DataIndexer.getDataIndex(t, info.indexColumnNames)) - .or(() -> Optional.of(DataIndexer.getOrCreateDataIndex(t, info.indexColumnNames))) - .get() - .transform(DataIndexTransformer.builder().invertRowSet(t.getRowSet()).build()); - final Table indexTable = dataIndex.table().sort(info.indexColumnNames.toArray(new String[0])); - final TableInfo.Builder indexTableInfoBuilder = TableInfo.builder().addSortingColumns( - info.indexColumnNames.stream() - .map(cn -> SortColumnInfo.of(cn, SortColumnInfo.SortDirection.Ascending)) - .toArray(SortColumnInfo[]::new)); + if (indexInfoList != null) { + final URI destDir = dest.resolve("."); + for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) { + try (final SafeCloseable ignored = t.isRefreshing() ? 
LivenessScopeStack.open() : null) { + // This will retrieve an existing index if one exists, or create a new one if not + final BasicDataIndex dataIndex = Optional + .ofNullable(DataIndexer.getDataIndex(t, info.indexColumnNames)) + .or(() -> Optional.of(DataIndexer.getOrCreateDataIndex(t, info.indexColumnNames))) + .get() + .transform(DataIndexTransformer.builder().invertRowSet(t.getRowSet()).build()); + final Table indexTable = dataIndex.table().sort(info.indexColumnNames.toArray(new String[0])); + final TableInfo.Builder indexTableInfoBuilder = TableInfo.builder().addSortingColumns( + info.indexColumnNames.stream() + .map(cn -> SortColumnInfo.of(cn, SortColumnInfo.SortDirection.Ascending)) + .toArray(SortColumnInfo[]::new)); - cleanupFiles.add(info.destFile); - tableInfoBuilder.addDataIndexes(DataIndexInfo.of( - destDirPath.relativize(info.destFileForMetadata.toPath()).toString(), - info.parquetColumnNames)); - final ParquetInstructions writeInstructionsToUse; - if (INDEX_ROW_SET_COLUMN_NAME.equals(dataIndex.rowSetColumnName())) { - writeInstructionsToUse = writeInstructions; - } else { - writeInstructionsToUse = new ParquetInstructions.Builder(writeInstructions) - .addColumnNameMapping(INDEX_ROW_SET_COLUMN_NAME, dataIndex.rowSetColumnName()) - .build(); - } - write(indexTable, indexTable.getDefinition(), writeInstructionsToUse, - info.destFile.getAbsolutePath(), info.destFileForMetadata.getAbsolutePath(), - Collections.emptyMap(), indexTableInfoBuilder, NullParquetMetadataFileWriter.INSTANCE, - computedCache); + tableInfoBuilder.addDataIndexes(DataIndexInfo.of( + destDir.relativize(info.dest).getPath(), + info.parquetColumnNames)); + final ParquetInstructions writeInstructionsToUse; + if (INDEX_ROW_SET_COLUMN_NAME.equals(dataIndex.rowSetColumnName())) { + writeInstructionsToUse = writeInstructions; + } else { + writeInstructionsToUse = new ParquetInstructions.Builder(writeInstructions) + .addColumnNameMapping(INDEX_ROW_SET_COLUMN_NAME, dataIndex.rowSetColumnName()) + .build(); } + write(indexTable, indexTable.getDefinition(), writeInstructionsToUse, info.dest, + info.destOutputStream, Collections.emptyMap(), indexTableInfoBuilder, + NullParquetMetadataFileWriter.INSTANCE, computedCache); } } + } - // SortedColumnsAttribute effectively only stores (zero or more) individual columns by which the table is - // sorted, rather than ordered sets expressing multi-column sorts. Given that mismatch, we can only reflect - // a single column sort in the metadata at this time. - final List sortedColumns = SortedColumnsAttribute.getSortedColumns(t); - if (!sortedColumns.isEmpty()) { - tableInfoBuilder.addSortingColumns(SortColumnInfo.of(sortedColumns.get(0))); - } - write(t, definition, writeInstructions, destFilePath, destFilePathForMetadata, incomingMeta, - tableInfoBuilder, metadataFileWriter, computedCache); - } catch (Exception e) { - if (cleanupFiles != null) { - for (final File cleanupFile : cleanupFiles) { - try { - // noinspection ResultOfMethodCallIgnored - cleanupFile.delete(); - } catch (Exception ignored) { - } - } - } - throw e; + // SortedColumnsAttribute effectively only stores (zero or more) individual columns by which the table is + // sorted, rather than ordered sets expressing multi-column sorts. Given that mismatch, we can only reflect + // a single column sort in the metadata at this time. 
+ final List sortedColumns = SortedColumnsAttribute.getSortedColumns(t); + if (!sortedColumns.isEmpty()) { + tableInfoBuilder.addSortingColumns(SortColumnInfo.of(sortedColumns.get(0))); } + write(t, definition, writeInstructions, dest, destOutputStream, incomingMeta, + tableInfoBuilder, metadataFileWriter, computedCache); } /** @@ -205,10 +183,9 @@ static void write( * @param table The table to write * @param definition The table definition * @param writeInstructions Write instructions for customizations while writing - * @param destFilePath The destination path - * @param destFilePathForMetadata The destination path to store in the metadata files. This can be different from - * {@code destFilePath} if we are writing the parquet file to a shadow location first since the metadata - * should always hold the accurate path. + * @param dest The destination URI to write to + * @param destOutputStream The output stream to write to dest, should be marked as + * {@link CompletableOutputStream#done()} once writing is finished * @param tableMeta A map of metadata values to be stores in the file footer * @param tableInfoBuilder A partially constructed builder for the metadata object * @param metadataFileWriter The writer for the {@value ParquetUtils#METADATA_FILE_NAME} and @@ -216,12 +193,12 @@ static void write( * @param computedCache Per column cache tags * @throws IOException For file writing related errors */ - static void write( + private static void write( @NotNull final Table table, @NotNull final TableDefinition definition, @NotNull final ParquetInstructions writeInstructions, - @NotNull final String destFilePath, - @NotNull final String destFilePathForMetadata, + @NotNull final URI dest, + @NotNull final CompletableOutputStream destOutputStream, @NotNull final Map tableMeta, @NotNull final TableInfo.Builder tableInfoBuilder, @NotNull final ParquetMetadataFileWriter metadataFileWriter, @@ -230,11 +207,13 @@ static void write( final Table t = pretransformTable(table, definition); final TrackingRowSet tableRowSet = t.getRowSet(); final Map> columnSourceMap = t.getColumnSourceMap(); - final ParquetFileWriter parquetFileWriter = getParquetFileWriter(computedCache, definition, tableRowSet, - columnSourceMap, destFilePath, destFilePathForMetadata, writeInstructions, tableMeta, - tableInfoBuilder, metadataFileWriter); - // Given the transformation, do not use the original table's "definition" for writing - write(t, writeInstructions, parquetFileWriter, computedCache); + try (final ParquetFileWriter parquetFileWriter = getParquetFileWriter(computedCache, definition, + tableRowSet, columnSourceMap, dest, destOutputStream, writeInstructions, tableMeta, + tableInfoBuilder, metadataFileWriter)) { + // Given the transformation, do not use the original table's "definition" for writing + write(t, writeInstructions, parquetFileWriter, computedCache); + } + destOutputStream.done(); } } @@ -269,8 +248,6 @@ private static void write( } } } - - parquetFileWriter.close(); } /** @@ -336,19 +313,16 @@ private static Table pretransformTable(@NotNull final Table table, @NotNull fina * Create a {@link ParquetFileWriter} for writing the table to disk. * * @param computedCache Per column cache tags - * @param definition the writable definition - * @param tableRowSet the row set being written - * @param columnSourceMap the columns of the table - * @param destFilePath the destination to write to - * @param destFilePathForMetadata The destination path to store in the metadata files. 
This can be different from - * {@code destFilePath} if we are writing the parquet file to a shadow location first since the metadata - * should always hold the accurate path. - * @param writeInstructions write instructions for the file - * @param tableMeta metadata to include in the parquet metadata - * @param tableInfoBuilder a builder for accumulating per-column information to construct the deephaven metadata + * @param definition The writable definition + * @param tableRowSet The row set being written + * @param columnSourceMap The columns of the table + * @param dest The destination URI to write to + * @param destOutputStream The output stream to write to dest + * @param writeInstructions Write instructions for the file + * @param tableMeta Metadata to include in the parquet metadata + * @param tableInfoBuilder Builder for accumulating per-column information to construct the deephaven metadata * @param metadataFileWriter The writer for the {@value ParquetUtils#METADATA_FILE_NAME} and * {@value ParquetUtils#COMMON_METADATA_FILE_NAME} files - * * @return a new file writer */ @NotNull @@ -357,8 +331,8 @@ private static ParquetFileWriter getParquetFileWriter( @NotNull final TableDefinition definition, @NotNull final RowSet tableRowSet, @NotNull final Map> columnSourceMap, - @NotNull final String destFilePath, - @NotNull final String destFilePathForMetadata, + @NotNull final URI dest, + @NotNull final OutputStream destOutputStream, @NotNull final ParquetInstructions writeInstructions, @NotNull final Map tableMeta, @NotNull final TableInfo.Builder tableInfoBuilder, @@ -404,21 +378,18 @@ private static ParquetFileWriter getParquetFileWriter( final Map extraMetaData = new HashMap<>(tableMeta); extraMetaData.put(METADATA_KEY, tableInfoBuilder.build().serializeToJSON()); - return new ParquetFileWriter(destFilePath, destFilePathForMetadata, - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(destFilePath, false), null), - writeInstructions.getTargetPageSize(), + return new ParquetFileWriter(dest, destOutputStream, writeInstructions.getTargetPageSize(), new HeapByteBufferAllocator(), mappedSchema.getParquetSchema(), writeInstructions.getCompressionCodecName(), extraMetaData, metadataFileWriter); } - @VisibleForTesting - static void writeColumnSource( + private static void writeColumnSource( @NotNull final RowSet tableRowSet, @NotNull final ParquetInstructions writeInstructions, @NotNull final RowGroupWriter rowGroupWriter, @NotNull final Map> computedCache, @NotNull final String columnName, - @NotNull ColumnSource columnSource) throws IllegalAccessException, IOException { + @NotNull final ColumnSource columnSource) throws IllegalAccessException, IOException { try (final ColumnWriter columnWriter = rowGroupWriter.addColumn( writeInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName))) { boolean usedDictionary = false; @@ -435,8 +406,8 @@ static void writeColumnSource( /** * Makes a copy of the given buffer */ - private static IntBuffer makeCopy(IntBuffer orig) { - IntBuffer copy = IntBuffer.allocate(orig.capacity()); + private static IntBuffer makeCopy(final IntBuffer orig) { + final IntBuffer copy = IntBuffer.allocate(orig.capacity()); copy.put(orig).flip(); return copy; } @@ -534,9 +505,9 @@ private static void encodePlain( try (final TransferObject transferObject = TransferObject.create( tableRowSet, writeInstructions, computedCache, columnName, columnSource)) { final Statistics statistics = columnWriter.getStats(); - boolean writeVectorPages = 
(transferObject instanceof ArrayAndVectorTransfer); + final boolean writeVectorPages = (transferObject instanceof ArrayAndVectorTransfer); do { - int numValuesBuffered = transferObject.transferOnePageToBuffer(); + final int numValuesBuffered = transferObject.transferOnePageToBuffer(); if (writeVectorPages) { columnWriter.addVectorPage(transferObject.getBuffer(), transferObject.getRepeatCount(), numValuesBuffered, statistics); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 5422acf5e96..ab035bdd0c8 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -22,9 +22,9 @@ import io.deephaven.parquet.base.ParquetMetadataFileWriter; import io.deephaven.parquet.base.NullParquetMetadataFileWriter; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.channel.CompletableOutputStream; import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.util.channel.SeekableChannelsProviderLoader; -import io.deephaven.util.channel.SeekableChannelsProviderPlugin; import io.deephaven.vector.*; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.PartitionAwareSourceTable; @@ -53,14 +53,17 @@ import org.jetbrains.annotations.Nullable; import java.io.File; +import java.io.IOException; import java.math.BigDecimal; import java.net.URI; import java.util.*; import java.util.function.Supplier; import java.util.stream.Collectors; +import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR; import static io.deephaven.base.FileUtils.convertToURI; -import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; +import static io.deephaven.parquet.base.ParquetUtils.PARQUET_OUTPUT_BUFFER_SIZE; +import static io.deephaven.parquet.base.ParquetUtils.resolve; import static io.deephaven.parquet.table.ParquetInstructions.FILE_INDEX_TOKEN; import static io.deephaven.parquet.table.ParquetInstructions.PARTITIONS_TOKEN; import static io.deephaven.parquet.table.ParquetInstructions.UUID_TOKEN; @@ -193,15 +196,6 @@ private static ParquetInstructions ensureTableDefinition( return instructions; } - private static File getShadowFile(final File destFile) { - return new File(destFile.getParent(), ".NEW_" + destFile.getName()); - } - - @VisibleForTesting - static File getBackupFile(final File destFile) { - return new File(destFile.getParent(), ".OLD_" + destFile.getName()); - } - private static String minusParquetSuffix(@NotNull final String s) { if (s.endsWith(PARQUET_FILE_EXTENSION)) { return s.substring(0, s.length() - PARQUET_FILE_EXTENSION.length()); @@ -209,10 +203,22 @@ private static String minusParquetSuffix(@NotNull final String s) { return s; } + /** + * Get the name of the file from the URI. + */ + private static String getFileName(@NotNull final URI uri) { + final String path = uri.getPath(); + final int lastSlash = path.lastIndexOf(URI_SEPARATOR_CHAR); + if (lastSlash == path.length() - 1) { + throw new IllegalArgumentException("Directory URIs are not supported, found " + uri); + } + return lastSlash == -1 ? path : path.substring(lastSlash + 1); + } + /** * Generates the index file path relative to the table destination file path.
* - * @param tableDest Destination path for the main table containing these indexing columns + * @param destFileName Destination name for the main table containing these indexing columns * @param columnNames Array of indexing column names * * @return The relative index file path. For example, for table with destination {@code "table.parquet"} and @@ -220,10 +226,10 @@ private static String minusParquetSuffix(@NotNull final String s) { * {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"} on unix systems. */ @VisibleForTesting - static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String... columnNames) { + static String getRelativeIndexFilePath(@NotNull final String destFileName, @NotNull final String... columnNames) { final String columns = String.join(",", columnNames); return String.format(".dh_metadata%sindexes%s%s%sindex_%s_%s", File.separator, File.separator, columns, - File.separator, columns, tableDest.getName()); + File.separator, columns, destFileName); } /** @@ -243,139 +249,36 @@ public static String legacyGroupingFileName(@NotNull final File tableDest, @NotN return prefix + "_" + columnName + "_grouping.parquet"; } - /** - * Delete any old backup files created for this destination, and throw an exception on failure - */ - private static void deleteBackupFile(@NotNull final File destFile) { - if (!deleteBackupFileNoExcept(destFile)) { - throw new UncheckedDeephavenException( - String.format("Failed to delete backup file at %s", getBackupFile(destFile).getAbsolutePath())); - } - } - - /** - * Delete any old backup files created for this destination with no exception in case of failure - */ - private static boolean deleteBackupFileNoExcept(@NotNull final File destFile) { - final File backupDestFile = getBackupFile(destFile); - if (backupDestFile.exists() && !backupDestFile.delete()) { - log.error().append("Error in deleting backup file at path ") - .append(backupDestFile.getAbsolutePath()) - .endl(); - return false; - } - return true; - } - - /** - * Backup any existing files at location destFile and rename the shadow file to destFile - */ - private static void installShadowFile(@NotNull final File destFile, @NotNull final File shadowDestFile) { - final File backupDestFile = getBackupFile(destFile); - if (destFile.exists() && !destFile.renameTo(backupDestFile)) { - throw new UncheckedDeephavenException( - String.format( - "Failed to install shadow file at %s because a file already exists at the path which couldn't be renamed to %s", - destFile.getAbsolutePath(), backupDestFile.getAbsolutePath())); - } - if (!shadowDestFile.renameTo(destFile)) { - throw new UncheckedDeephavenException(String.format( - "Failed to install shadow file at %s because couldn't rename temporary shadow file from %s to %s", - destFile.getAbsolutePath(), shadowDestFile.getAbsolutePath(), destFile.getAbsolutePath())); - } - } - - /** - * Roll back any changes made in the {@link #installShadowFile} in best-effort manner - */ - private static void rollbackFile(@NotNull final File destFile) { - final File backupDestFile = getBackupFile(destFile); - final File shadowDestFile = getShadowFile(destFile); - destFile.renameTo(shadowDestFile); - backupDestFile.renameTo(destFile); - } - - /** - * Make any missing ancestor directories of {@code destination}. - * - * @param destination The destination parquet file - * @return The first created directory, or null if no directories were made. 
- */ - private static File prepareDestinationFileLocation(@NotNull File destination) { - destination = destination.getAbsoluteFile(); - if (!destination.getPath().endsWith(PARQUET_FILE_EXTENSION)) { - throw new UncheckedDeephavenException( - String.format("Destination %s does not end in %s extension", destination, PARQUET_FILE_EXTENSION)); - } - if (destination.exists()) { - if (destination.isDirectory()) { - throw new UncheckedDeephavenException( - String.format("Destination %s exists and is a directory", destination)); - } - if (!destination.canWrite()) { - throw new UncheckedDeephavenException( - String.format("Destination %s exists but is not writable", destination)); - } - return null; - } - final File firstParent = destination.getParentFile(); - if (firstParent.isDirectory()) { - if (firstParent.canWrite()) { - return null; - } - throw new UncheckedDeephavenException( - String.format("Destination %s has non writable parent directory", destination)); - } - File firstCreated = firstParent; - File parent; - for (parent = destination.getParentFile(); parent != null && !parent.exists(); parent = - parent.getParentFile()) { - firstCreated = parent; - } - if (parent == null) { - throw new IllegalArgumentException( - String.format("Can't find any existing parent directory for destination path: %s", destination)); - } - if (!parent.isDirectory()) { - throw new IllegalArgumentException( - String.format("Existing parent file %s of %s is not a directory", parent, destination)); - } - if (!firstParent.mkdirs()) { - throw new UncheckedDeephavenException("Couldn't (re)create destination directory " + firstParent); - } - return firstCreated; - } - /** * Helper function for building index column info for writing and deleting any backup index column files * * @param indexColumns Names of index columns, stored as String list for each index * @param parquetColumnNameArr Names of index columns for the parquet file, stored as String[] for each index - * @param destFile The destination path for the main table containing these index columns + * @param dest The destination URI for the main table containing these index columns + * @param channelProvider The channel provider to use for creating channels to the index files */ private static List indexInfoBuilderHelper( @NotNull final Collection> indexColumns, @NotNull final String[][] parquetColumnNameArr, - @NotNull final File destFile) { + @NotNull final URI dest, + @NotNull final SeekableChannelsProvider channelProvider) throws IOException { Require.eq(indexColumns.size(), "indexColumns.size", parquetColumnNameArr.length, "parquetColumnNameArr.length"); final int numIndexes = indexColumns.size(); final List indexInfoList = new ArrayList<>(numIndexes); int gci = 0; + final String destFileName = getFileName(dest); for (final List indexColumnNames : indexColumns) { final String[] parquetColumnNames = parquetColumnNameArr[gci]; - final String indexFileRelativePath = getRelativeIndexFilePath(destFile, parquetColumnNames); - final File indexFile = new File(destFile.getParent(), indexFileRelativePath); - prepareDestinationFileLocation(indexFile); - deleteBackupFile(indexFile); - - final File shadowIndexFile = getShadowFile(indexFile); - + final String indexFileRelativePath = getRelativeIndexFilePath(destFileName, parquetColumnNames); + final URI indexFileURI = resolve(dest, indexFileRelativePath); + final CompletableOutputStream indexFileOutputStream = + channelProvider.getOutputStream(indexFileURI, PARQUET_OUTPUT_BUFFER_SIZE); final 
ParquetTableWriter.IndexWritingInfo info = new ParquetTableWriter.IndexWritingInfo( indexColumnNames, parquetColumnNames, - indexFile, - shadowIndexFile); + indexFileURI, + indexFileOutputStream); indexInfoList.add(info); gci++; } @@ -392,7 +295,7 @@ private static List indexInfoBuilderHelper( * while writing, use {@link ParquetInstructions.Builder#addIndexColumns}. * * @param sourceTable The table to partition and write - * @param destinationDir The path to destination root directory to store partitioned data in nested format. + * @param destinationDir The path or URI to destination root directory to store partitioned data in nested format. * Non-existing directories are created. * @param writeInstructions Write instructions for customizations while writing */ @@ -427,7 +330,7 @@ public static void writeKeyValuePartitionedTable( * {@link ParquetInstructions.Builder#addIndexColumns}. * * @param partitionedTable The partitioned table to write - * @param destinationDir The path to destination root directory to store partitioned data in nested format. + * @param destinationDir The path or URI to destination root directory to store partitioned data in nested format. * Non-existing directories are created. * @param writeInstructions Write instructions for customizations while writing */ @@ -458,7 +361,7 @@ public static void writeKeyValuePartitionedTable( * @param partitionedTable The partitioned table to write * @param keyTableDefinition The definition for key columns * @param leafDefinition The definition for leaf parquet files to be written - * @param destinationRoot The path to destination root directory to store partitioned data in nested format + * @param destinationRoot The path or URI to destination root directory to store partitioned data in nested format * @param writeInstructions Write instructions for customizations while writing * @param indexColumns Collection containing the column names for indexes to persist. The write operation will store * the index info as sidecar tables. 
This argument is used to narrow the set of indexes to write, or to be @@ -512,14 +415,16 @@ private static void writeKeyValuePartitionedTableImpl( }); // For the constituent column for each row, accumulate the constituent tables and build the final file paths final Collection partitionedData = new ArrayList<>(); - final Collection destinations = new ArrayList<>(); + final Collection destinations = new ArrayList<>(); try (final CloseableIterator> constituentIterator = withGroupConstituents.objectColumnIterator(partitionedTable.constituentColumnName())) { int row = 0; + final URI destinationDir = convertToURI(destinationRoot, true); while (constituentIterator.hasNext()) { final ObjectVector constituentVector = constituentIterator.next(); final List partitionStrings = partitionStringsList.get(row); - final File relativePath = new File(destinationRoot, String.join(File.separator, partitionStrings)); + final String relativePath = concatenatePartitions(partitionStrings); + final URI partitionDir = resolve(destinationDir, relativePath); int count = 0; for (final Table constituent : constituentVector) { String filename = baseName; @@ -533,7 +438,7 @@ private static void writeKeyValuePartitionedTableImpl( filename = filename.replace(UUID_TOKEN, UUID.randomUUID().toString()); } filename += PARQUET_FILE_EXTENSION; - destinations.add(new File(relativePath, filename)); + destinations.add(resolve(partitionDir, filename)); partitionedData.add(constituent); count++; } @@ -557,14 +462,22 @@ private static void writeKeyValuePartitionedTableImpl( // Store hard reference to prevent indexes from being garbage collected final List dataIndexes = addIndexesToTables(partitionedDataArray, indexColumns); writeTablesImpl(partitionedDataArray, leafDefinition, writeInstructions, - destinations.toArray(File[]::new), indexColumns, partitioningColumnsSchema, - new File(destinationRoot), computedCache); + destinations.toArray(URI[]::new), indexColumns, partitioningColumnsSchema, + convertToURI(destinationRoot, true), computedCache); if (dataIndexes != null) { dataIndexes.clear(); } } } + private static String concatenatePartitions(final List partitions) { + final StringBuilder builder = new StringBuilder(); + for (final String partition : partitions) { + builder.append(partition).append(File.separator); + } + return builder.toString(); + } + /** * Add data indexes to provided tables, if not present, and return a list of hard references to the indexes. 
*/ @@ -646,10 +559,10 @@ private static void writeTablesImpl( @NotNull final Table[] sources, @NotNull final TableDefinition definition, @NotNull final ParquetInstructions writeInstructions, - @NotNull final File[] destinations, + @NotNull final URI[] destinations, @NotNull final Collection> indexColumns, @Nullable final MessageType partitioningColumnsSchema, - @Nullable final File metadataRootDir, + @Nullable final URI metadataRootDir, @NotNull final Map> computedCache) { Require.eq(sources.length, "sources.length", destinations.length, "destinations.length"); if (writeInstructions.getFileLayout().isPresent()) { @@ -659,14 +572,9 @@ private static void writeTablesImpl( if (definition.numColumns() == 0) { throw new TableDataException("Cannot write a parquet table with zero columns"); } - Arrays.stream(destinations).forEach(ParquetTools::deleteBackupFile); - - // Write all files at temporary shadow file paths in the same directory to prevent overwriting any existing - // data in case of failure - final File[] shadowDestFiles = - Arrays.stream(destinations).map(ParquetTools::getShadowFile).toArray(File[]::new); - final File[] firstCreatedDirs = - Arrays.stream(shadowDestFiles).map(ParquetTools::prepareDestinationFileLocation).toArray(File[]::new); + // Assuming all destination URIs have the same scheme, and will use the same channels provider instance + final SeekableChannelsProvider channelsProvider = SeekableChannelsProviderLoader.getInstance() + .fromServiceLoader(destinations[0], writeInstructions.getSpecialInstructions()); final ParquetMetadataFileWriter metadataFileWriter; if (writeInstructions.generateMetadataFiles()) { @@ -679,104 +587,74 @@ private static void writeTablesImpl( metadataFileWriter = NullParquetMetadataFileWriter.INSTANCE; } - // List of shadow files, to clean up in case of exceptions - final List shadowFiles = new ArrayList<>(); - // List of all destination files (including index files), to roll back in case of exceptions - final List destFiles = new ArrayList<>(); - try { - final List> indexInfoLists; - if (indexColumns.isEmpty()) { - // Write the tables without any index info - indexInfoLists = null; - for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) { - shadowFiles.add(shadowDestFiles[tableIdx]); - final Table source = sources[tableIdx]; - ParquetTableWriter.write(source, definition, writeInstructions, shadowDestFiles[tableIdx].getPath(), - destinations[tableIdx].getPath(), Collections.emptyMap(), - (List) null, metadataFileWriter, - computedCache); + // List of output streams created, to rollback in case of exceptions + final List outputStreams = new ArrayList<>(destinations.length); + try (final SafeCloseable ignored = () -> SafeCloseable.closeAll(outputStreams.stream())) { + try { + if (indexColumns.isEmpty()) { + // Write the tables without any index info + for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) { + final Table source = sources[tableIdx]; + final CompletableOutputStream outputStream = channelsProvider.getOutputStream( + destinations[tableIdx], PARQUET_OUTPUT_BUFFER_SIZE); + outputStreams.add(outputStream); + ParquetTableWriter.write(source, definition, writeInstructions, destinations[tableIdx], + outputStream, Collections.emptyMap(), (List) null, + metadataFileWriter, computedCache); + } + } else { + // Shared parquet column names across all tables + final String[][] parquetColumnNameArr = indexColumns.stream() + .map((Collection columns) -> columns.stream() + 
.map(writeInstructions::getParquetColumnNameFromColumnNameOrDefault) + .toArray(String[]::new)) + .toArray(String[][]::new); + + for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) { + final URI tableDestination = destinations[tableIdx]; + final List indexInfoList = + indexInfoBuilderHelper(indexColumns, parquetColumnNameArr, tableDestination, + channelsProvider); + final CompletableOutputStream outputStream = channelsProvider.getOutputStream( + destinations[tableIdx], PARQUET_OUTPUT_BUFFER_SIZE); + outputStreams.add(outputStream); + for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) { + outputStreams.add(info.destOutputStream); + } + final Table sourceTable = sources[tableIdx]; + ParquetTableWriter.write(sourceTable, definition, writeInstructions, destinations[tableIdx], + outputStream, Collections.emptyMap(), indexInfoList, metadataFileWriter, computedCache); + } } - } else { - // Create index info for each table and write the table and index files to shadow path - indexInfoLists = new ArrayList<>(sources.length); - - // Shared parquet column names across all tables - final String[][] parquetColumnNameArr = indexColumns.stream() - .map((Collection columns) -> columns.stream() - .map(writeInstructions::getParquetColumnNameFromColumnNameOrDefault) - .toArray(String[]::new)) - .toArray(String[][]::new); - for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) { - final File tableDestination = destinations[tableIdx]; - final List indexInfoList = - indexInfoBuilderHelper(indexColumns, parquetColumnNameArr, tableDestination); - indexInfoLists.add(indexInfoList); - - shadowFiles.add(shadowDestFiles[tableIdx]); - indexInfoList.forEach(item -> shadowFiles.add(item.destFile)); - - final Table sourceTable = sources[tableIdx]; - ParquetTableWriter.write(sourceTable, definition, writeInstructions, - shadowDestFiles[tableIdx].getPath(), tableDestination.getPath(), Collections.emptyMap(), - indexInfoList, metadataFileWriter, computedCache); + if (writeInstructions.generateMetadataFiles()) { + final URI metadataDest = metadataRootDir.resolve(METADATA_FILE_NAME); + final CompletableOutputStream metadataOutputStream = channelsProvider.getOutputStream( + metadataDest, PARQUET_OUTPUT_BUFFER_SIZE); + outputStreams.add(metadataOutputStream); + final URI commonMetadataDest = metadataRootDir.resolve(COMMON_METADATA_FILE_NAME); + final CompletableOutputStream commonMetadataOutputStream = channelsProvider.getOutputStream( + commonMetadataDest, PARQUET_OUTPUT_BUFFER_SIZE); + outputStreams.add(commonMetadataOutputStream); + metadataFileWriter.writeMetadataFiles(metadataOutputStream, commonMetadataOutputStream); } - } - // Write the combined metadata files to shadow destinations - final File metadataDestFile, shadowMetadataFile, commonMetadataDestFile, shadowCommonMetadataFile; - if (writeInstructions.generateMetadataFiles()) { - metadataDestFile = new File(metadataRootDir, METADATA_FILE_NAME); - shadowMetadataFile = ParquetTools.getShadowFile(metadataDestFile); - shadowFiles.add(shadowMetadataFile); - commonMetadataDestFile = new File(metadataRootDir, COMMON_METADATA_FILE_NAME); - shadowCommonMetadataFile = ParquetTools.getShadowFile(commonMetadataDestFile); - shadowFiles.add(shadowCommonMetadataFile); - metadataFileWriter.writeMetadataFiles(shadowMetadataFile.getAbsolutePath(), - shadowCommonMetadataFile.getAbsolutePath()); - } else { - metadataDestFile = shadowMetadataFile = commonMetadataDestFile = shadowCommonMetadataFile = null; - } - - // Write to shadow files was 
successful, now replace the original files with the shadow files - for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) { - destFiles.add(destinations[tableIdx]); - installShadowFile(destinations[tableIdx], shadowDestFiles[tableIdx]); - if (indexInfoLists != null) { - final List indexInfoList = indexInfoLists.get(tableIdx); - for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) { - final File indexDestFile = info.destFileForMetadata; - final File shadowIndexFile = info.destFile; - destFiles.add(indexDestFile); - installShadowFile(indexDestFile, shadowIndexFile); - } + // Commit all the writes to underlying file system, to detect any exceptions early before closing + for (final CompletableOutputStream outputStream : outputStreams) { + outputStream.complete(); } - } - if (writeInstructions.generateMetadataFiles()) { - destFiles.add(metadataDestFile); - installShadowFile(metadataDestFile, shadowMetadataFile); - destFiles.add(commonMetadataDestFile); - installShadowFile(commonMetadataDestFile, shadowCommonMetadataFile); - } - } catch (Exception e) { - for (final File file : destFiles) { - rollbackFile(file); - } - for (final File file : shadowFiles) { - file.delete(); - } - for (final File firstCreatedDir : firstCreatedDirs) { - if (firstCreatedDir == null) { - continue; + } catch (final Exception e) { + // Try to rollback all the output streams in reverse order to undo any writes + for (int idx = outputStreams.size() - 1; idx >= 0; idx--) { + try { + outputStreams.get(idx).rollback(); + } catch (IOException e1) { + log.error().append("Error in rolling back output stream ").append(e1).endl(); + } } - log.error().append( - "Error in table writing, cleaning up potentially incomplete table destination path starting from ") - .append(firstCreatedDir.getAbsolutePath()).append(e).endl(); - FileUtils.deleteRecursivelyOnNFS(firstCreatedDir); + throw new UncheckedDeephavenException("Error writing parquet tables", e); } - throw new UncheckedDeephavenException("Error writing parquet tables", e); } - destFiles.forEach(ParquetTools::deleteBackupFileNoExcept); } /** @@ -873,28 +751,35 @@ public static void writeTables( } definition = firstDefinition; } - final File[] destinationFiles = new File[destinations.length]; + final URI[] destinationUris = new URI[destinations.length]; + String firstScheme = null; for (int idx = 0; idx < destinations.length; idx++) { - final URI destinationURI = convertToURI(destinations[idx], false); - if (!FILE_URI_SCHEME.equals(destinationURI.getScheme())) { + if (!destinations[idx].endsWith(PARQUET_FILE_EXTENSION)) { throw new IllegalArgumentException( - "Only file URI scheme is supported for writing parquet files, found" + - "non-file URI: " + destinations[idx]); + String.format("Destination %s does not end in %s extension", destinations[idx], + PARQUET_FILE_EXTENSION)); + } + destinationUris[idx] = convertToURI(destinations[idx], false); + if (idx == 0) { + firstScheme = destinationUris[0].getScheme(); + } else if (!firstScheme.equals(destinationUris[idx].getScheme())) { + throw new IllegalArgumentException("All destination URIs must have the same scheme, expected " + + firstScheme + " found " + destinationUris[idx].getScheme()); } - destinationFiles[idx] = new File(destinationURI); } - final File metadataRootDir; + final URI metadataRootDir; if (writeInstructions.generateMetadataFiles()) { // We insist on writing the metadata file in the same directory as the destination files, thus all // destination files should be in the same directory. 
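// Aside, not part of the patch: a minimal sketch (hypothetical paths) of the URI arithmetic used just
// below. URI#resolve(".") drops the last path segment, yielding the enclosing "directory" URI that the
// metadata-file check compares across destinations; only java.net.URI is needed.
final URI t1 = URI.create("s3://bucket/data/table1.parquet").resolve(".");  // s3://bucket/data/
final URI t2 = URI.create("s3://bucket/data/table2.parquet").resolve(".");  // s3://bucket/data/
final URI t3 = URI.create("s3://bucket/other/table3.parquet").resolve("."); // s3://bucket/other/
// t1.equals(t2) is true, so those two destinations may share the generated metadata files;
// t1.equals(t3) is false, which the check below rejects with an IllegalArgumentException.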
- final String firstDestinationDir = destinationFiles[0].getAbsoluteFile().getParentFile().getAbsolutePath(); + final URI firstDestinationDir = destinationUris[0].resolve("."); for (int i = 1; i < destinations.length; i++) { - if (!firstDestinationDir.equals(destinationFiles[i].getParentFile().getAbsolutePath())) { + final URI destinationDir = destinationUris[i].resolve("."); + if (!firstDestinationDir.equals(destinationDir)) { throw new IllegalArgumentException("All destination files must be in the same directory for " + - " generating metadata files"); + " generating metadata files, found " + firstDestinationDir + " and " + destinationDir); } } - metadataRootDir = new File(firstDestinationDir); + metadataRootDir = firstDestinationDir; } else { metadataRootDir = null; } @@ -904,7 +789,7 @@ public static void writeTables( buildComputedCache(() -> PartitionedTableFactory.ofTables(definition, sources).merge(), definition); // We do not have any additional schema for partitioning columns in this case. Schema for all columns will be // generated at the time of writing the parquet files and merged to generate the metadata files. - writeTablesImpl(sources, definition, writeInstructions, destinationFiles, indexColumns, null, metadataRootDir, + writeTablesImpl(sources, definition, writeInstructions, destinationUris, indexColumns, null, metadataRootDir, computedCache); } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 416013bf376..5b582b3c678 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -432,7 +432,7 @@ public void testSortingMetadata() { StandaloneTableKey.getInstance(), new ParquetTableLocationKey( convertToURI(new File(rootFile, - ParquetTools.getRelativeIndexFilePath(dest, "someString")), false), + ParquetTools.getRelativeIndexFilePath(dest.getName(), "someString")), false), 0, Map.of(), EMPTY), EMPTY); assertEquals(index1Location.getSortedColumns(), List.of(SortColumn.asc(ColumnName.of("someString")))); @@ -443,7 +443,7 @@ public void testSortingMetadata() { StandaloneTableKey.getInstance(), new ParquetTableLocationKey( convertToURI(new File(rootFile, - ParquetTools.getRelativeIndexFilePath(dest, "someInt", "someString")), false), + ParquetTools.getRelativeIndexFilePath(dest.getName(), "someInt", "someString")), false), 0, Map.of(), EMPTY), EMPTY); assertEquals(index2Location.getSortedColumns(), List.of( @@ -453,7 +453,7 @@ public void testSortingMetadata() { assertTableEquals(index2Table, index2Table.sort("someInt", "someString")); } - private static void verifyIndexingInfoExists(final Table table, final String... columnNames) { + static void verifyIndexingInfoExists(final Table table, final String... 
columnNames) { assertTrue(DataIndexer.hasDataIndex(table, columnNames)); final DataIndex fullIndex = DataIndexer.getDataIndex(table, columnNames); Assert.neqNull(fullIndex, "fullIndex"); @@ -592,6 +592,19 @@ private static void writeReadTableTest(final Table table, final File dest, checkSingleTable(table, dest); } + @Test + public void basicParquetWrongDestinationTest() { + final Table table = TableTools.emptyTable(5).update("A=(int)i"); + final File dest = new File(rootFile, "basicParquetWrongDestinationTest.parquet"); + writeTable(table, dest.getPath()); + final File wrongDest = new File(rootFile, "basicParquetWrongDestinationTest"); + try { + writeTable(table, wrongDest.getPath()); + fail("Expected an exception because destination does not end with .parquet"); + } catch (final IllegalArgumentException expected) { + } + } + @Test public void basicParquetWithMetadataTest() { final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); @@ -1114,6 +1127,12 @@ public void writeKeyValuePartitionedDataWithMixedPartitionsTest() { @Test public void someMoreKeyValuePartitionedTestsWithComplexKeys() { + // Verify complex keys both with and without data index + someMoreKeyValuePartitionedTestsWithComplexKeysHelper(true); + someMoreKeyValuePartitionedTestsWithComplexKeysHelper(false); + } + + private void someMoreKeyValuePartitionedTestsWithComplexKeysHelper(final boolean addDataIndex) { final TableDefinition definition = TableDefinition.of( ColumnDefinition.ofString("symbol").withPartitioning(), ColumnDefinition.ofString("epic_collection_id"), @@ -1126,16 +1145,32 @@ public void someMoreKeyValuePartitionedTestsWithComplexKeys() { "I = ii")) .withDefinitionUnsafe(definition); - final File parentDir = new File(rootFile, "someTest"); - final ParquetInstructions writeInstructions = ParquetInstructions.builder() - .setGenerateMetadataFiles(true) - .build(); + final File parentDir = new File(rootFile, "someMoreKeyValuePartitionedTestsWithComplexKeys"); + if (parentDir.exists()) { + FileUtils.deleteRecursively(parentDir); + } + final ParquetInstructions writeInstructions; + if (addDataIndex) { + writeInstructions = ParquetInstructions.builder() + .setGenerateMetadataFiles(true) + .addIndexColumns("I", "epic_request_id") + .build(); + } else { + writeInstructions = ParquetInstructions.builder() + .setGenerateMetadataFiles(true) + .build(); + } final String[] partitioningCols = new String[] {"symbol", "epic_collection_id", "epic_request_id"}; final PartitionedTable partitionedTable = inputData.partitionBy(partitioningCols); writeKeyValuePartitionedTable(partitionedTable, parentDir.getPath(), writeInstructions); final Table fromDisk = readTable(parentDir.getPath(), EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)); + if (addDataIndex) { + // Verify if index present on columns "I, epic_request_id" + verifyIndexingInfoExists(fromDisk, "I", "epic_request_id"); + } + for (final String col : partitioningCols) { assertTrue(fromDisk.getDefinition().getColumn(col).isPartitioning()); } @@ -2024,16 +2059,6 @@ public void readFromDirTest() { assertTableEquals(expected, fromDisk); } - /** - * These are tests for writing a table to a parquet file and making sure there are no unnecessary files left in the - * directory after we finish writing. 
- */ - @Test - public void basicWriteTests() { - basicWriteTestsImpl(SINGLE_WRITER); - basicWriteTestsImpl(MULTI_WRITER); - } - @Test public void readPartitionedDataGeneratedOnWindows() { final String path = ParquetTableReadWriteTest.class @@ -2046,6 +2071,16 @@ public void readPartitionedDataGeneratedOnWindows() { assertTableEquals(expected, partitionedDataFromWindows.sort("year")); } + /** + * These are tests for writing a table to a parquet file and making sure there are no unnecessary files left in the + * directory after we finish writing. + */ + @Test + public void basicWriteTests() { + basicWriteTestsImpl(SINGLE_WRITER); + basicWriteTestsImpl(MULTI_WRITER); + } + private static void basicWriteTestsImpl(TestParquetTableWriter writer) { // Create an empty parent directory final File parentDir = new File(rootFile, "tempDir"); @@ -2064,6 +2099,7 @@ private static void basicWriteTestsImpl(TestParquetTableWriter writer) { // This write should fail final Table badTable = TableTools.emptyTable(5) .updateView("InputString = ii % 2 == 0 ? Long.toString(ii) : null", "A=InputString.charAt(0)"); + DataIndexer.getOrCreateDataIndex(badTable, "InputString"); try { writer.writeTable(badTable, destFile); TestCase.fail("Exception expected for invalid formula"); @@ -2168,9 +2204,10 @@ public void writeMultiTableExceptionTest() { final File parentDir = new File(rootFile, "tempDir"); parentDir.mkdir(); - // Write two tables to parquet file and read them back + // Write two tables to parquet file final Table firstTable = TableTools.emptyTable(5) .updateView("InputString = Long.toString(ii)", "A=InputString.charAt(0)"); + DataIndexer.getOrCreateDataIndex(firstTable, "InputString"); final File firstDestFile = new File(parentDir, "firstTable.parquet"); final Table secondTable = TableTools.emptyTable(5) @@ -2180,7 +2217,7 @@ public void writeMultiTableExceptionTest() { final Table[] tablesToSave = new Table[] {firstTable, secondTable}; final String[] destinations = new String[] {firstDestFile.getPath(), secondDestFile.getPath()}; - // This write should fail + // This write should fail because of the null value in the second table try { writeTables(tablesToSave, destinations, ParquetInstructions.EMPTY.withTableDefinition(firstTable.getDefinition())); @@ -2584,6 +2621,10 @@ public void indexOverwritingTests() { indexOverwritingTestsImpl(MULTI_WRITER); } + private static File getBackupFile(final File destFile) { + return new File(destFile.getParent(), ".OLD_" + destFile.getName()); + } + private void indexOverwritingTestsImpl(TestParquetTableWriter writer) { // Create an empty parent directory final File parentDir = new File(rootFile, "tempDir"); @@ -2629,7 +2670,7 @@ private void indexOverwritingTestsImpl(TestParquetTableWriter writer) { // The directory should still contain the updated table, its index file for column xxx, and old index file // for column vvv final File xxxIndexFile = new File(parentDir, xxxIndexFilePath); - final File backupXXXIndexFile = ParquetTools.getBackupFile(xxxIndexFile); + final File backupXXXIndexFile = getBackupFile(xxxIndexFile); final String backupXXXIndexFileName = backupXXXIndexFile.getName(); verifyFilesInDir(parentDir, new String[] {destFilename}, Map.of("vvv", new String[] {vvvIndexFilePath}, @@ -2988,10 +3029,6 @@ public void singleTable() { final TableDefinition fooBarDefinition; final TableDefinition barDefinition; { - fooSource.mkdirs(); - fooBarSource.mkdirs(); - barSource.mkdirs(); - final ColumnHolder fooCol = intCol("Foo", 1, 2, 3); final ColumnHolder barCol = 
stringCol("Bar", "Zip", "Zap", "Zoom"); @@ -3103,8 +3140,6 @@ public void flatPartitionedTable() { final File p1FileEmpty = new File(emptySource, "01.parquet"); final File p2FileEmpty = new File(emptySource, "02.parquet"); - p1File.mkdirs(); - p2File.mkdirs(); emptySource.mkdirs(); final ColumnHolder foo1 = intCol("Foo", 1, 2, 3); @@ -3118,8 +3153,6 @@ public void flatPartitionedTable() { writeTable(p1, p1File.getPath()); writeTable(p2, p2File.getPath()); writeIntoEmptySource = () -> { - p1FileEmpty.mkdirs(); - p2FileEmpty.mkdirs(); writeTable(p1, p1FileEmpty.getPath()); writeTable(p2, p2FileEmpty.getPath()); }; @@ -3222,8 +3255,6 @@ public void keyValuePartitionedTable() { final File p1FileEmpty = new File(emptySource, "Partition=1/z.parquet"); final File p2FileEmpty = new File(emptySource, "Partition=2/a.parquet"); - p1File.mkdirs(); - p2File.mkdirs(); emptySource.mkdirs(); final ColumnHolder part1 = intCol("Partition", 1, 1, 1); @@ -3240,8 +3271,6 @@ public void keyValuePartitionedTable() { writeTable(p1, p1File.getPath()); writeTable(p2, p2File.getPath()); writeIntoEmptySource = () -> { - p1FileEmpty.mkdirs(); - p2FileEmpty.mkdirs(); writeTable(p1, p1FileEmpty.getPath()); writeTable(p2, p2FileEmpty.getPath()); }; diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java index 17e99079e1c..b1ce204a5ea 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java @@ -3,16 +3,20 @@ // package io.deephaven.parquet.table; +import io.deephaven.UncheckedDeephavenException; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.select.FormulaEvaluationException; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.test.types.OutOfBandTest; +import junit.framework.TestCase; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -30,7 +34,10 @@ import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.util.TableTools.merge; +import static io.deephaven.parquet.table.ParquetTableReadWriteTest.verifyIndexingInfoExists; import static io.deephaven.parquet.table.ParquetTools.writeKeyValuePartitionedTable; +import static io.deephaven.parquet.table.ParquetTools.writeTable; +import static io.deephaven.parquet.table.ParquetTools.writeTables; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -66,21 +73,135 @@ private static Table getTable(final int numRows) { public final void readSingleParquetFile() throws IOException, ExecutionException, InterruptedException, TimeoutException { final Table table = getTable(500_000); + final URI uri = uri("table.parquet"); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + 
+ // Write the table to S3 using the test async client final File dest = new File(folder.newFolder(), "table.parquet"); ParquetTools.writeTable(table, dest.getAbsolutePath()); putObject("table.parquet", AsyncRequestBody.fromFile(dest)); + final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions); + assertTableEquals(table, fromS3); + } + + @Test + public final void readWriteSingleParquetFile() { + readWriteSingleParquetFileHelper(0); // Empty table + readWriteSingleParquetFileHelper(5_000); + readWriteSingleParquetFileHelper(50_000); + readWriteSingleParquetFileHelper(500_000); + } + private void readWriteSingleParquetFileHelper(final int numRows) { + final Table table = getTable(numRows); final URI uri = uri("table.parquet"); - final ParquetInstructions readInstructions = ParquetInstructions.builder() + final ParquetInstructions instructions = ParquetInstructions.builder() .setSpecialInstructions(s3Instructions( S3Instructions.builder() + .writePartSize(5 << 20) + .numConcurrentWriteParts(5) .readTimeout(Duration.ofSeconds(10))) .build()) .build(); - final Table fromS3 = ParquetTools.readTable(uri.toString(), readInstructions); + + // Write the table to S3 using ParquetTools write API + ParquetTools.writeTable(table, uri.toString(), instructions); + final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions); assertTableEquals(table, fromS3); } + @Test + public final void mixURIWritingTest() { + final Table table1, table2; + table1 = table2 = getTable(5000); + final String uri1 = uri("table1.parquet").toString(); + final String uri2 = new File(folder.getRoot(), "table2.parquet").toURI().toString(); + try { + ParquetTools.writeTables(new Table[] {table1, table2}, new String[] {uri1, uri2}, + ParquetInstructions.EMPTY); + fail("Expected exception because writing to file and to S3 are not allowed in the same call"); + } catch (final IllegalArgumentException e) { + assertTrue(e.getMessage().contains("URIs must have the same scheme")); + } + } + + @Test + public final void writeSingleTableExceptionTest() { + final Table tableToSave = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); + final URI fileUri = uri("destDir/table.parquet"); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + ParquetTools.writeTable(tableToSave, fileUri.toString(), instructions); + + final URI parentDir = uri("destDir"); + Table fromS3 = ParquetTools.readTable(parentDir.toString(), instructions); + assertTableEquals(tableToSave, fromS3); + + // Try to write a bad table at the same destination. This write should fail midway and the original file should + // be preserved. + final Table badTable = TableTools.emptyTable(5) + .updateView("InputString = ii % 2 == 0 ? Long.toString(ii) : null", "A=InputString.charAt(0)"); + try { + ParquetTools.writeTable(badTable, fileUri.toString(), instructions); + TestCase.fail("Exception expected for invalid formula"); + } catch (UncheckedDeephavenException e) { + assertTrue(e.getCause() instanceof FormulaEvaluationException); + } + + // Make sure that original file is preserved + fromS3 = ParquetTools.readTable(parentDir.toString(), instructions); + assertTableEquals(tableToSave, fromS3); + } + + /** + * These are tests for writing multiple parquet tables such that there is an exception in the second write. 
+ */ + @Test + public void writeMultiTableExceptionTest() { + // Write some initial data to S3 + final Table initialData = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); + final URI initialDataUri = uri("destDir/initialDate.parquet"); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + ParquetTools.writeTable(initialData, initialDataUri.toString(), instructions); + + // Write two tables to parquet file and read them back + final Table firstTable = TableTools.emptyTable(5) + .updateView("InputString = Long.toString(ii)", "A=InputString.charAt(0)"); + final URI firstFileUri = uri("destDir/table1.parquet"); + final Table badTable = TableTools.emptyTable(5) + .updateView("InputString = ii % 2 == 0 ? Long.toString(ii*5) : null", "A=InputString.charAt(0)"); + final URI secondFileUri = uri("destDir/table2.parquet"); + + // This write should fail for the second table + try { + writeTables(new Table[] {firstTable, badTable}, + new String[] {firstFileUri.toString(), secondFileUri.toString()}, instructions); + TestCase.fail("Exception expected for invalid formula"); + } catch (UncheckedDeephavenException e) { + assertTrue(e.getCause() instanceof FormulaEvaluationException); + } + + // All new files should be deleted even though first table would be written successfully. The directory should + // just have initial data. + final URI parentDir = uri("destDir"); + final Table fromS3 = ParquetTools.readTable(parentDir.toString(), instructions); + assertTableEquals(initialData, fromS3); + } + @Test public final void readFlatPartitionedParquetData() throws ExecutionException, InterruptedException, TimeoutException, IOException { @@ -194,6 +315,33 @@ public void readKeyValuePartitionedParquetData() } } + @Test + public void readWriteKeyValuePartitionedParquetData() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofInt("someIntColumn"), + ColumnDefinition.ofString("someStringColumn")); + final Table table = ((QueryTable) TableTools.emptyTable(500_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "someIntColumn = (int) i", + "someStringColumn = String.valueOf(i)")) + .withDefinitionUnsafe(definition); + final URI uri = uri("keyValuePartitionedDataDir"); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .setTableDefinition(definition) + .setBaseNameForPartitionedParquetData("data") + .build(); + writeKeyValuePartitionedTable(table, uri.toString(), instructions); + final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions); + assertTableEquals(table.sort("PC1", "PC2"), fromS3.sort("PC1", "PC2")); + } + @Test public void readMetadataPartitionedParquetData() throws ExecutionException, InterruptedException, TimeoutException, IOException { @@ -292,4 +440,62 @@ public void readMetadataPartitionedParquetWithMissingMetadataFile() assertTrue(expected.getMessage().contains("metadata")); } } + + @Test + public void readWriteMetadataPartitionedParquetData() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + 
ColumnDefinition.ofInt("someIntColumn"), + ColumnDefinition.ofString("someStringColumn")); + final Table table = ((QueryTable) TableTools.emptyTable(500_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "someIntColumn = (int) i", + "someStringColumn = String.valueOf(i)")) + .withDefinitionUnsafe(definition); + final URI uri = uri("keyValuePartitionedDataDir"); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .setTableDefinition(definition) + .setBaseNameForPartitionedParquetData("data") + .setGenerateMetadataFiles(true) + .build(); + writeKeyValuePartitionedTable(table, uri.toString(), instructions); + final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions.withTableDefinitionAndLayout(null, + ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); + assertTableEquals(table.sort("PC1", "PC2"), fromS3.sort("PC1", "PC2")); + } + + @Test + public void indexByLongKey() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("someInt"), + ColumnDefinition.ofLong("someLong")); + final Table testTable = + ((QueryTable) TableTools.emptyTable(10).select("someInt = i", "someLong = ii % 3") + .groupBy("someLong").ungroup("someInt")).withDefinitionUnsafe(definition); + DataIndexer.getOrCreateDataIndex(testTable, "someLong"); + DataIndexer.getOrCreateDataIndex(testTable, "someInt", "someLong"); + + final URI uri = uri("table.parquet"); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); + + writeTable(testTable, uri.toString(), instructions); + final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions); + assertTableEquals(testTable, fromS3); + + // Validate the indexes and lookup functions. 
+ verifyIndexingInfoExists(fromS3, "someLong"); + verifyIndexingInfoExists(fromS3, "someInt", "someLong"); + verifyIndexingInfoExists(fromS3, "someLong", "someInt"); + } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index de89778aefa..d096669b192 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -259,7 +259,7 @@ public void testWriteTableExceptions() throws IOException { try { ParquetTools.writeTable(table1, testRoot + File.separator + "unexpectedFile" + File.separator + "Table1"); TestCase.fail("Expected exception"); - } catch (UncheckedDeephavenException e) { + } catch (IllegalArgumentException e) { // Expected } @@ -268,7 +268,7 @@ public void testWriteTableExceptions() throws IOException { try { ParquetTools.writeTable(table1, testRoot + File.separator + "Table1"); TestCase.fail("Expected exception"); - } catch (UncheckedDeephavenException e) { + } catch (IllegalArgumentException e) { // Expected } new File(testRoot + File.separator + "Nested").mkdirs(); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java index 69150aafa00..f8c0ae3f5b4 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java @@ -9,8 +9,10 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; @@ -33,7 +35,7 @@ class S3AsyncClientFactory { getOrComputeThreadCountProperty("S3.numScheduledExecutorThreads", 5); private static final Logger log = LoggerFactory.getLogger(S3AsyncClientFactory.class); - private static final Map httpClientCache = new ConcurrentHashMap<>(); + private static final Map httpAsyncClientCache = new ConcurrentHashMap<>(); private static volatile Executor futureCompletionExecutor; private static volatile ScheduledExecutorService scheduledExecutor; @@ -43,7 +45,7 @@ static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions) .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, ensureAsyncFutureCompletionExecutor())) - .httpClient(getOrBuildHttpClient(instructions)) + .httpClient(getOrBuildHttpAsyncClient(instructions)) .overrideConfiguration(ClientOverrideConfiguration.builder() // If we find that the STANDARD retry policy does not work well in all situations, we might // try experimenting with ADAPTIVE retry policy, potentially with fast fail. 
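// Aside, not part of the patch: a sketch (illustrative values) of the caching behavior introduced in
// the hunks below. The async HTTP client is keyed by HttpClientConfig, i.e. by
// (maxConcurrentRequests, connectionTimeout), so S3Instructions that differ only in other settings
// (for example read timeout) end up sharing one AwsCrtAsyncHttpClient.
final HttpClientConfig k1 = new HttpClientConfig(256, Duration.ofSeconds(2));
final HttpClientConfig k2 = new HttpClientConfig(256, Duration.ofSeconds(2));
// k1.equals(k2) is true, so both keys hit the same entry in httpAsyncClientCache and
// getOrBuildHttpAsyncClient returns the same SdkAsyncHttpClient instance for both configurations.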
@@ -58,11 +60,11 @@ static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions) .credentialsProvider(instructions.awsV2CredentialsProvider()); instructions.regionName().map(Region::of).ifPresent(builder::region); instructions.endpointOverride().ifPresent(builder::endpointOverride); - final S3AsyncClient ret = builder.build(); + final S3AsyncClient s3AsyncClient = builder.build(); if (log.isDebugEnabled()) { log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl(); } - return ret; + return s3AsyncClient; } private static class HttpClientConfig { @@ -103,10 +105,10 @@ public boolean equals(final Object other) { } } - private static SdkAsyncHttpClient getOrBuildHttpClient(@NotNull final S3Instructions instructions) { + private static SdkAsyncHttpClient getOrBuildHttpAsyncClient(@NotNull final S3Instructions instructions) { final HttpClientConfig config = new HttpClientConfig(instructions.maxConcurrentRequests(), instructions.connectionTimeout()); - return httpClientCache.computeIfAbsent(config, key -> AwsCrtAsyncHttpClient.builder() + return httpAsyncClientCache.computeIfAbsent(config, key -> AwsCrtAsyncHttpClient.builder() .maxConcurrency(config.maxConcurrentRequests()) .connectionTimeout(config.connectionTimeout()) .build()); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java new file mode 100644 index 00000000000..43004a6ba70 --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java @@ -0,0 +1,362 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import io.deephaven.util.channel.CompletableOutputStream; +import org.jetbrains.annotations.NotNull; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Uri; +import software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static io.deephaven.extensions.s3.S3ChannelContext.handleS3Exception; +import static io.deephaven.extensions.s3.S3Instructions.MIN_WRITE_PART_SIZE; + +class S3CompletableOutputStream extends CompletableOutputStream { + + /** + * @see Amazon S3 User Guide + */ + private static final int MIN_PART_NUMBER = 1; + private static final int MAX_PART_NUMBER = 10000; + private static final int INVALID_PART_NUMBER = -1; + + private enum State { + OPEN, DONE, COMPLETED, ABORTED + } + + private final S3Uri uri; + private final S3AsyncClient s3AsyncClient; + private final S3Instructions 
s3Instructions; + + private final int writePartSize; + private final int numConcurrentWriteParts; + + private final List completedParts; + private final List pendingRequests; + + private int nextPartNumber; + private String uploadId; // Initialized on first write, changed back to null when multipart upload completed/aborted + private State state; + + S3CompletableOutputStream( + @NotNull final URI uri, + @NotNull final S3AsyncClient s3AsyncClient, + @NotNull final S3Instructions s3Instructions) { + this.uri = s3AsyncClient.utilities().parseUri(uri); + this.s3AsyncClient = s3AsyncClient; + this.s3Instructions = s3Instructions; + + this.writePartSize = s3Instructions.writePartSize(); + this.numConcurrentWriteParts = s3Instructions.numConcurrentWriteParts(); + this.pendingRequests = new ArrayList<>(numConcurrentWriteParts); + + this.nextPartNumber = MIN_PART_NUMBER; + this.completedParts = new ArrayList<>(); + this.state = State.OPEN; + } + + @Override + public void write(final int b) throws IOException { + write((dest, destOff, destCount) -> { + dest.put((byte) b); + return 1; + }, 0, 1); + } + + @Override + public void write(final byte @NotNull [] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(final byte @NotNull [] b, final int off, final int len) throws IOException { + write((dest, currentOffset, remainingLength) -> { + final int lengthToWrite = Math.min(remainingLength, dest.remaining()); + dest.put(b, currentOffset, lengthToWrite); + return lengthToWrite; + }, off, len); + } + + @FunctionalInterface + private interface DataWriter { + /** + * Writes source data from a single {@code outputStream.write} call to the given destination buffer, starting + * from the current offset in the source data. + * + * @param dest the destination buffer to write data to + * @param currentOffset the current offset in the source data + * @param remainingLength the remaining number of bytes of source data to write + * @return the number of bytes written to the destination buffer + * + * @throws IOException if an I/O error occurs during the write operation + */ + int write(ByteBuffer dest, int currentOffset, int remainingLength) throws IOException; + } + + /** + * Writes source data from a single {@code outputStream.write} call to S3 using the provided {@link DataWriter}. 
+ * + * @param writer the {@link DataWriter} used to write data to the destination buffer + * @param off the offset in the source data from which to start writing + * @param len the length of the data to be written + * + * @throws IOException if an I/O error occurs during the write operation or if the stream is not {@link State#OPEN} + */ + private void write(@NotNull final DataWriter writer, int off, int len) throws IOException { + if (state != State.OPEN) { + throw new IOException("Cannot write to stream for uri " + uri + " because stream in state " + state + + " instead of OPEN"); + } + while (len != 0) { + if (uploadId == null) { + // Initialize the upload ID for the multipart upload + uploadId = initiateMultipartUpload(); + } + + // We use request slots in a circular queue fashion + final int nextSlotId = (nextPartNumber - 1) % numConcurrentWriteParts; + final OutgoingRequest useRequest; + if (pendingRequests.size() == nextSlotId) { + pendingRequests.add(useRequest = new OutgoingRequest(writePartSize)); + } else if (pendingRequests.size() < nextSlotId) { + throw new IllegalStateException("Unexpected slot ID " + nextSlotId + " for uri " + uri + " with " + + pendingRequests.size() + " pending requests."); + } else { + useRequest = pendingRequests.get(nextSlotId); + // Wait for the oldest upload to complete if no space is available + if (useRequest.future != null) { + waitForCompletion(useRequest); + } + } + + // Write as much as possible to this buffer + final ByteBuffer buffer = useRequest.buffer; + final int lengthWritten = writer.write(buffer, off, len); + if (!buffer.hasRemaining()) { + sendPartRequest(useRequest); + } + off += lengthWritten; + len -= lengthWritten; + } + } + + @Override + public void flush() throws IOException { + // Flush the next part if it is larger than the minimum part size + flushImpl(false); + } + + @Override + public void done() throws IOException { + if (state == State.DONE) { + return; + } + if (state != State.OPEN) { + throw new IOException("Cannot mark stream as done for uri " + uri + " because stream in state " + state + + " instead of OPEN"); + } + flushImpl(true); + state = State.DONE; + } + + @Override + public void complete() throws IOException { + if (state == State.COMPLETED) { + return; + } + done(); + completeMultipartUpload(); + state = State.COMPLETED; + } + + @Override + public void rollback() throws IOException { + if (state == State.COMPLETED || state == State.ABORTED) { + // Cannot roll back a completed or aborted multipart upload + return; + } + abortMultipartUpload(); + state = State.ABORTED; + } + + @Override + public void close() throws IOException { + if (state == State.COMPLETED || state == State.ABORTED) { + return; + } + abortMultipartUpload(); + state = State.ABORTED; + } + + ////////// Helper methods and classes ////////// + + private static class OutgoingRequest { + /** + * The buffer for this request + */ + private final ByteBuffer buffer; + + /** + * The part number for the part to be uploaded + */ + private int partNumber; + + /** + * The future for the part upload + */ + private CompletableFuture future; + + OutgoingRequest(final int writePartSize) { + // TODO(deephaven-core#5935): Experiment with buffer pool here + buffer = ByteBuffer.allocate(writePartSize); + partNumber = INVALID_PART_NUMBER; + } + } + + private String initiateMultipartUpload() throws IOException { + final CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder() + .bucket(uri.bucket().orElseThrow()) + 
.key(uri.key().orElseThrow()) + .build(); + // Note: We can add support for other parameters like tagging, storage class, encryption, permissions, etc. in + // future + final CompletableFuture future = + s3AsyncClient.createMultipartUpload(createMultipartUploadRequest); + final CreateMultipartUploadResponse response; + try { + response = future.get(); + } catch (final InterruptedException | ExecutionException e) { + throw handleS3Exception(e, String.format("initiating multipart upload for uri %s", uri), s3Instructions); + } + return response.uploadId(); + } + + /** + * Send a part request for the given buffer. This method assumes that the buffer is non-empty. + */ + private void sendPartRequest(final OutgoingRequest request) throws IOException { + if (nextPartNumber > MAX_PART_NUMBER) { + throw new IOException("Cannot upload more than " + MAX_PART_NUMBER + " parts for uri " + uri + ", please" + + " try again with a larger part size"); + } + if (request.future != null) { + throw new IllegalStateException("Request already in progress for uri " + uri + " with part number " + + nextPartNumber); + } + final UploadPartRequest uploadPartRequest = UploadPartRequest.builder() + .bucket(uri.bucket().orElseThrow()) + .key(uri.key().orElseThrow()) + .uploadId(uploadId) + .partNumber(nextPartNumber) + .build(); + request.buffer.flip(); + request.future = s3AsyncClient.uploadPart(uploadPartRequest, + AsyncRequestBody.fromByteBufferUnsafe(request.buffer)); + request.partNumber = nextPartNumber; + nextPartNumber++; + } + + private void waitForCompletion(final OutgoingRequest request) throws IOException { + final UploadPartResponse uploadPartResponse; + try { + uploadPartResponse = request.future.get(); + } catch (final InterruptedException | ExecutionException e) { + throw handleS3Exception(e, String.format("waiting for part %d for uri %s to complete uploading", + request.partNumber, uri), s3Instructions); + } + completedParts.add(SdkPojoConversionUtils.toCompletedPart(uploadPartResponse, request.partNumber)); + request.buffer.clear(); + request.future = null; + request.partNumber = INVALID_PART_NUMBER; + } + + /** + * Flushes the current buffer to S3. + * + * @param force if true, forces the buffer to be flushed even if it is smaller than the minimum + * {@value S3Instructions#MIN_WRITE_PART_SIZE} MiB threshold, which should only be done for the very last + * part. 
+ * @throws IOException if an I/O error occurs during the flush operation + */ + private void flushImpl(final boolean force) throws IOException { + final int nextSlotId = (nextPartNumber - 1) % numConcurrentWriteParts; + if (pendingRequests.size() == nextSlotId) { + // Nothing to flush + return; + } + final OutgoingRequest request = pendingRequests.get(nextSlotId); + if (request.buffer.position() != 0 + && request.future == null + && (force || request.buffer.position() >= MIN_WRITE_PART_SIZE)) { + sendPartRequest(request); + } + } + + private void completeMultipartUpload() throws IOException { + if (uploadId == null) { + throw new IllegalStateException("Cannot complete multipart upload for uri " + uri + " because upload ID " + + "is null"); + } + // Complete all pending requests in the exact order they were sent + final int partCount = nextPartNumber - 1; + for (int partNumber = completedParts.size() + 1; partNumber <= partCount; partNumber++) { + // Part numbers start from 1, therefore, we use (partNumber - 1) for the slot ID + final int slotId = (partNumber - 1) % numConcurrentWriteParts; + final OutgoingRequest request = pendingRequests.get(slotId); + waitForCompletion(request); + } + final CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder() + .bucket(uri.bucket().orElseThrow()) + .key(uri.key().orElseThrow()) + .uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder() + .parts(completedParts) + .build()) + .build(); + try { + s3AsyncClient.completeMultipartUpload(completeRequest).get(); + } catch (final InterruptedException | ExecutionException e) { + throw handleS3Exception(e, String.format("completing multipart upload for uri %s", uri), s3Instructions); + } + uploadId = null; + } + + /** + * Abort the multipart upload if it is in progress. 
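For readers tracing the part bookkeeping above, here is a minimal, self-contained sketch of the slot arithmetic that write() and completeMultipartUpload() share; the pool size of 4 is illustrative only (the patch defaults to 64):

    // Illustrative sketch only, not part of the patch.
    public final class PartSlotSketch {
        public static void main(String[] args) {
            final int numConcurrentWriteParts = 4; // patch default is 64
            for (int partNumber = 1; partNumber <= 9; partNumber++) {
                // Same mapping as S3CompletableOutputStream: slots are reused round-robin, so at most
                // numConcurrentWriteParts uploads are in flight before write() must wait on the oldest one.
                final int slotId = (partNumber - 1) % numConcurrentWriteParts;
                System.out.println("part " + partNumber + " -> slot " + slotId);
            }
        }
    }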
+ */ + private void abortMultipartUpload() throws IOException { + if (uploadId == null) { + throw new IllegalStateException("Cannot abort multipart upload for uri " + uri + " because upload ID " + + "is null"); + } + final AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder() + .bucket(uri.bucket().orElseThrow()) + .key(uri.key().orElseThrow()) + .uploadId(uploadId) + .build(); + try { + s3AsyncClient.abortMultipartUpload(abortRequest).get(); + } catch (final InterruptedException | ExecutionException e) { + throw handleS3Exception(e, String.format("aborting multipart upload for uri %s", uri), s3Instructions); + } + uploadId = null; + } +} diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java index 27d313a235c..f6a259c26aa 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java @@ -24,12 +24,23 @@ @CopyableStyle public abstract class S3Instructions implements LogOutputAppendable { - private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 256; - private final static int DEFAULT_READ_AHEAD_COUNT = 32; - private final static int DEFAULT_FRAGMENT_SIZE = 1 << 16; // 64 KiB - private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB - private final static Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2); - private final static Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2); + private static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 256; + private static final int DEFAULT_READ_AHEAD_COUNT = 32; + private static final int DEFAULT_FRAGMENT_SIZE = 1 << 16; // 64 KiB + private static final int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB + private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2); + private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2); + private static final int DEFAULT_NUM_CONCURRENT_WRITE_PARTS = 64; + + /** + * We set default part size to 10 MiB. The maximum number of parts allowed is 10,000. This means maximum size of a + * single file that we can write is roughly 100k MiB (or about 98 GiB). For uploading larger files, user would need + * to set a larger part size. + * + * @see Amazon S3 User Guide + */ + private static final int DEFAULT_WRITE_PART_SIZE = 10 << 20; // 10 MiB + static final int MIN_WRITE_PART_SIZE = 5 << 20; // 5 MiB static final S3Instructions DEFAULT = builder().build(); @@ -99,6 +110,28 @@ public Credentials credentials() { return Credentials.defaultCredentials(); } + /** + * The size of each part (in bytes) to upload when writing to S3, defaults to {@value #DEFAULT_WRITE_PART_SIZE}. The + * minimum allowed part size is {@value #MIN_WRITE_PART_SIZE}. Setting a higher value may increase throughput, but + * may also increase memory usage. Note that the maximum number of parts allowed for a single file is 10,000. + * Therefore, for {@value #DEFAULT_WRITE_PART_SIZE} part size, the maximum size of a single file that can be written + * is {@value #DEFAULT_WRITE_PART_SIZE} * 10,000 bytes. + */ + @Default + public int writePartSize() { + return DEFAULT_WRITE_PART_SIZE; + } + + /** + * The maximum number of parts that can be uploaded concurrently when writing to S3 without blocking. Setting a + * higher value may increase throughput, but may also increase memory usage. Defaults to + * {@value #DEFAULT_NUM_CONCURRENT_WRITE_PARTS}. 
+     */
+    @Default
+    public int numConcurrentWriteParts() {
+        return DEFAULT_NUM_CONCURRENT_WRITE_PARTS;
+    }
+
     @Override
     public LogOutput append(final LogOutput logOutput) {
         return logOutput.append(toString());
@@ -129,6 +162,10 @@ public interface Builder {
         Builder endpointOverride(URI endpointOverride);
 
+        Builder writePartSize(int writePartSize);
+
+        Builder numConcurrentWriteParts(int numConcurrentWriteParts);
+
         default Builder endpointOverride(String endpointOverride) {
             return endpointOverride(URI.create(endpointOverride));
         }
@@ -174,6 +211,31 @@ final void awsSdkV2Credentials() {
         }
     }
 
+    @Check
+    final void boundsCheckWritePartSize() {
+        if (writePartSize() < MIN_WRITE_PART_SIZE) {
+            throw new IllegalArgumentException(
+                    "writePartSize(=" + writePartSize() + ") must be >= " + MIN_WRITE_PART_SIZE + " bytes");
+        }
+    }
+
+    @Check
+    final void boundsCheckMinNumConcurrentWriteParts() {
+        if (numConcurrentWriteParts() < 1) {
+            throw new IllegalArgumentException(
+                    "numConcurrentWriteParts(=" + numConcurrentWriteParts() + ") must be >= 1");
+        }
+    }
+
+    @Check
+    final void boundsCheckMaxNumConcurrentWriteParts() {
+        if (numConcurrentWriteParts() > maxConcurrentRequests()) {
+            throw new IllegalArgumentException(
+                    "numConcurrentWriteParts(=" + numConcurrentWriteParts() + ") must be <= " +
+                            "maxConcurrentRequests(=" + maxConcurrentRequests() + ")");
+        }
+    }
+
     final AwsCredentialsProvider awsV2CredentialsProvider() {
         return ((AwsSdkV2Credentials) credentials()).awsV2CredentialsProvider();
     }
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
index 7083e1c22cf..4bd06a1b661 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
@@ -11,6 +11,7 @@ import io.deephaven.internal.log.LoggerFactory;
 import io.deephaven.io.logger.Logger;
 import io.deephaven.util.channel.Channels;
+import io.deephaven.util.channel.CompletableOutputStream;
 import io.deephaven.util.channel.SeekableChannelContext;
 import io.deephaven.util.channel.SeekableChannelsProvider;
 import org.jetbrains.annotations.NotNull;
@@ -28,7 +29,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -130,8 +130,9 @@ public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelCon
     }
 
     @Override
-    public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append) {
-        throw new UnsupportedOperationException("Writing to S3 is currently unsupported");
+    public CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint) {
+        // bufferSizeHint is unused because the S3 output stream buffers data internally into parts
+        return new S3CompletableOutputStream(uri, s3AsyncClient, s3Instructions);
     }
 
     @Override
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java
index 521bc02f6be..4d6ef35ce4a 100644
--- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java
@@ -22,6 +22,8 @@ void defaults() {
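As a usage sketch (not part of the patch), the new write options are configured through the same builder as the existing read options; the region name and sizes below are placeholders:

    // Usage sketch only; the region and the chosen sizes are placeholders, not from the patch.
    import io.deephaven.extensions.s3.S3Instructions;

    public class WriteInstructionsExample {
        public static void main(String[] args) {
            final S3Instructions instructions = S3Instructions.builder()
                    .regionName("us-east-1")
                    .writePartSize(16 << 20) // 16 MiB parts; must be at least 5 MiB
                    .numConcurrentWriteParts(32) // must not exceed maxConcurrentRequests (default 256)
                    .build();
            // With these settings a single output stream may buffer up to
            // writePartSize * numConcurrentWriteParts = 512 MiB.
            System.out.println(instructions);
        }
    }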
assertThat(instructions.connectionTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(instructions.readTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(instructions.credentials()).isEqualTo(Credentials.defaultCredentials()); + assertThat(instructions.writePartSize()).isEqualTo(10485760); + assertThat(instructions.numConcurrentWriteParts()).isEqualTo(64); assertThat(instructions.endpointOverride()).isEmpty(); } @@ -36,13 +38,25 @@ void testSetRegion() { } @Test - void minMaxConcurrentRequests() { + void testSetMaxConcurrentRequests() { assertThat(S3Instructions.builder() .regionName("some-region") - .maxConcurrentRequests(1) + .maxConcurrentRequests(100) .build() .maxConcurrentRequests()) - .isEqualTo(1); + .isEqualTo(100); + } + + @Test + void testMinMaxConcurrentRequests() { + try { + S3Instructions.builder() + .regionName("some-region") + .maxConcurrentRequests(-1) + .build(); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("maxConcurrentRequests"); + } } @Test @@ -122,4 +136,41 @@ void badCredentials() { assertThat(e).hasMessageContaining("credentials"); } } + + @Test + void tooSmallWritePartSize() { + try { + S3Instructions.builder() + .regionName("some-region") + .writePartSize(1024) + .build(); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("writePartSize"); + } + } + + @Test + void tooSmallNumConcurrentWriteParts() { + try { + S3Instructions.builder() + .regionName("some-region") + .numConcurrentWriteParts(0) + .build(); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("numConcurrentWriteParts"); + } + } + + @Test + void tooLargeNumConcurrentWriteParts() { + try { + S3Instructions.builder() + .regionName("some-region") + .numConcurrentWriteParts(1001) + .maxConcurrentRequests(1000) + .build(); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("numConcurrentWriteParts"); + } + } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java index 68f6a9042c7..a0cf78b0f3a 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java @@ -5,8 +5,10 @@ import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.util.channel.CachedChannelProvider; +import io.deephaven.util.channel.CompletableOutputStream; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; +import junit.framework.TestCase; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -88,4 +90,60 @@ public int read() { assertThat(readChannel.read(buffer)).isEqualTo(-1); } } + + @Test + void readWriteTest() throws IOException { + final URI uri = uri("writeReadTest.txt"); + final String content = "Hello, world!"; + final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); + try ( + final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); + final CompletableOutputStream outputStream = provider.getOutputStream(uri, 0)) { + final int numBytes = 36 * 1024 * 1024; // 36 Mib -> Three 10-MiB parts + One 6-MiB part + final int numIters = numBytes / contentBytes.length; + 
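            // Note (editorial, not part of the patch): with a 13-byte payload, numIters is roughly
            // 2.9 million, so the loop below streams ~36 MiB through the multipart path: three full
            // 10-MiB parts plus a ~6-MiB remainder. The repeated flush() calls afterwards are
            // intentional; flush() only uploads a buffered part once it reaches the 5-MiB minimum
            // and is otherwise a no-op, while done() forces out whatever remains.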
for (int i = 0; i < numIters; ++i) { + outputStream.write(contentBytes); + } + outputStream.flush(); + outputStream.flush(); + outputStream.write(contentBytes); + outputStream.flush(); + outputStream.flush(); + outputStream.done(); + outputStream.flush(); + try { + outputStream.write(contentBytes); + TestCase.fail("Failure expected on writing since the stream is marked as done."); + } catch (IOException expected) { + } + + // Push data to S3, but don't close the stream + outputStream.complete(); + try ( + final SeekableChannelContext context = provider.makeContext(); + final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { + final ByteBuffer buffer = ByteBuffer.allocate(contentBytes.length); + // We wrote total of numIters + 1 times + for (int i = 0; i < numIters + 1; ++i) { + fillBuffer(readChannel, buffer); + assertThat(buffer).isEqualTo(ByteBuffer.wrap(contentBytes)); + buffer.clear(); + } + // We should have read all the data from the channel + assertThat(readChannel.read(buffer)).isEqualTo(-1); + } + + // Try rollback, should not delete the file + outputStream.rollback(); + try ( + final SeekableChannelContext context = provider.makeContext(); + final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { + final ByteBuffer buffer = ByteBuffer.allocate(contentBytes.length); + readChannel.read(buffer); + buffer.flip(); + assertThat(buffer).isEqualTo(ByteBuffer.wrap(contentBytes)); + } + } + } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java index 9d4df0a5744..2cb04c83364 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java @@ -83,4 +83,15 @@ protected static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) t dst.flip(); return dst; } + + protected static void fillBuffer(ReadableByteChannel channel, final ByteBuffer dst) throws IOException { + final int numBytes = dst.remaining(); + while (dst.remaining() > 0 && channel.read(dst) != -1) { + // continue + } + if (dst.remaining() > 0) { + throw new RuntimeException(String.format("channel has less than %d bytes", numBytes)); + } + dst.flip(); + } } diff --git a/extensions/trackedfile/build.gradle b/extensions/trackedfile/build.gradle index 3896eb1e4ee..f7e7910289d 100644 --- a/extensions/trackedfile/build.gradle +++ b/extensions/trackedfile/build.gradle @@ -11,6 +11,7 @@ dependencies { implementation project(':Base') implementation project(':Util') implementation project(':engine-table') + implementation project(':log-factory') compileOnly libs.jetbrains.annotations diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/LocalCompletableOutputStream.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/LocalCompletableOutputStream.java new file mode 100644 index 00000000000..5f6961f8bf6 --- /dev/null +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/LocalCompletableOutputStream.java @@ -0,0 +1,245 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.trackedfile; + +import io.deephaven.UncheckedDeephavenException; +import io.deephaven.base.FileUtils; +import io.deephaven.internal.log.LoggerFactory; +import 
io.deephaven.io.logger.Logger;
+import io.deephaven.util.channel.CompletableOutputStream;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+/**
+ * A {@link CompletableOutputStream} that writes to a temporary shadow file in the same directory to prevent
+ * overwriting any existing data in case of failure.
+ */
+class LocalCompletableOutputStream extends CompletableOutputStream {
+
+    private static final Logger log = LoggerFactory.getLogger(LocalCompletableOutputStream.class);
+
+    private enum State {
+        OPEN, DONE, COMPLETED, ROLLED_BACK
+    }
+
+    private final File firstCreatedDir;
+    private final File destFile;
+    private final File shadowDestFile;
+    private final OutputStream shadowDelegateStream; // Writes to the shadow file
+
+    private State state;
+
+    LocalCompletableOutputStream(
+            @NotNull final File destFile,
+            @NotNull final TrackedSeekableChannelsProvider provider,
+            final int bufferSizeHint) throws IOException {
+        this.firstCreatedDir = prepareDestinationFileLocation(destFile);
+        this.destFile = destFile;
+        deleteBackupFile(destFile);
+        this.shadowDestFile = getShadowFile(destFile);
+        this.shadowDelegateStream = new BufferedOutputStream(Channels.newOutputStream(
+                provider.getWriteChannel(shadowDestFile)), bufferSizeHint);
+        this.state = State.OPEN;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        verifyOpen();
+        shadowDelegateStream.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        verifyOpen();
+        shadowDelegateStream.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        verifyOpen();
+        shadowDelegateStream.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        verifyOpen();
+        shadowDelegateStream.flush();
+    }
+
+    @Override
+    public void done() throws IOException {
+        if (state == State.DONE) {
+            return;
+        }
+        if (state != State.OPEN) {
+            throw new IOException("Cannot mark stream as done for file " + destFile.getAbsolutePath() + " because "
+                    + "the stream is in state " + state + " instead of OPEN");
+        }
+        flush();
+        state = State.DONE;
+    }
+
+    @Override
+    public void complete() throws IOException {
+        if (state == State.COMPLETED) {
+            return;
+        }
+        done();
+        shadowDelegateStream.close();
+        installShadowFile(destFile, shadowDestFile);
+        state = State.COMPLETED;
+    }
+
+    @Override
+    public void rollback() {
+        if (state == State.ROLLED_BACK) {
+            return;
+        }
+        if (state == State.COMPLETED) {
+            rollbackShadowFiles(destFile);
+        }
+        // noinspection ResultOfMethodCallIgnored
+        shadowDestFile.delete();
+        if (firstCreatedDir != null) {
+            log.error().append("Cleaning up potentially incomplete table destination path starting from ")
+                    .append(firstCreatedDir.getAbsolutePath()).endl();
+            FileUtils.deleteRecursivelyOnNFS(firstCreatedDir);
+        }
+        state = State.ROLLED_BACK;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (state == State.ROLLED_BACK) {
+            return;
+        }
+        if (state != State.COMPLETED) {
+            rollback();
+            return;
+        }
+        deleteBackupFileNoExcept(destFile);
+    }
+
+    ////////////// Helper methods /////////////
+
+    private void verifyOpen() throws IOException {
+        if (state != State.OPEN) {
+            throw new IOException("Cannot write to stream for file " + destFile.getAbsolutePath() + " because the "
+                    + "stream is in state " + state + " instead of OPEN");
+        }
+    }
+
+    /**
+     * Delete any old backup
files created for this destination, and throw an exception on failure. + */ + private static void deleteBackupFile(@NotNull final File destFile) { + if (!deleteBackupFileNoExcept(destFile)) { + throw new UncheckedDeephavenException( + String.format("Failed to delete backup file at %s", getBackupFile(destFile).getAbsolutePath())); + } + } + + /** + * Delete any old backup files created for this destination with no exception in case of failure. + */ + private static boolean deleteBackupFileNoExcept(@NotNull final File destFile) { + final File backupDestFile = getBackupFile(destFile); + if (backupDestFile.exists() && !backupDestFile.delete()) { + log.error().append("Error in deleting backup file at path ") + .append(backupDestFile.getAbsolutePath()) + .endl(); + return false; + } + return true; + } + + private static File getBackupFile(final File destFile) { + return new File(destFile.getParent(), ".OLD_" + destFile.getName()); + } + + private static File getShadowFile(final File destFile) { + return new File(destFile.getParent(), ".NEW_" + destFile.getName()); + } + + /** + * Make any missing ancestor directories of {@code destination}. + * + * @param destination The destination file + * @return The first created directory, or null if no directories were made. + */ + @Nullable + private static File prepareDestinationFileLocation(@NotNull File destination) { + destination = destination.getAbsoluteFile(); + if (destination.exists()) { + if (destination.isDirectory()) { + throw new UncheckedDeephavenException( + String.format("Destination %s exists and is a directory", destination)); + } + if (!destination.canWrite()) { + throw new UncheckedDeephavenException( + String.format("Destination %s exists but is not writable", destination)); + } + return null; + } + final File firstParent = destination.getParentFile(); + if (firstParent.isDirectory()) { + if (firstParent.canWrite()) { + return null; + } + throw new UncheckedDeephavenException( + String.format("Destination %s has non writable parent directory", destination)); + } + File firstCreated = firstParent; + File parent; + for (parent = destination.getParentFile(); parent != null && !parent.exists(); parent = + parent.getParentFile()) { + firstCreated = parent; + } + if (parent == null) { + throw new IllegalArgumentException( + String.format("Can't find any existing parent directory for destination path: %s", destination)); + } + if (!parent.isDirectory()) { + throw new IllegalArgumentException( + String.format("Existing parent file %s of %s is not a directory", parent, destination)); + } + if (!firstParent.mkdirs()) { + throw new UncheckedDeephavenException("Couldn't (re)create destination directory " + firstParent); + } + return firstCreated; + } + + + /** + * Backup any existing files at destination and rename the shadow file to destination file. 
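For orientation, a worked example with a hypothetical destination (the path is not from the patch): for /data/t.parquet, the helpers in this class use /data/.NEW_t.parquet as the shadow file and /data/.OLD_t.parquet as the backup, so installing the shadow file amounts to:

    // 1. rename /data/t.parquet      -> /data/.OLD_t.parquet   (only if a destination file already exists)
    // 2. rename /data/.NEW_t.parquet -> /data/t.parquet
    // rollbackShadowFiles undoes both renames on a best-effort basis.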
+ */ + private static void installShadowFile(@NotNull final File destFile, @NotNull final File shadowDestFile) { + final File backupDestFile = getBackupFile(destFile); + if (destFile.exists() && !destFile.renameTo(backupDestFile)) { + throw new UncheckedDeephavenException( + String.format("Failed to install shadow file at %s because a file already exists at the path " + + "which couldn't be renamed to %s", destFile.getAbsolutePath(), + backupDestFile.getAbsolutePath())); + } + if (!shadowDestFile.renameTo(destFile)) { + throw new UncheckedDeephavenException(String.format( + "Failed to install shadow file at %s because couldn't rename temporary shadow file from %s to %s", + destFile.getAbsolutePath(), shadowDestFile.getAbsolutePath(), destFile.getAbsolutePath())); + } + } + + /** + * Roll back any changes made in the {@link #installShadowFile} in best-effort manner. + */ + private static void rollbackShadowFiles(@NotNull final File destFile) { + final File backupDestFile = getBackupFile(destFile); + final File shadowDestFile = getShadowFile(destFile); + destFile.renameTo(shadowDestFile); + backupDestFile.renameTo(destFile); + } +} diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java index 345894bba64..b4fdf1b6157 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java @@ -10,6 +10,7 @@ import io.deephaven.engine.util.file.TrackedFileHandleFactory; import io.deephaven.engine.util.file.TrackedSeekableByteChannel; import io.deephaven.util.channel.Channels; +import io.deephaven.util.channel.CompletableOutputStream; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.BaseSeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; @@ -73,11 +74,8 @@ public InputStream getInputStream(SeekableByteChannel channel, int sizeHint) { } @Override - public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final boolean append) - throws IOException { - // NB: I'm not sure this is actually the intended behavior; the "truncate-once" is per-handle, not per file. - return new TrackedSeekableByteChannel(append ? 
fileHandleFactory.writeAppendCreateHandleCreator - : new TruncateOnceFileCreator(fileHandleFactory), filePath.toFile()); + public CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) throws IOException { + return new LocalCompletableOutputStream(new File(uri), this, bufferSizeHint); } @Override @@ -94,6 +92,10 @@ public Stream walk(@NotNull final URI directory) throws IOException { return Files.walk(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false)); } + SeekableByteChannel getWriteChannel(@NotNull final File destFile) throws IOException { + return new TrackedSeekableByteChannel(new TruncateOnceFileCreator(fileHandleFactory), destFile); + } + private static final class TruncateOnceFileCreator implements FileHandleFactory.FileToHandleFunction { private static final AtomicIntegerFieldUpdater FIRST_TIME_UPDATER = @@ -111,7 +113,7 @@ private TruncateOnceFileCreator(@NotNull final TrackedFileHandleFactory fileHand @NotNull @Override - public final FileHandle invoke(@NotNull final File file) throws IOException { + public FileHandle invoke(@NotNull final File file) throws IOException { if (FIRST_TIME_UPDATER.compareAndSet(this, FIRST_TIME_TRUE, FIRST_TIME_FALSE)) { return fileHandleFactory.writeTruncateCreateHandleCreator.invoke(file); } diff --git a/py/server/deephaven/experimental/s3.py b/py/server/deephaven/experimental/s3.py index c19a381b4d0..db6168aca16 100644 --- a/py/server/deephaven/experimental/s3.py +++ b/py/server/deephaven/experimental/s3.py @@ -21,14 +21,14 @@ _JS3Instructions = None """ - This module is useful for reading files stored in S3-compatible APIs. + This module is useful for reading from and writing to S3-compatible APIs. Importing this module requires the S3 specific deephaven extensions (artifact name deephaven-extensions-s3) to be included in the package. This is an opt-out functionality included by default. If not included, importing this module will fail to find the java types. """ class S3Instructions(JObjectWrapper): """ - S3Instructions provides specialized instructions for reading from S3-compatible APIs. + S3Instructions provides specialized instructions for reading from and writing to S3-compatible APIs. """ j_object_type = _JS3Instructions or type(None) @@ -45,7 +45,9 @@ def __init__(self, access_key_id: Optional[str] = None, secret_access_key: Optional[str] = None, anonymous_access: bool = False, - endpoint_override: Optional[str] = None): + endpoint_override: Optional[str] = None, + write_part_size: Optional[int] = None, + num_concurrent_write_parts: Optional[int] = None): """ Initializes the instructions. @@ -76,6 +78,14 @@ def __init__(self, anonymous access. Can't be combined with other credentials. By default, is False. endpoint_override (str): the endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs. + write_part_size (int): Writes to S3 are done in parts or chunks, and this value determines the size of each + part (in bytes). The default value is 10485760 (= 10 MiB) and minimum allowed part size is 5 MiB. + Setting a higher value may increase throughput, but may also increase memory usage. + Note that the maximum number of parts allowed for a single file is 10,000. Therefore, for 10 MiB part + size, the maximum size of a single file that can be written is roughly 100k MiB (or about 98 GiB). 
+ num_concurrent_write_parts (int): the maximum number of parts that can be uploaded concurrently when writing + to S3 without blocking, defaults to 64. Setting a higher value may increase throughput, but may also + increase memory usage. Raises: DHError: If unable to build the instructions object. @@ -120,6 +130,12 @@ def __init__(self, if endpoint_override is not None: builder.endpointOverride(endpoint_override) + if write_part_size is not None: + builder.writePartSize(write_part_size) + + if num_concurrent_write_parts is not None: + builder.numConcurrentWriteParts(num_concurrent_write_parts) + self._j_object = builder.build() except Exception as e: raise DHError(e, "Failed to build S3 instructions") from e diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py index dc877660671..61614c37061 100644 --- a/py/server/deephaven/parquet.py +++ b/py/server/deephaven/parquet.py @@ -242,13 +242,14 @@ def write( max_dictionary_size: Optional[int] = None, target_page_size: Optional[int] = None, generate_metadata_files: Optional[bool] = None, - index_columns: Optional[Sequence[Sequence[str]]] = None + index_columns: Optional[Sequence[Sequence[str]]] = None, + special_instructions: Optional[s3.S3Instructions] = None ) -> None: """ Write a table to a Parquet file. Args: table (Table): the source table - path (str): the destination file path; the file name should end in a ".parquet" extension. If the path + path (str): the destination file path or URI; the file name should end in a ".parquet" extension. If the path includes any non-existing directories, they are created. If there is an error, any intermediate directories previously created are removed; note this makes this method unsafe for concurrent use table_definition (Optional[Union[Dict[str, DType], List[Column]]): the table definition to use for writing, @@ -275,6 +276,8 @@ def write( source table. This argument can be used to narrow the set of indexes to write, or to be explicit about the expected set of indexes present on all sources. Indexes that are specified but missing will be computed on demand. + special_instructions (Optional[s3.S3Instructions]): Special instructions for writing parquet files, useful when + writing files to a non-local file system, like S3. By default, None. Raises: DHError """ @@ -289,6 +292,7 @@ def write( generate_metadata_files=generate_metadata_files, table_definition=table_definition, index_columns=index_columns, + special_instructions=special_instructions, ) _JParquetTools.writeTable(table.j_table, path, write_instructions) except Exception as e: @@ -306,7 +310,8 @@ def write_partitioned( target_page_size: Optional[int] = None, base_name: Optional[str] = None, generate_metadata_files: Optional[bool] = None, - index_columns: Optional[Sequence[Sequence[str]]] = None + index_columns: Optional[Sequence[Sequence[str]]] = None, + special_instructions: Optional[s3.S3Instructions] = None ) -> None: """ Write table to disk in parquet format with the partitioning columns written as "key=value" format in a nested directory structure. For example, for a partitioned column "date", we will have a directory structure like @@ -316,8 +321,9 @@ def write_partitioned( Args: table (Table): the source table or partitioned table - destination_dir (str): The path to destination root directory in which the partitioned parquet data will be stored - in a nested directory structure format. Non-existing directories in the provided path will be created. 
+ destination_dir (str): The path or URI to the destination root directory in which the partitioned parquet data + will be stored in a nested directory structure format. Non-existing directories in the provided path will be + created. table_definition (Optional[Union[Dict[str, DType], List[Column]]): the table definition to use for writing, instead of the definitions implied by the table. Default is None, which means use the column definitions implied by the table. This definition can be used to skip some columns or add additional columns with @@ -354,6 +360,8 @@ def write_partitioned( source table. This argument can be used to narrow the set of indexes to write, or to be explicit about the expected set of indexes present on all sources. Indexes that are specified but missing will be computed on demand. + special_instructions (Optional[s3.S3Instructions]): Special instructions for writing parquet files, useful when + writing files to a non-local file system, like S3. By default, None. Raises: DHError @@ -370,6 +378,7 @@ def write_partitioned( base_name=base_name, table_definition=table_definition, index_columns=index_columns, + special_instructions=special_instructions, ) _JParquetTools.writeKeyValuePartitionedTable(table.j_object, destination_dir, write_instructions) except Exception as e: @@ -386,7 +395,8 @@ def batch_write( max_dictionary_size: Optional[int] = None, target_page_size: Optional[int] = None, generate_metadata_files: Optional[bool] = None, - index_columns: Optional[Sequence[Sequence[str]]] = None + index_columns: Optional[Sequence[Sequence[str]]] = None, + special_instructions: Optional[s3.S3Instructions] = None ): """ Writes tables to disk in parquet format to a supplied set of paths. @@ -394,7 +404,7 @@ def batch_write( Args: tables (List[Table]): the source tables - paths (List[str]): the destination paths. Any non-existing directories in the paths provided are + paths (List[str]): the destination paths or URIs. Any non-existing directories in the paths provided are created. If there is an error, any intermediate directories previously created are removed; note this makes this method unsafe for concurrent use table_definition (Optional[Union[Dict[str, DType], List[Column]]]): the table definition to use for writing. @@ -420,6 +430,8 @@ def batch_write( source table. This argument can be used to narrow the set of indexes to write, or to be explicit about the expected set of indexes present on all sources. Indexes that are specified but missing will be computed on demand. + special_instructions (Optional[s3.S3Instructions]): Special instructions for writing parquet files, useful when + writing files to a non-local file system, like S3. By default, None. Raises: DHError @@ -435,6 +447,7 @@ def batch_write( generate_metadata_files=generate_metadata_files, table_definition=table_definition, index_columns=index_columns, + special_instructions=special_instructions, ) _JParquetTools.writeTables([t.j_table for t in tables], _j_string_array(paths), write_instructions) except Exception as e:
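A usage sketch tying the Python additions together; the bucket, region, and table contents are placeholders, and region_name is assumed to be accepted by S3Instructions in the same way the Java builder accepts regionName:

    # Usage sketch only; bucket, region, and table contents are placeholders.
    from deephaven import empty_table, parquet
    from deephaven.experimental import s3

    instructions = s3.S3Instructions(
        region_name="us-east-1",
        write_part_size=10 * 1024 * 1024,   # 10 MiB parts (the default); must be >= 5 MiB
        num_concurrent_write_parts=32,      # must not exceed the instance's max concurrent requests
    )

    table = empty_table(1_000_000).update(["X = i", "Y = X * 2"])
    parquet.write(table, "s3://my-bucket/example.parquet", special_instructions=instructions)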