Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added support to write parquet files to S3 #5914

Merged
merged 19 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;

Expand Down Expand Up @@ -111,15 +110,9 @@ public InputStream getInputStream(final SeekableByteChannel channel, final int s
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append) throws IOException {
final String pathKey = path.toAbsolutePath().toString();
final ChannelType channelType = append ? ChannelType.WriteAppend : ChannelType.Write;
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(channelType);
final CachedChannel result = tryGetPooledChannel(pathKey, channelPool);
return result == null
? new CachedChannel(wrappedProvider.getWriteChannel(path, append), channelType, pathKey)
: result.position(append ? result.size() : 0); // The seek isn't really necessary for append; will be at
// end no matter what.
public final CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint)
throws IOException {
return wrappedProvider.getOutputStream(uri, bufferSizeHint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.channel;

import java.io.IOException;
import java.io.OutputStream;

/**
* An {@link OutputStream} that can be marked as done, completed, or rolled back.
* <p>
* The {@link #done()} method is used to flush all buffered data to the underlying storage, {@link #complete()} to
* finalize the write operation, and {@link #rollback()} to cancel the write. Closing this output stream without calling
* complete will not flush data to the underlying storage.
* <p>
* One usage pattern can be like this:
*
* <pre>
* try (final CompletableOutputStream outputStream = CreateCompletableOutputStream()) {
* try {
* IOUtils.copy(inputStream, outputStream);
* outputStream.done(); // Optional; use this to flush buffered data without completing the stream
* outputStream.complete();
* } catch (IOException e) {
* outputStream.rollback();
* }
* }
* </pre>
*/
public abstract class CompletableOutputStream extends OutputStream {

/**
* Flush all buffered data to the underlying storage. This is optional and should be called after the user is done
* writing to the output stream. All writes to the output stream after calling this method will lead to an
* {@link IOException}.
*/
public abstract void done() throws IOException;

/**
* Flush all buffered data and save all written data to the underlying storage. This method should be called after
* the user is done writing to the output stream. All writes to the output stream after calling this method will
* lead to an {@link IOException}.
*/
public abstract void complete() throws IOException;

/**
* Try to roll back any data written to the underlying storage, reverting back to the original state before opening
* this stream. This is an optional operation, as some implementations may not be able to support it.
*/
public abstract void rollback() throws IOException;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import java.io.InputStream;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import static io.deephaven.base.FileUtils.convertToURI;
Expand Down Expand Up @@ -92,11 +90,17 @@ SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContex
*/
InputStream getInputStream(SeekableByteChannel channel, int sizeHint) throws IOException;

default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException {
return getWriteChannel(Paths.get(path), append);
}
/**
* Creates a {@link CompletableOutputStream} to write to the given URI.
*
* @param uri the URI to write to
* @param bufferSizeHint the number of bytes the caller expects to buffer before flushing
* @return the output stream
* @throws IOException if an IO exception occurs
* @see CompletableOutputStream
*/
CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) throws IOException;

SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;

/**
* Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -54,50 +53,17 @@ public void testSimpleRead() throws IOException {
}

@Test
public void testSimpleReadWrite() throws IOException {
public void testSimplePooledReadChannelClose() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc =
((i / 100) % 2 == 0 ? cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i)
: cachedChannelProvider.getWriteChannel("w" + i, false));
SeekableByteChannel rc = cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i);
rc.close();
}
assertEquals(900, closed.size());
assertTrue(closed.get(0).endsWith("r0"));
}

@Test
public void testSimpleWrite() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false);
// Call write to hit the assertions inside the mock channel
final ByteBuffer buffer = ByteBuffer.allocate(1);
rc.write(buffer);
rc.close();
}
assertEquals(900, closed.size());
for (int i = 0; i < 900; i++) {
assertTrue(closed.get(i).endsWith("w" + (i)));
}
}

@Test
public void testSimpleAppend() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
for (int i = 0; i < 1000; i++) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true);
rc.close();
}
assertEquals(900, closed.size());
for (int i = 0; i < 900; i++) {
assertTrue(closed.get(i).endsWith("a" + (i)));
}
}

@Test
public void testCloseOrder() throws IOException {
SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
Expand Down Expand Up @@ -147,33 +113,6 @@ public void testReuse() throws IOException {
assertEquals(0, closed.size());
}

@Test
public void testReuse10() throws IOException {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100);
final SeekableByteChannel[] someResult = new SeekableByteChannel[100];
for (int pi = 0; pi < 10; ++pi) {
for (int ci = 0; ci < 10; ++ci) {
someResult[pi * 10 + ci] = cachedChannelProvider.getWriteChannel("w" + pi % 10, false);
}
for (int ci = 0; ci < 10; ++ci) {
someResult[pi * 10 + 9 - ci].close();
}
}
for (int step = 0; step < 10; ++step) {
final SeekableByteChannel[] reused = new SeekableByteChannel[100];
for (int ri = 0; ri < 100; ++ri) {
SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + (ri / 10) % 10, false);
assertSame(rc, someResult[ri % 100]);
reused[ri] = rc;
}
for (int ri = 0; ri < 100; ++ri) {
reused[99 - ri].close();
}
}
assertEquals(0, closed.size());
}

@Test
void testRewrapCachedChannelProvider() {
final SeekableChannelsProvider wrappedProvider = new TestChannelProvider();
Expand Down Expand Up @@ -231,13 +170,8 @@ public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channe
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull String path, boolean append) {
return new TestMockChannel(count.getAndIncrement(), path);
}

@Override
public SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) {
return new TestMockChannel(count.getAndIncrement(), path.toString());
public CompletableOutputStream getOutputStream(@NotNull final URI uri, int bufferSizeHint) {
throw new UnsupportedOperationException("getOutputStream");
}

@Override
Expand Down
1 change: 1 addition & 0 deletions extensions/parquet/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':Configuration')
implementation project(':DataStructures')
implementation libs.commons.io
implementation libs.guava

compileOnly libs.jetbrains.annotations
testImplementation libs.junit4
Expand Down
Loading
Loading