feat: Added support to write parquet files to S3 #5914

Merged 19 commits on Aug 15, 2024

Changes from 2 commits
@@ -111,20 +111,9 @@ public InputStream getInputStream(final SeekableByteChannel channel, final int s
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull final URI uri, final boolean append) throws IOException {
final String pathKey = uri.toString();
final ChannelType channelType = append ? ChannelType.WriteAppend : ChannelType.Write;
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(channelType);
final CachedChannel result = tryGetPooledChannel(pathKey, channelPool);
return result == null
? new CachedChannel(wrappedProvider.getWriteChannel(uri, append), channelType, pathKey)
: result.position(append ? result.size() : 0); // The seek isn't really necessary for append; will be at
// end no matter what.
}

@Override
public void abort(final @NotNull OutputStream outputStream) throws IOException {
wrappedProvider.abort(outputStream);
public final CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint)
throws IOException {
return wrappedProvider.getOutputStream(uri, bufferSizeHint);
}

@Override
@@ -0,0 +1,49 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.channel;

import java.io.IOException;
import java.io.OutputStream;

/**
* An {@link OutputStream} that can be marked as done, completed, or rolled back.
* <p>
* The {@link #done()} method pushes all cached data to the underlying storage, {@link #complete()} finalizes the
* write operation, and {@link #rollback()} cancels the write. Closing this output stream without calling done or
* complete will not write any data to the underlying storage.
* <p>
* A typical usage pattern:
*
* <pre>
* try (final CompletableOutputStream outputStream = createCompletableOutputStream()) {
*     try {
*         IOUtils.copy(inputStream, outputStream);
*         outputStream.done();
*         outputStream.complete();
*     } catch (IOException e) {
*         outputStream.rollback();
*     }
* }
* </pre>
*/
public abstract class CompletableOutputStream extends OutputStream {
/**
* Pushes all cached data to the underlying storage. This method should be called after the user is done writing to
* the output stream. All writes to the output stream after calling this method will lead to an {@link IOException}.
*/
public abstract void done() throws IOException;

/**
* Pushes all cached data to the underlying storage and commits it. This method should be called after the user is
* done writing to the output stream. All writes to the output stream after calling this method will lead to an
* {@link IOException}.
*/
public abstract void complete() throws IOException;

/**
* Tries to roll back any data committed to the underlying storage, reverting to the state before this stream was
* opened.
*/
public abstract void rollback() throws IOException;
}
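
To make the lifecycle concrete, a minimal sketch of an implementation that buffers all writes in memory and persists them only on complete(); the class name and the persist callback are illustrative, not part of this PR:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.function.Consumer;

    final class InMemoryCompletableOutputStream extends CompletableOutputStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final Consumer<byte[]> persist; // illustrative sink for the committed bytes
        private boolean done;
        private boolean committed;

        InMemoryCompletableOutputStream(final Consumer<byte[]> persist) {
            this.persist = persist;
        }

        @Override
        public void write(final int b) throws IOException {
            if (done) {
                throw new IOException("Stream already marked as done");
            }
            buffer.write(b);
        }

        @Override
        public void done() {
            // Seals the stream; this sketch has no intermediate cache to push,
            // so the data simply stays buffered until complete().
            done = true;
        }

        @Override
        public void complete() {
            done = true;
            if (!committed) {
                persist.accept(buffer.toByteArray()); // commit the buffered data
                committed = true;
            }
        }

        @Override
        public void rollback() {
            // Nothing is committed before complete(), so just drop the buffer.
            buffer.reset();
        }

        @Override
        public void close() {
            // Per the contract, closing without complete() must not write data.
            if (!committed) {
                buffer.reset();
            }
        }
    }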
@@ -53,17 +53,8 @@ public InputStream getInputStream(final SeekableByteChannel channel, final int s
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull final URI uri, final boolean append) throws IOException {
final FileChannel result = FileChannel.open(Path.of(uri),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
append ? StandardOpenOption.APPEND : StandardOpenOption.TRUNCATE_EXISTING);
if (append) {
result.position(result.size());
} else {
result.position(0);
}
return result;
public final CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
@@ -6,12 +6,9 @@
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.util.stream.Stream;

@@ -93,35 +90,17 @@ SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContex
*/
InputStream getInputStream(SeekableByteChannel channel, int sizeHint) throws IOException;

default SeekableByteChannel getWriteChannel(@NotNull final String uriStr, final boolean append) throws IOException {
return getWriteChannel(convertToURI(uriStr, false), append);
}

SeekableByteChannel getWriteChannel(@NotNull URI uri, boolean append) throws IOException;

/**
* Creates an {@link OutputStream} to write to the given URI. The caller is responsible for closing the stream. To
* abort upload, users should call {@link #abort(OutputStream)} on the stream.
* Creates a {@link CompletableOutputStream} to write to the given URI.
*
* @param uri the URI to write to
* @param append whether to append to the file if it already exists
* @param bufferSizeHint the number of bytes the caller expects to buffer before flushing
* @return the output stream
* @throws IOException if an IO exception occurs
* @see CompletableOutputStream
*/
default OutputStream getOutputStream(@NotNull final URI uri, boolean append, int bufferSizeHint)
throws IOException {
return new BufferedOutputStream(Channels.newOutputStream(getWriteChannel(uri, append)), bufferSizeHint);
}
CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) throws IOException;

/**
* Tries to abort the write operation and closes the provided output stream, assuming the stream was created by this
* provider.
*/
default void abort(@NotNull final OutputStream outputStream) throws IOException {
// By default, we cannot abort the write operation, so just close the stream.
outputStream.close();
}
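
For orientation, a hedged sketch of the caller-side flow for the new API; the provider, URI, payload, and buffer size below are illustrative, not code from this PR:

    // Sketch: writing through SeekableChannelsProvider.getOutputStream (illustrative names).
    final URI uri = URI.create("s3://bucket/table.parquet");
    try (final CompletableOutputStream out = provider.getOutputStream(uri, 1 << 16)) {
        try {
            out.write(payload); // payload: the bytes to upload
            out.done(); // push cached data to the underlying storage
            out.complete(); // finalize the write (e.g., commit a multipart upload)
        } catch (final IOException e) {
            out.rollback(); // try to undo any partial write
            throw e;
        }
    }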

/**
* Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive.
@@ -53,50 +53,17 @@ public void testSimpleRead() throws IOException {
}

@Test
public void testSimpleReadWrite() throws IOException {
public void testSimplePooledReadChannelClose() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc =
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
: cachedChannelProvider.getWriteChannel("w" + i, false));
SeekableByteChannel rc = cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i);
rc.close();
}
assertEquals(900, closed.size());
assertTrue(closed.get(0).endsWith("r0"));
}

@Test
public void testSimpleWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
// Call write to hit the assertions inside the mock channel
final ByteBuffer buffer = ByteBuffer.allocate(1);
rc.write(buffer);
rc.close();
}
assertEquals(900, closed.size());
for (int i = 0; i < 900; i++) {
assertTrue(closed.get(i).endsWith("w" + (i)));
}
}

@Test
public void testSimpleAppend() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
rc.close();
}
assertEquals(900, closed.size());
for (int i = 0; i < 900; i++) {
assertTrue(closed.get(i).endsWith("a" + (i)));
}
}

@Test
public void testCloseOrder() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
@@ -146,33 +113,6 @@ public void testReuse() throws IOException {
assertEquals(0, closed.size());
}

@Test
public void testReuse10() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
for (int pi = 0; pi < 10; ++pi) {
for (int ci = 0; ci < 10; ++ci) {
someResult[pi * 10 + ci] = cachedChannelProvider.getWriteChannel("w" + pi % 10, false);
}
for (int ci = 0; ci < 10; ++ci) {
someResult[pi * 10 + 9 - ci].close();
}
}
for (int step = 0; step < 10; ++step) {
final SeekableByteChannel[] reused = new SeekableByteChannel[100];
for (int ri = 0; ri < 100; ++ri) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + (ri / 10) % 10, false);
assertSame(rc, someResult[ri % 100]);
reused[ri] = rc;
}
for (int ri = 0; ri < 100; ++ri) {
reused[99 - ri].close();
}
}
assertEquals(0, closed.size());
}

@Test
void testRewrapCachedChannelProvider() {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
@@ -230,13 +170,8 @@ public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channe
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull String uriStr, boolean append) {
return new TestMockChannel(count.getAndIncrement(), uriStr);
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull URI uri, boolean append) {
return new TestMockChannel(count.getAndIncrement(), uri.toString());
public CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) {
throw new UnsupportedOperationException("getOutputStream");
}

@Override
@@ -3,9 +3,9 @@
//
package io.deephaven.parquet.base;

import io.deephaven.util.channel.CompletableOutputStream;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.OutputStream;
import java.net.URI;

/**
@@ -20,6 +20,6 @@ public void addParquetFileMetadata(final URI parquetFileURI, final ParquetMetada

@Override
public void writeMetadataFiles(
final OutputStream metadataOutputStream,
final OutputStream commonMetadataOutputStream) {}
final CompletableOutputStream metadataOutputStream,
final CompletableOutputStream commonMetadataOutputStream) {}
}
@@ -9,7 +9,6 @@
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesUtils;

import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.*;
@@ -29,7 +28,7 @@
import static io.deephaven.parquet.base.ParquetUtils.MAGIC;
import static org.apache.parquet.format.Util.writeFileMetaData;

public final class ParquetFileWriter {
public final class ParquetFileWriter implements AutoCloseable {
private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
private static final int VERSION = 1;

@@ -41,12 +40,12 @@ public final class ParquetFileWriter {
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
private final URI destForMetadata;
private final URI dest;
private final ParquetMetadataFileWriter metadataFileWriter;

public ParquetFileWriter(
final URI dest,
final OutputStream destOutputStream,
final URI destForMetadata,
final int targetPageSize,
final ByteBufferAllocator allocator,
final MessageType type,
@@ -60,7 +59,7 @@ public ParquetFileWriter(
countingOutput.write(MAGIC);
this.type = type;
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
this.destForMetadata = destForMetadata;
this.dest = dest;
this.metadataFileWriter = metadataFileWriter;
}

@@ -73,12 +72,13 @@ public RowGroupWriter addRowGroup(final long size) {
return rowGroupWriter;
}

@Override
public void close() throws IOException {
serializeOffsetIndexes();
final ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, countingOutput);
metadataFileWriter.addParquetFileMetadata(destForMetadata, footer);
metadataFileWriter.addParquetFileMetadata(dest, footer);
// Flush any buffered data, do not close the stream since it is managed by the layer above
countingOutput.flush();
compressorAdapter.close();
@@ -90,10 +90,19 @@ public static void serializeFooter(final ParquetMetadata footer, final CountingO
final org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(VERSION, footer);
writeFileMetaData(parquetMetadata, countingOutput);
BytesUtils.writeIntLittleEndian(countingOutput, (int) (countingOutput.getCount() - footerIndex));
countingOutput.write(intToLittleEndian((int) (countingOutput.getCount() - footerIndex)));
countingOutput.write(MAGIC);
}

private static byte[] intToLittleEndian(final int value) {
return new byte[] {
(byte) (value & 0xFF),
(byte) ((value >> 8) & 0xFF),
(byte) ((value >> 16) & 0xFF),
(byte) ((value >> 24) & 0xFF)
};
}
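
Parquet ends a file with the serialized footer, its length as a 4-byte little-endian integer, and the MAGIC bytes, which is the layout the helper above emits. For reference, an equivalent sketch using java.nio rather than manual shifts (not code from this PR):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    private static byte[] intToLittleEndian(final int value) {
        // Produces the same four bytes as the manual shift version above
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
    }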

private void serializeOffsetIndexes() throws IOException {
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
final List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
@@ -3,10 +3,10 @@
//
package io.deephaven.parquet.base;

import io.deephaven.util.channel.CompletableOutputStream;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

/**
@@ -26,11 +26,12 @@

/**
* Write the combined metadata to the provided streams and clear the metadata accumulated so far. The output streams
* should be managed by the caller and will not be closed by this API.
* should be marked as {@link CompletableOutputStream#done()} after writing is finished.
*
* @param metadataOutputStream The output stream for the {@value ParquetUtils#METADATA_FILE_NAME} file
* @param commonMetadataOutputStream The output stream for the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file
*/
void writeMetadataFiles(OutputStream metadataOutputStream, OutputStream commonMetadataOutputStream)
throws IOException;
void writeMetadataFiles(
CompletableOutputStream metadataOutputStream,
CompletableOutputStream commonMetadataOutputStream) throws IOException;
}
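
A hedged sketch of how a caller might drive this contract; the provider, URIs, and buffer size are illustrative, not code from this PR:

    // Sketch: write both combined-metadata files, then commit them (illustrative names).
    try (final CompletableOutputStream metadataOut = provider.getOutputStream(metadataUri, 1 << 16);
            final CompletableOutputStream commonOut = provider.getOutputStream(commonMetadataUri, 1 << 16)) {
        metadataFileWriter.writeMetadataFiles(metadataOut, commonOut); // expected to mark both streams done()
        metadataOut.complete(); // commit the _metadata file
        commonOut.complete(); // commit the _common_metadata file
    }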