Commit: #1 reading chunks, checkpoint to talk to nate, next will try reading schema at beginning of stream
niloc132 committed Jun 26, 2024
1 parent a3b26c7 commit c746c34
Showing 9 changed files with 385 additions and 190 deletions.
@@ -10,7 +10,6 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.ColumnConversionMode;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
@@ -19,7 +18,6 @@
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.Nullable;

@@ -31,7 +29,6 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.PrimitiveIterator;

@@ -187,6 +184,7 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
}
}

@Deprecated
static WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final ChunkType chunkType, final Class<?> type, final Class<?> componentType,
@@ -195,10 +193,10 @@ static WritableChunk<Values> extractChunkFromInputStream(
final DataInput is,
final WritableChunk<Values> outChunk, final int offset, final int totalRows) throws IOException {
return extractChunkFromInputStream(options, 1, chunkType, type, componentType, fieldNodeIter, bufferInfoIter,
is,
outChunk, offset, totalRows);
is, outChunk, offset, totalRows);
}

@Deprecated
static WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final int factor,
@@ -207,168 +205,9 @@ static WritableChunk<Values> extractChunkFromInputStream(
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (chunkType) {
case Boolean:
throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
case Char:
return CharChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Byte:
if (type == Boolean.class || type == boolean.class) {
return BooleanChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
return ByteChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Short:
return ShortChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Int:
return IntChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Long:
if (factor == 1) {
return LongChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
options, (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Float:
return FloatChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Double:
return DoubleChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Object:
if (type.isArray()) {
if (componentType == byte.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
outChunk, outOffset, totalRows);
} else {
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(
options, type, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
}
if (Vector.class.isAssignableFrom(type)) {
// noinspection unchecked
return VectorChunkInputStreamGenerator.extractChunkFromInputStream(
options, (Class<Vector<?>>) type, componentType, fieldNodeIter, bufferInfoIter, is,
outChunk, outOffset, totalRows);
}
if (type == BigInteger.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new,
outChunk, outOffset, totalRows);
}
if (type == BigDecimal.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(final byte[] buf, final int offset, final int length) -> {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
},
outChunk, outOffset, totalRows);
}
if (type == Instant.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options, io -> {
final long value = io.readLong();
if (value == QueryConstants.NULL_LONG) {
return null;
}
return DateTimeUtils.epochNanosToInstant(value * factor);
},
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == ZonedDateTime.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options, io -> {
final long value = io.readLong();
if (value == QueryConstants.NULL_LONG) {
return null;
}
return DateTimeUtils.epochNanosToZonedDateTime(
value * factor, DateTimeUtils.timeZone());
},
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Byte.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Byte.BYTES, options, io -> TypeUtils.box(io.readByte()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Character.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Character.BYTES, options, io -> TypeUtils.box(io.readChar()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Double.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Double.BYTES, options, io -> TypeUtils.box(io.readDouble()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Float.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Float.BYTES, options, io -> TypeUtils.box(io.readFloat()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Integer.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Integer.BYTES, options, io -> TypeUtils.box(io.readInt()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Long.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options, io -> TypeUtils.box(io.readLong()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Short.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Short.BYTES, options, io -> TypeUtils.box(io.readShort()),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == LocalDate.class) {
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
options,
value -> value == QueryConstants.NULL_LONG
? null
: LocalDate.ofEpochDay(value / MS_PER_DAY),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
options,
value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == String.class ||
options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
totalRows);
}
throw new UnsupportedOperationException(
"Do not yet support column conversion mode: " + options.columnConversionMode());
default:
throw new UnsupportedOperationException();
}
return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null), fieldNodeIter,
bufferInfoIter, is, outChunk, outOffset, totalRows);
}
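The deleted switch now lives behind ChunkReadingFactory; the deprecated statics above simply wrap it. For callers migrating off those statics, a minimal sketch of the direct call, assuming the iterators and stream come from an already-parsed Barrage message (readColumn is an illustrative name, not part of this commit; the null Arrow Field mirrors the call above):

// Hedged sketch: read one column's chunk via the new factory entry point.
// Per the commit message, a later change is expected to read the schema at
// the beginning of the stream and supply a real Arrow Field instead of null.
static WritableChunk<Values> readColumn(
        final StreamReaderOptions options,
        final ChunkType chunkType, final Class<?> type, final Class<?> componentType,
        final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
        final PrimitiveIterator.OfLong bufferInfoIter,
        final DataInput is,
        final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
    return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(
            options,
            new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null),
            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}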

@@ -0,0 +1,116 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import org.apache.arrow.flatbuf.Field;
import org.apache.arrow.flatbuf.Type;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

/**
 * Factory for reading a column of data from a Flight/Barrage input stream into a {@link WritableChunk}.
 */
public interface ChunkReadingFactory {
    /**
     * Describes the type of a single column: the Deephaven {@link ChunkType}, the Java type and component
     * type, and the corresponding Arrow {@link Field}.
     */
class ChunkTypeInfo {
private final ChunkType chunkType;
private final Class<?> type;
private final Class<?> componentType;
private final Field arrowField;

public ChunkTypeInfo(ChunkType chunkType, Class<?> type, Class<?> componentType, Field arrowField) {
this.chunkType = chunkType;
this.type = type;
this.componentType = componentType;
this.arrowField = arrowField;
}

public ChunkType chunkType() {
return chunkType;
}

public Class<?> type() {
return type;
}

public Class<?> componentType() {
return componentType;
}

public Field arrowField() {
return arrowField;
}

public Field componentArrowField() {
if (arrowField.typeType() != Type.List) {
throw new IllegalStateException("Not a flight List");
}
if (arrowField.childrenLength() != 1) {
throw new IllegalStateException("Incorrect number of child Fields");
}
return arrowField.children(0);
}
}

    /**
     * Reads a chunk of columnar data from the given input stream.
     *
     * @param options options for reading the stream
     * @param factor a multiplier applied to integral values as they are read (used for time-unit conversions)
     * @param typeInfo the type of data to read
     * @param fieldNodeIter iterator over the field nodes in the stream
     * @param bufferInfoIter iterator over the buffer lengths in the stream
     * @param is the input stream holding the chunk payload
     * @param outChunk the chunk to write into, or null to allocate a new chunk
     * @param outOffset the offset into outChunk at which to begin writing
     * @param totalRows the total number of rows to read
     * @return a chunk containing the data read from the stream
     * @throws IOException if an error occurs while reading the stream
     */
WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final int factor,
final ChunkTypeInfo typeInfo,
final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException;

    /**
     * Reads a chunk of columnar data from the given input stream, applying no scaling factor.
     *
     * @param options options for reading the stream
     * @param typeInfo the type of data to read
     * @param fieldNodeIter iterator over the field nodes in the stream
     * @param bufferInfoIter iterator over the buffer lengths in the stream
     * @param is the input stream holding the chunk payload
     * @param outChunk the chunk to write into, or null to allocate a new chunk
     * @param offset the offset into outChunk at which to begin writing
     * @param totalRows the total number of rows to read
     * @return a chunk containing the data read from the stream
     * @throws IOException if an error occurs while reading the stream
     */
default WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final ChunkTypeInfo typeInfo,
final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int offset,
final int totalRows) throws IOException {
return extractChunkFromInputStream(options, 1, typeInfo, fieldNodeIter, bufferInfoIter, is, outChunk, offset,
totalRows);
}

}
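Because the no-factor overload is a default method, an implementation only needs to supply the factor-taking variant. A hedged sketch of a decorator over the default factory, assuming DefaultChunkReadingFactory (added elsewhere in this commit) exposes the INSTANCE singleton used above; LoggingChunkReadingFactory is an illustrative name, not part of this commit:

// Illustrative wrapper: decorate the default factory, e.g. to trace reads.
class LoggingChunkReadingFactory implements ChunkReadingFactory {
    @Override
    public WritableChunk<Values> extractChunkFromInputStream(
            final StreamReaderOptions options,
            final int factor,
            final ChunkTypeInfo typeInfo,
            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            final PrimitiveIterator.OfLong bufferInfoIter,
            final DataInput is,
            final WritableChunk<Values> outChunk,
            final int outOffset,
            final int totalRows) throws IOException {
        System.out.println("reading " + typeInfo.chunkType() + " as " + typeInfo.type());
        // Delegate to the same default implementation the deprecated statics use.
        return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(
                options, factor, typeInfo, fieldNodeIter, bufferInfoIter, is,
                outChunk, outOffset, totalRows);
    }
}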
