diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java
index 98cf03908e8..a2ae09fb1d0 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java
@@ -197,7 +197,7 @@ static WritableChunk<Values> extractChunkFromInputStream(
     }
 
     @Deprecated
-    static WritableChunk<Values> extractChunkFromInputStream(
+    private static WritableChunk<Values> extractChunkFromInputStream(
             final StreamReaderOptions options, final int factor,
             final ChunkType chunkType, final Class<?> type, final Class<?> componentType,
@@ -206,8 +206,8 @@ static WritableChunk<Values> extractChunkFromInputStream(
             final DataInput is,
             final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
         return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
-                new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null), fieldNodeIter,
-                bufferInfoIter, is, outChunk, outOffset, totalRows);
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null))
+                .read(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
     }
 
     /**
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java
new file mode 100644
index 00000000000..de90744fc0d
--- /dev/null
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java
@@ -0,0 +1,36 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.chunk;
+
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.attributes.Values;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+
+/**
+ * Consumes Flight/Barrage streams and transforms them into WritableChunks.
+ */
+public interface ChunkReader {
+    /**
+     * Reads the given DataInput to extract the next Arrow buffer as a Deephaven Chunk.
+     *
+     * @param fieldNodeIter iterator over the field nodes of the record batch being read
+     * @param bufferInfoIter iterator over the buffer lengths of the record batch being read
+     * @param is the input stream containing the buffer data
+     * @param outChunk the chunk to write into, or {@code null} if a new chunk should be allocated
+     * @param outOffset the offset into {@code outChunk} at which to begin writing
+     * @param totalRows the total number of rows the returned chunk must be able to hold
+     * @return a chunk containing the rows read from the stream
+     * @throws IOException if an error occurs while reading the stream
+     */
+    WritableChunk<Values> read(final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            final PrimitiveIterator.OfLong bufferInfoIter,
+            final DataInput is,
+            final WritableChunk<Values> outChunk,
+            final int outOffset,
+            final int totalRows) throws IOException;
+}
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
index d7cfb18db00..d624f5bc736 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
@@ -67,50 +67,24 @@ public Field componentArrowField() {
      * @param options
      * @param factor
      * @param typeInfo
-     * @param fieldNodeIter
-     * @param bufferInfoIter
-     * @param is
-     * @param outChunk
-     * @param outOffset
-     * @param totalRows
      * @return
      * @throws IOException
      */
-    WritableChunk<Values> extractChunkFromInputStream(
+    ChunkReader extractChunkFromInputStream(
             final StreamReaderOptions options,
             final int factor,
-            final ChunkTypeInfo typeInfo,
-            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
-            final PrimitiveIterator.OfLong bufferInfoIter,
-            final DataInput is,
-            final WritableChunk<Values> outChunk,
-            final int outOffset,
-            final int totalRows) throws IOException;
+            final ChunkTypeInfo typeInfo) throws IOException;
 
     /**
      *
      * @param options
      * @param typeInfo
-     * @param fieldNodeIter
-     * @param bufferInfoIter
-     * @param is
-     * @param outChunk
-     * @param offset
-     * @param totalRows
      * @return
      * @throws IOException
      */
-    default WritableChunk<Values> extractChunkFromInputStream(
-            final StreamReaderOptions options,
-            final ChunkTypeInfo typeInfo,
-            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
-            final PrimitiveIterator.OfLong bufferInfoIter,
-            final DataInput is,
-            final WritableChunk<Values> outChunk,
-            final int offset,
-            final int totalRows) throws IOException {
-        return extractChunkFromInputStream(options, 1, typeInfo, fieldNodeIter, bufferInfoIter, is, outChunk, offset,
-                totalRows);
+    default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo)
+            throws IOException {
+        return extractChunkFromInputStream(options, 1, typeInfo);
     }
 }
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
index e7a5bcd665c..6a7ceead2d2 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
@@ -36,167 +36,201 @@ public final class DefaultChunkReadingFactory implements ChunkReadingFactory {
     public static final ChunkReadingFactory INSTANCE = new DefaultChunkReadingFactory();
 
     @Override
-    public WritableChunk<Values> extractChunkFromInputStream(StreamReaderOptions options, int factor,
-            ChunkTypeInfo typeInfo, Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
-            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
-            int totalRows) throws IOException {
+    public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int factor,
+            ChunkTypeInfo typeInfo) throws IOException {
         // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
         switch (typeInfo.chunkType()) {
             case Boolean:
                 throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
             case Char:
-                return CharChunkInputStreamGenerator.extractChunkFromInputStream(
-                        options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> CharChunkInputStreamGenerator.extractChunkFromInputStream(
+                                options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Byte:
                 if (typeInfo.type() == Boolean.class || typeInfo.type() == boolean.class) {
-                    return BooleanChunkInputStreamGenerator.extractChunkFromInputStream(
-                            options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> BooleanChunkInputStreamGenerator.extractChunkFromInputStream(
+                                    options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
-                return ByteChunkInputStreamGenerator.extractChunkFromInputStream(
-                        options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> ByteChunkInputStreamGenerator.extractChunkFromInputStream(
+                                options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Short:
-                return ShortChunkInputStreamGenerator.extractChunkFromInputStream(
-                        options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> ShortChunkInputStreamGenerator.extractChunkFromInputStream(
+                                options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Int:
-                return IntChunkInputStreamGenerator.extractChunkFromInputStream(
-                        options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> IntChunkInputStreamGenerator.extractChunkFromInputStream(
+                                options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Long:
                 if (factor == 1) {
-                    return LongChunkInputStreamGenerator.extractChunkFromInputStream(
-                            options,
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
-                }
-                return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
-                        options,
-                        (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
-                        fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStream(
+                                    options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                }
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
+                                options,
+                                (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
+                                fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Float:
-                return FloatChunkInputStreamGenerator.extractChunkFromInputStream(
-                        options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> FloatChunkInputStreamGenerator.extractChunkFromInputStream(
+                                options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Double:
-                return DoubleChunkInputStreamGenerator.extractChunkFromInputStream(
-                        options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                        totalRows) -> DoubleChunkInputStreamGenerator.extractChunkFromInputStream(
+                                options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
             case Object:
                 if (typeInfo.type().isArray()) {
                    if (typeInfo.componentType() == byte.class) {
-                        return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                                is,
-                                fieldNodeIter,
-                                bufferInfoIter,
-                                (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
-                                outChunk, outOffset, totalRows);
+                        return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                                totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                                        is,
+                                        fieldNodeIter,
+                                        bufferInfoIter,
+                                        (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
+                                        outChunk, outOffset, totalRows);
                    } else {
-                        return VarListChunkInputStreamGenerator.extractChunkFromInputStream(options, typeInfo,
-                                fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows, this);
+                        return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                                totalRows) -> VarListChunkInputStreamGenerator.extractChunkFromInputStream(options,
+                                        typeInfo,
+                                        fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows, this);
                    }
                 }
                 if (Vector.class.isAssignableFrom(typeInfo.type())) {
-                    return VectorChunkInputStreamGenerator.extractChunkFromInputStream(options,
-                            typeInfo, fieldNodeIter, bufferInfoIter,
-                            is, outChunk, outOffset, totalRows, this);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> VectorChunkInputStreamGenerator.extractChunkFromInputStream(options,
+                                    typeInfo, fieldNodeIter, bufferInfoIter,
+                                    is, outChunk, outOffset, totalRows, this);
                 }
                 if (typeInfo.type() == BigInteger.class) {
-                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                            is,
-                            fieldNodeIter,
-                            bufferInfoIter,
-                            BigInteger::new,
-                            outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                                    is,
+                                    fieldNodeIter,
+                                    bufferInfoIter,
+                                    BigInteger::new,
+                                    outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == BigDecimal.class) {
-                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
-                            is,
-                            fieldNodeIter,
-                            bufferInfoIter,
-                            (final byte[] buf, final int offset, final int length) -> {
-                                // read the int scale value as little endian, arrow's endianness.
-                                final byte b1 = buf[offset];
-                                final byte b2 = buf[offset + 1];
-                                final byte b3 = buf[offset + 2];
-                                final byte b4 = buf[offset + 3];
-                                final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
-                                return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
-                            },
-                            outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
+                                    is,
+                                    fieldNodeIter,
+                                    bufferInfoIter,
+                                    (final byte[] buf, final int offset, final int length) -> {
+                                        // read the int scale value as little endian, arrow's endianness.
+                                        final byte b1 = buf[offset];
+                                        final byte b2 = buf[offset + 1];
+                                        final byte b3 = buf[offset + 2];
+                                        final byte b4 = buf[offset + 3];
+                                        final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
+                                        return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
+                                    },
+                                    outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Instant.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Long.BYTES, options, io -> {
-                                final long value = io.readLong();
-                                if (value == QueryConstants.NULL_LONG) {
-                                    return null;
-                                }
-                                return DateTimeUtils.epochNanosToInstant(value * factor);
-                            },
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Long.BYTES, options, io -> {
+                                                final long value = io.readLong();
+                                                if (value == QueryConstants.NULL_LONG) {
+                                                    return null;
+                                                }
+                                                return DateTimeUtils.epochNanosToInstant(value * factor);
+                                            },
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == ZonedDateTime.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Long.BYTES, options, io -> {
-                                final long value = io.readLong();
-                                if (value == QueryConstants.NULL_LONG) {
-                                    return null;
-                                }
-                                return DateTimeUtils.epochNanosToZonedDateTime(
-                                        value * factor, DateTimeUtils.timeZone());
-                            },
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Long.BYTES, options, io -> {
+                                                final long value = io.readLong();
+                                                if (value == QueryConstants.NULL_LONG) {
+                                                    return null;
+                                                }
+                                                return DateTimeUtils.epochNanosToZonedDateTime(
+                                                        value * factor, DateTimeUtils.timeZone());
+                                            },
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Byte.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Byte.BYTES, options, io -> TypeUtils.box(io.readByte()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Byte.BYTES, options, io -> TypeUtils.box(io.readByte()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Character.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Character.BYTES, options, io -> TypeUtils.box(io.readChar()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Character.BYTES, options, io -> TypeUtils.box(io.readChar()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Double.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Double.BYTES, options, io -> TypeUtils.box(io.readDouble()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Double.BYTES, options, io -> TypeUtils.box(io.readDouble()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Float.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Float.BYTES, options, io -> TypeUtils.box(io.readFloat()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Float.BYTES, options, io -> TypeUtils.box(io.readFloat()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Integer.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Integer.BYTES, options, io -> TypeUtils.box(io.readInt()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Integer.BYTES, options, io -> TypeUtils.box(io.readInt()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Long.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Long.BYTES, options, io -> TypeUtils.box(io.readLong()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Long.BYTES, options, io -> TypeUtils.box(io.readLong()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == Short.class) {
-                    return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
-                            Short.BYTES, options, io -> TypeUtils.box(io.readShort()),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> FixedWidthChunkInputStreamGenerator
+                                    .extractChunkFromInputStreamWithTypeConversion(
+                                            Short.BYTES, options, io -> TypeUtils.box(io.readShort()),
+                                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == LocalDate.class) {
-                    return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
-                            options,
-                            value -> value == QueryConstants.NULL_LONG
-                                    ? null
-                                    : LocalDate.ofEpochDay(value / MS_PER_DAY),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
+                                    options,
+                                    value -> value == QueryConstants.NULL_LONG
+                                            ? null
+                                            : LocalDate.ofEpochDay(value / MS_PER_DAY),
+                                    fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == LocalTime.class) {
-                    return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
-                            options,
-                            value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value),
-                            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform(
+                                    options,
+                                    value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value),
+                                    fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
                 }
                 if (typeInfo.type() == String.class ||
                         options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
-                    return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter,
-                            bufferInfoIter,
-                            (buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
-                            totalRows);
+                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
+                            totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is,
+                                    fieldNodeIter,
+                                    bufferInfoIter,
+                                    (buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
+                                    totalRows);
                 }
                 throw new UnsupportedOperationException(
                         "Do not yet support column conversion mode: " + options.columnConversionMode());
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
index e1075e7dcf3..a1de362f385 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
@@ -244,10 +244,6 @@ static WritableObjectChunk<T, Values> extractChunkFromInputStream(
             final int totalRows,
             ChunkReadingFactory chunkReadingFactory) throws IOException {
-        final FieldNodeInfo nodeInfo = fieldNodeIter.next();
-        final long validityBuffer = bufferInfoIter.nextLong();
-        final long offsetsBuffer = bufferInfoIter.nextLong();
-
         final Class<?> componentType = typeInfo.type().getComponentType();
         final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;
@@ -259,13 +255,18 @@ static WritableObjectChunk<T, Values> extractChunkFromInputStream(
             chunkType = ChunkType.fromElementType(componentType);
         }
 
+        ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
+                options,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
+                        typeInfo.componentArrowField()));
+
+        final FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long offsetsBuffer = bufferInfoIter.nextLong();
+
         if (nodeInfo.numElements == 0) {
-            try (final WritableChunk<Values> ignored = chunkReadingFactory.extractChunkFromInputStream(
-                    options,
-                    new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
-                            typeInfo.componentArrowField()),
-                    fieldNodeIter,
-                    bufferInfoIter, is, null, 0, 0)) {
+            try (final WritableChunk<Values> ignored =
+                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
                 return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements);
             }
         }
@@ -303,11 +304,8 @@ static WritableObjectChunk<T, Values> extractChunkFromInputStream(
         }
 
         final ArrayExpansionKernel kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);
-        try (final WritableChunk<Values> inner = chunkReadingFactory.extractChunkFromInputStream(
-                options,
-                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
-                        typeInfo.componentArrowField()),
-                fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
+        try (final WritableChunk<Values> inner =
+                componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
             chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
 
             long nextValid = 0;
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
index b7bb8cee6a4..5ee91971c5f 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
@@ -244,22 +244,22 @@ static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
             final int totalRows,
             ChunkReadingFactory chunkReadingFactory) throws IOException {
 
+        final Class<?> componentType =
+                VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
+        final ChunkType chunkType = ChunkType.fromElementType(componentType);
+        ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
+                options,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
+                        typeInfo.componentArrowField()));
+
         final FieldNodeInfo nodeInfo = fieldNodeIter.next();
         final long validityBuffer = bufferInfoIter.nextLong();
         final long offsetsBuffer = bufferInfoIter.nextLong();
 
-        final Class<?> componentType =
-                VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
-        final ChunkType chunkType = ChunkType.fromElementType(componentType);
         if (nodeInfo.numElements == 0) {
-            try (final WritableChunk<Values> ignored = chunkReadingFactory.extractChunkFromInputStream(
-                    options,
-                    new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
-                            typeInfo.componentArrowField()),
-                    fieldNodeIter, bufferInfoIter,
-                    is,
-                    null, 0, 0)) {
+            try (final WritableChunk<Values> ignored =
+                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
                 if (outChunk != null) {
                     return outChunk.asWritableObjectChunk();
                 }
@@ -300,13 +300,8 @@ static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
         }
 
         final VectorExpansionKernel kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType);
-        try (final WritableChunk<Values> inner = chunkReadingFactory.extractChunkFromInputStream(
-                options,
-                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
-                        typeInfo.componentArrowField()),
-                fieldNodeIter, bufferInfoIter,
-                is,
-                null, 0, 0)) {
+        try (final WritableChunk<Values> inner =
+                componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
             chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
 
             long nextValid = 0;
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java
index b0c62c652c1..04e257263a9 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java
@@ -13,6 +13,7 @@
 import io.deephaven.engine.table.impl.util.BarrageMessage;
 import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
 import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
+import io.deephaven.extensions.barrage.chunk.ChunkReader;
 import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
 import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
 import io.deephaven.extensions.barrage.table.BarrageTable;
@@ -31,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.PrimitiveIterator;
 
 import static io.deephaven.extensions.barrage.util.BarrageProtoUtil.DEFAULT_SER_OPTIONS;
@@ -42,12 +44,10 @@ public class ArrowToTableConverter {
     protected long totalRowsRead = 0;
     protected BarrageTable resultTable;
-    private ChunkType[] columnChunkTypes;
-    private int[] columnConversionFactors;
     private Class<?>[] columnTypes;
     private Class<?>[] componentTypes;
     protected BarrageSubscriptionOptions options = DEFAULT_SER_OPTIONS;
-    private Schema schema;
+    private final List<ChunkReader> readers = new ArrayList<>();
 
     private volatile boolean completed = false;
 
@@ -139,7 +139,6 @@ public synchronized void onCompleted() throws InterruptedException {
     }
 
     protected void parseSchema(final Schema header) {
-        this.schema = header;
         // The Schema instance (especially originated from Python) can't be assumed to be valid after the return
         // of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to make a copy of
         // the header to use after the return of this method.
@@ -151,10 +150,18 @@ protected void parseSchema(final Schema header) {
         resultTable = BarrageTable.make(null, result.tableDef, result.attributes, null);
         resultTable.setFlat();
 
-        columnConversionFactors = result.conversionFactors;
-        columnChunkTypes = result.computeWireChunkTypes();
+        ChunkType[] columnChunkTypes = result.computeWireChunkTypes();
         columnTypes = result.computeWireTypes();
         componentTypes = result.computeWireComponentTypes();
+        // TODO see the note above, this is not safe since the buffer originated in python - we need to copy the schema
+        // before doing this
+        for (int i = 0; i < header.fieldsLength(); i++) {
+            final int factor = (result.conversionFactors == null) ? 1 : result.conversionFactors[i];
+            ChunkReader reader = DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
+                    new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[i], columnTypes[i], componentTypes[i],
+                            header.fields(i)));
+            readers.add(reader);
+        }
 
         // retain reference until the resultTable can be sealed
         resultTable.retainReference();
@@ -196,13 +203,8 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
             final BarrageMessage.AddColumnData acd = new BarrageMessage.AddColumnData();
             msg.addColumnData[ci] = acd;
             msg.addColumnData[ci].data = new ArrayList<>();
-            final int factor = (columnConversionFactors == null) ? 1 : columnConversionFactors[ci];
             try {
-                acd.data.add(DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
-                        new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci], columnTypes[ci], componentTypes[ci],
-                                schema.fields(ci)),
-                        fieldNodeIter,
-                        bufferInfoIter, mi.inputStream, null, 0, 0));
+                acd.data.add(readers.get(ci).read(fieldNodeIter, bufferInfoIter, mi.inputStream, null, 0, 0));
             } catch (final IOException unexpected) {
                 throw new UncheckedDeephavenException(unexpected);
             }
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java
index 571082227db..b38b1eedd57 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageStreamReader.java
@@ -19,12 +19,14 @@
 import io.deephaven.engine.rowset.RowSetShiftData;
 import io.deephaven.engine.table.impl.util.*;
 import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
+import io.deephaven.extensions.barrage.chunk.ChunkReader;
 import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
 import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
 import io.deephaven.util.datastructures.LongSizedDataStructure;
 import io.deephaven.chunk.ChunkType;
 import io.deephaven.internal.log.LoggerFactory;
 import io.deephaven.io.logger.Logger;
+import org.apache.arrow.flatbuf.Field;
 import org.apache.arrow.flatbuf.Message;
 import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.flatbuf.RecordBatch;
@@ -37,6 +39,7 @@
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.PrimitiveIterator;
 import java.util.function.LongConsumer;
 
@@ -54,10 +57,10 @@ public class BarrageStreamReader implements StreamReader {
     private long numModRowsRead = 0;
     private long numModRowsTotal = 0;
 
-    private Schema schema;
     private BarrageMessage msg = null;
 
     private final ChunkReadingFactory chunkReadingFactory = DefaultChunkReadingFactory.INSTANCE;
+    private final List<ChunkReader> readers = new ArrayList<>();
 
     public BarrageStreamReader(final LongConsumer deserializeTmConsumer) {
         this.deserializeTmConsumer = deserializeTmConsumer;
@@ -244,12 +247,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
                         }
 
                         // fill the chunk with data and assign back into the array
-                        acd.data.set(lastChunkIndex,
-                                chunkReadingFactory.extractChunkFromInputStream(options,
-                                        new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci],
-                                                columnTypes[ci], componentTypes[ci], schema.fields(ci)),
-                                        fieldNodeIter, bufferInfoIter, ois,
-                                        chunk, chunk.size(), (int) batch.length()));
+                        acd.data.set(lastChunkIndex, readers.get(ci).read(fieldNodeIter, bufferInfoIter, ois, chunk,
+                                chunk.size(), (int) batch.length()));
                         chunk.setSize(chunk.size() + (int) batch.length());
                     }
                     numAddRowsRead += batch.length();
@@ -277,12 +276,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
                         }
 
                         // fill the chunk with data and assign back into the array
-                        mcd.data.set(lastChunkIndex,
-                                chunkReadingFactory.extractChunkFromInputStream(options,
-                                        new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci],
-                                                columnTypes[ci], componentTypes[ci], null),
-                                        fieldNodeIter, bufferInfoIter, ois,
-                                        chunk, chunk.size(), numRowsToRead));
+                        mcd.data.set(lastChunkIndex, readers.get(ci).read(fieldNodeIter, bufferInfoIter, ois, chunk,
+                                chunk.size(), numRowsToRead));
                         chunk.setSize(chunk.size() + numRowsToRead);
                     }
                     numModRowsRead += batch.length();
@@ -292,7 +287,17 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
 
             if (header != null && header.headerType() == MessageHeader.Schema) {
                 // there is no body and our clients do not want to see schema messages
-                this.schema = (Schema) header.header(new Schema());
+                Schema schema = new Schema();
+                header.header(schema);
+                for (int i = 0; i < schema.fieldsLength(); i++) {
+                    // TODO as with ArrowToTableConverter, see about copying the bytebuffer so we control the payload
+                    // ourselves
+                    Field field = schema.fields(i);
+                    ChunkReader chunkReader = chunkReadingFactory.extractChunkFromInputStream(options,
+                            new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[i], columnTypes[i],
+                                    componentTypes[i], field));
+                    readers.add(chunkReader);
+                }
                 return null;
            }
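
Reviewer note, outside the patch: the net effect of this change is that DefaultChunkReadingFactory's per-column type dispatch now runs once, when the Schema message is parsed, instead of on every record batch; each consumer (ArrowToTableConverter, BarrageStreamReader) caches one ChunkReader per column and replays it. Below is a minimal sketch of that consumer pattern. The class and method names are illustrative only (not part of the change), the StreamReaderOptions import path is assumed to match this module, and only the ChunkReadingFactory/ChunkReader calls reflect the new API as introduced by this diff.

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions; // assumed package
import org.apache.arrow.flatbuf.Schema;

// Illustrative consumer; not part of this change.
final class CachedReaderSketch {
    private final List<ChunkReader> readers = new ArrayList<>();

    // Runs once per stream: resolve a reader for every column while the schema is in hand.
    // Uses the two-arg factory overload, which defaults the conversion factor to 1.
    void onSchema(final Schema schema, final StreamReaderOptions options, final ChunkType[] chunkTypes,
            final Class<?>[] types, final Class<?>[] componentTypes) throws IOException {
        for (int ci = 0; ci < schema.fieldsLength(); ci++) {
            readers.add(DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options,
                    new ChunkReadingFactory.ChunkTypeInfo(chunkTypes[ci], types[ci], componentTypes[ci],
                            schema.fields(ci))));
        }
    }

    // Runs once per record batch and column: no type dispatch remains on this hot path.
    WritableChunk<Values> readColumn(final int ci,
            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            final PrimitiveIterator.OfLong bufferInfoIter, final DataInput is) throws IOException {
        return readers.get(ci).read(fieldNodeIter, bufferInfoIter, is, null, 0, 0);
    }
}

A side effect worth noting: because the factory's switch now executes at reader-construction time, an unsupported column type fails when the schema arrives rather than on the first record batch.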