From 3b0789bfedd2d0b26cef70170a80ba7b6c189dda Mon Sep 17 00:00:00 2001
From: Colin Alworth
Date: Tue, 25 Jun 2024 08:06:43 -0500
Subject: [PATCH] Commit #3, create vector/array chunk readers to do type lookups once

---
 .../barrage/chunk/ChunkReadingFactory.java    |  11 +-
 .../chunk/DefaultChunkReadingFactory.java     |  18 +--
 .../VarListChunkInputStreamGenerator.java     |  90 --------------
 .../barrage/chunk/VarListChunkReader.java     | 114 ++++++++++++++++++
 .../VectorChunkInputStreamGenerator.java      |  87 -------------
 .../barrage/chunk/VectorChunkReader.java      | 110 +++++++++++++++++
 6 files changed, 230 insertions(+), 200 deletions(-)
 create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java
 create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java

diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
index d624f5bc736..d3f8ba84a95 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReadingFactory.java
@@ -68,22 +68,17 @@ public Field componentArrowField() {
      * @param factor
      * @param typeInfo
      * @return
-     * @throws IOException
      */
-    ChunkReader extractChunkFromInputStream(
-            final StreamReaderOptions options,
-            final int factor,
-            final ChunkTypeInfo typeInfo) throws IOException;
+    ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final int factor,
+            final ChunkTypeInfo typeInfo);
 
     /**
      *
      * @param options
      * @param typeInfo
      * @return
-     * @throws IOException
      */
-    default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo)
-            throws IOException {
+    default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo) {
         return extractChunkFromInputStream(options, 1, typeInfo);
     }
 
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
index 6a7ceead2d2..df41a1ae7ca 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java
@@ -4,8 +4,6 @@
 package io.deephaven.extensions.barrage.chunk;
 
 import com.google.common.base.Charsets;
-import io.deephaven.chunk.WritableChunk;
-import io.deephaven.chunk.attributes.Values;
 import io.deephaven.extensions.barrage.ColumnConversionMode;
 import io.deephaven.extensions.barrage.util.StreamReaderOptions;
 import io.deephaven.time.DateTimeUtils;
@@ -13,8 +11,6 @@
 import io.deephaven.util.type.TypeUtils;
 import io.deephaven.vector.Vector;
 
-import java.io.DataInput;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.Instant;
@@ -22,8 +18,6 @@
 import java.time.LocalTime;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.PrimitiveIterator;
 
 import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY;
 
@@ -37,7 +31,7 @@ public final class DefaultChunkReadingFactory implements ChunkReadingFactory {
 
     @Override
     public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int factor,
-            ChunkTypeInfo typeInfo) throws IOException {
+            ChunkTypeInfo typeInfo) {
         // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
         switch (typeInfo.chunkType()) {
             case Boolean:
@@ -93,17 +87,11 @@ public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int
                             (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
                             outChunk, outOffset, totalRows);
                 } else {
-                    return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
-                            totalRows) -> VarListChunkInputStreamGenerator.extractChunkFromInputStream(options,
-                                    typeInfo,
-                                    fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows, this);
+                    return new VarListChunkReader<>(options, typeInfo, this);
                 }
             }
             if (Vector.class.isAssignableFrom(typeInfo.type())) {
-                return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
-                        totalRows) -> VectorChunkInputStreamGenerator.extractChunkFromInputStream(options,
-                                typeInfo, fieldNodeIter, bufferInfoIter,
-                                is, outChunk, outOffset, totalRows, this);
+                return new VectorChunkReader(options, typeInfo, this);
             }
             if (typeInfo.type() == BigInteger.class) {
                 return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
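The factory change above is the heart of this commit: instead of returning a lambda that re-runs the VarList/Vector type resolution on every record batch, extractChunkFromInputStream now returns a reader object that did that lookup once, in its constructor. A minimal usage sketch of the intended call pattern follows; the loop scaffolding and the names moreBatches and consume are illustrative, not part of this patch:

    // Resolve the ChunkReader once per column/schema; the type switch in the
    // factory runs only here, at setup time.
    ChunkReader reader = chunkReadingFactory.extractChunkFromInputStream(options, typeInfo);
    while (moreBatches()) {
        // Every incoming record batch reuses the same reader instance.
        try (final WritableChunk<Values> chunk =
                reader.read(fieldNodeIter, bufferInfoIter, is, null, 0, totalRows)) {
            consume(chunk);
        }
    }
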
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
index a1de362f385..470b0b87291 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java
@@ -233,95 +233,5 @@ public int drainTo(final OutputStream outputStream) throws IOException {
         }
     }
 
-    static <T> WritableObjectChunk<T, Values> extractChunkFromInputStream(
-            final StreamReaderOptions options,
-            final ChunkReadingFactory.ChunkTypeInfo typeInfo,
-            final Iterator<FieldNodeInfo> fieldNodeIter,
-            final PrimitiveIterator.OfLong bufferInfoIter,
-            final DataInput is,
-            final WritableChunk<Values> outChunk,
-            final int outOffset,
-            final int totalRows,
-            ChunkReadingFactory chunkReadingFactory) throws IOException {
-
-        final Class<?> componentType = typeInfo.type().getComponentType();
-        final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;
-
-        final ChunkType chunkType;
-        if (componentType == boolean.class || componentType == Boolean.class) {
-            // Note: Internally booleans are passed around as bytes, but the wire format is packed bits.
-            chunkType = ChunkType.Byte;
-        } else {
-            chunkType = ChunkType.fromElementType(componentType);
-        }
-
-        ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
-                options,
-                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
-                        typeInfo.componentArrowField()));
-
-        final FieldNodeInfo nodeInfo = fieldNodeIter.next();
-        final long validityBuffer = bufferInfoIter.nextLong();
-        final long offsetsBuffer = bufferInfoIter.nextLong();
-
-        if (nodeInfo.numElements == 0) {
-            try (final WritableChunk<Values> ignored =
-                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
-                return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements);
-            }
-        }
-
-        final WritableObjectChunk<T, Values> chunk;
-        final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
-        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
-                final WritableIntChunk<ChunkPositions> offsets =
-                        WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
-            // Read validity buffer:
-            int jj = 0;
-            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
-                isValid.set(jj, is.readLong());
-            }
-            final long valBufRead = jj * 8L;
-            if (valBufRead < validityBuffer) {
-                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
-            }
-            // we support short validity buffers
-            for (; jj < numValidityLongs; ++jj) {
-                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
-            }
-            // consumed entire validity buffer by here
-
-            // Read offsets:
-            final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
-            if (offsetsBuffer < offBufRead) {
-                throw new IllegalStateException("offset buffer is too short for the expected number of elements");
-            }
-            for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
-                offsets.set(i, is.readInt());
-            }
-            if (offBufRead < offsetsBuffer) {
-                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
-            }
-
-            final ArrayExpansionKernel kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);
-            try (final WritableChunk<Values> inner =
-                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
-                chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
-
-                long nextValid = 0;
-                for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
-                    if ((ii % 64) == 0) {
-                        nextValid = isValid.get(ii / 64);
-                    }
-                    if ((nextValid & 0x1) == 0x0) {
-                        chunk.set(outOffset + ii, null);
-                    }
-                    nextValid >>= 1;
-                }
-            }
-        }
-
-        return chunk;
-    }
 }
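One detail worth flagging before the replacement reader below: Deephaven moves booleans around internally as bytes in Byte chunks, while the Arrow wire format packs them one bit per value, which is why the constructor maps boolean component types to ChunkType.Byte instead of relying on ChunkType.fromElementType. A standalone sketch of that LSB-first bit-unpacking convention (the helper name is illustrative, not part of this patch):

    // Unpack Arrow bit-packed booleans into one byte per value (1 = true).
    // Value i lives at bit (i % 8) of byte (i / 8), least-significant first.
    static byte[] unpackBits(byte[] packed, int count) {
        final byte[] out = new byte[count];
        for (int i = 0; i < count; ++i) {
            out[i] = (byte) ((packed[i >> 3] >> (i & 7)) & 1);
        }
        return out;
    }
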
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java
new file mode 100644
index 00000000000..71c294d6387
--- /dev/null
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkReader.java
@@ -0,0 +1,114 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.chunk;
+
+import io.deephaven.chunk.ChunkType;
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.WritableIntChunk;
+import io.deephaven.chunk.WritableLongChunk;
+import io.deephaven.chunk.WritableObjectChunk;
+import io.deephaven.chunk.attributes.ChunkPositions;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.extensions.barrage.chunk.array.ArrayExpansionKernel;
+import io.deephaven.extensions.barrage.util.StreamReaderOptions;
+import io.deephaven.util.datastructures.LongSizedDataStructure;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+
+public class VarListChunkReader<T> implements ChunkReader {
+    private static final String DEBUG_NAME = "VarListChunkReader";
+
+    private final ArrayExpansionKernel kernel;
+    private final ChunkReader componentReader;
+
+    public VarListChunkReader(final StreamReaderOptions options, final ChunkReadingFactory.ChunkTypeInfo typeInfo,
+            ChunkReadingFactory chunkReadingFactory) {
+        final Class<?> componentType = typeInfo.type().getComponentType();
+        final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;
+
+        final ChunkType chunkType;
+        if (componentType == boolean.class || componentType == Boolean.class) {
+            // Note: Internally booleans are passed around as bytes, but the wire format is packed bits.
+            chunkType = ChunkType.Byte;
+        } else {
+            chunkType = ChunkType.fromElementType(componentType);
+        }
+        kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);
+
+        componentReader = chunkReadingFactory.extractChunkFromInputStream(
+                options,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
+                        typeInfo.componentArrowField()));
+    }
+
+    @Override
+    public WritableObjectChunk<T, Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long offsetsBuffer = bufferInfoIter.nextLong();
+
+        if (nodeInfo.numElements == 0) {
+            try (final WritableChunk<Values> ignored =
+                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
+                return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements);
+            }
+        }
+
+        final WritableObjectChunk<T, Values> chunk;
+        final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
+        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
+                final WritableIntChunk<ChunkPositions> offsets =
+                        WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
+            // Read validity buffer:
+            int jj = 0;
+            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
+                isValid.set(jj, is.readLong());
+            }
+            final long valBufRead = jj * 8L;
+            if (valBufRead < validityBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
+            }
+            // we support short validity buffers
+            for (; jj < numValidityLongs; ++jj) {
+                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+            }
+            // consumed entire validity buffer by here
+
+            // Read offsets:
+            final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
+            if (offsetsBuffer < offBufRead) {
+                throw new IllegalStateException("offset buffer is too short for the expected number of elements");
+            }
+            for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
+                offsets.set(i, is.readInt());
+            }
+            if (offBufRead < offsetsBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
+            }
+
+            try (final WritableChunk<Values> inner =
+                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
+                chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
+
+                long nextValid = 0;
+                for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                    if ((ii % 64) == 0) {
+                        nextValid = isValid.get(ii / 64);
+                    }
+                    if ((nextValid & 0x1) == 0x0) {
+                        chunk.set(outOffset + ii, null);
+                    }
+                    nextValid >>= 1;
+                }
+            }
+        }
+
+        return chunk;
+    }
+}
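The validity handling in read follows the Arrow convention: one bit per element, little-endian within 64-bit words, a 0 bit meaning null; short validity buffers are padded with all-ones words so that absent bytes read as "valid". The same decoding loop, reduced to a standalone array-based sketch (names illustrative, not part of this patch):

    // Sketch: mark null elements from an Arrow-style validity bitmap.
    // isValid holds one long per 64 elements; a 0 bit means null.
    static void applyValidity(long[] isValid, int numElements, java.util.function.IntConsumer setNull) {
        long word = 0;
        for (int ii = 0; ii < numElements; ++ii) {
            if ((ii % 64) == 0) {
                word = isValid[ii / 64];
            }
            if ((word & 0x1) == 0) {
                setNull.accept(ii);
            }
            word >>= 1;
        }
    }
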
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
index 5ee91971c5f..620ae84541f 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java
@@ -232,91 +232,4 @@ public int drainTo(final OutputStream outputStream) throws IOException {
             return LongSizedDataStructure.intSize(DEBUG_NAME, bytesWritten);
         }
     }
-
-    static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
-            final StreamReaderOptions options,
-            final ChunkReadingFactory.ChunkTypeInfo typeInfo,
-            final Iterator<FieldNodeInfo> fieldNodeIter,
-            final PrimitiveIterator.OfLong bufferInfoIter,
-            final DataInput is,
-            final WritableChunk<Values> outChunk,
-            final int outOffset,
-            final int totalRows,
-            ChunkReadingFactory chunkReadingFactory) throws IOException {
-
-        final Class<?> componentType =
-                VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
-        final ChunkType chunkType = ChunkType.fromElementType(componentType);
-        ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
-                options,
-                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
-                        typeInfo.componentArrowField()));
-
-        final FieldNodeInfo nodeInfo = fieldNodeIter.next();
-        final long validityBuffer = bufferInfoIter.nextLong();
-        final long offsetsBuffer = bufferInfoIter.nextLong();
-
-
-        if (nodeInfo.numElements == 0) {
-            try (final WritableChunk<Values> ignored =
-                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
-                if (outChunk != null) {
-                    return outChunk.asWritableObjectChunk();
-                }
-                return WritableObjectChunk.makeWritableChunk(totalRows);
-            }
-        }
-
-        final WritableObjectChunk<Vector<?>, Values> chunk;
-        final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
-        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
-                final WritableIntChunk<ChunkPositions> offsets =
-                        WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
-            // Read validity buffer:
-            int jj = 0;
-            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
-                isValid.set(jj, is.readLong());
-            }
-            final long valBufRead = jj * 8L;
-            if (valBufRead < validityBuffer) {
-                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
-            }
-            // we support short validity buffers
-            for (; jj < numValidityLongs; ++jj) {
-                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
-            }
-            // consumed entire validity buffer by here
-
-            // Read offsets:
-            final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
-            if (offsetsBuffer < offBufRead) {
-                throw new IllegalStateException("offset buffer is too short for the expected number of elements");
-            }
-            for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
-                offsets.set(i, is.readInt());
-            }
-            if (offBufRead < offsetsBuffer) {
-                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
-            }
-
-            final VectorExpansionKernel kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType);
-            try (final WritableChunk<Values> inner =
-                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
-                chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
-
-                long nextValid = 0;
-                for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
-                    if ((ii % 64) == 0) {
-                        nextValid = isValid.get(ii / 64);
-                    }
-                    if ((nextValid & 0x1) == 0x0) {
-                        chunk.set(outOffset + ii, null);
-                    }
-                    nextValid >>= 1;
-                }
-            }
-        }
-
-        return chunk;
-    }
 }
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java
new file mode 100644
index 00000000000..5ed1f9b36e5
--- /dev/null
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkReader.java
@@ -0,0 +1,110 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.barrage.chunk;
+
+import io.deephaven.chunk.ChunkType;
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.WritableIntChunk;
+import io.deephaven.chunk.WritableLongChunk;
+import io.deephaven.chunk.WritableObjectChunk;
+import io.deephaven.chunk.attributes.ChunkPositions;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel;
+import io.deephaven.extensions.barrage.util.StreamReaderOptions;
+import io.deephaven.util.datastructures.LongSizedDataStructure;
+import io.deephaven.vector.Vector;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+
+class VectorChunkReader implements ChunkReader {
+    private static final String DEBUG_NAME = "VectorChunkReader";
+    private final ChunkReader componentReader;
+    private final VectorExpansionKernel kernel;
+
+    public VectorChunkReader(final StreamReaderOptions options, final ChunkReadingFactory.ChunkTypeInfo typeInfo,
+            ChunkReadingFactory chunkReadingFactory) {
+
+        final Class<?> componentType =
+                VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
+        final ChunkType chunkType = ChunkType.fromElementType(componentType);
+        componentReader = chunkReadingFactory.extractChunkFromInputStream(
+                options,
+                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
+                        typeInfo.componentArrowField()));
+        kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType);
+    }
+
+    @Override
+    public WritableObjectChunk<Vector<?>, Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long offsetsBuffer = bufferInfoIter.nextLong();
+
+        if (nodeInfo.numElements == 0) {
+            try (final WritableChunk<Values> ignored =
+                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
+                if (outChunk != null) {
+                    return outChunk.asWritableObjectChunk();
+                }
+                return WritableObjectChunk.makeWritableChunk(totalRows);
+            }
+        }
+
+        final WritableObjectChunk<Vector<?>, Values> chunk;
+        final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
+        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
+                final WritableIntChunk<ChunkPositions> offsets =
+                        WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
+            // Read validity buffer:
+            int jj = 0;
+            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
+                isValid.set(jj, is.readLong());
+            }
+            final long valBufRead = jj * 8L;
+            if (valBufRead < validityBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
+            }
+            // we support short validity buffers
+            for (; jj < numValidityLongs; ++jj) {
+                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+            }
+            // consumed entire validity buffer by here
+
+            // Read offsets:
+            final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
+            if (offsetsBuffer < offBufRead) {
+                throw new IllegalStateException("offset buffer is too short for the expected number of elements");
+            }
+            for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
+                offsets.set(i, is.readInt());
+            }
+            if (offBufRead < offsetsBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
+            }
+
+            try (final WritableChunk<Values> inner =
+                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
+                chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);
+
+                long nextValid = 0;
+                for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                    if ((ii % 64) == 0) {
+                        nextValid = isValid.get(ii / 64);
+                    }
+                    if ((nextValid & 0x1) == 0x0) {
+                        chunk.set(outOffset + ii, null);
+                    }
+                    nextValid >>= 1;
+                }
+            }
+        }
+
+        return chunk;
+    }
+}
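Both new readers end the same way: the component reader produces one flattened chunk for the entire batch, and the expansion kernel's contract re-slices it into per-row arrays or Vectors using the offsets buffer, where row r spans inner positions offsets[r] through offsets[r+1]. A toy model of that contraction for int[] rows, independent of the chunk classes (purely illustrative, not the kernel's real signature):

    // Toy model of the contract step: offsets has numRows + 1 entries;
    // row r is the slice inner[offsets[r] .. offsets[r+1]).
    static int[][] contract(int[] inner, int[] offsets) {
        final int[][] rows = new int[offsets.length - 1][];
        for (int r = 0; r < rows.length; ++r) {
            rows[r] = java.util.Arrays.copyOfRange(inner, offsets[r], offsets[r + 1]);
        }
        return rows;
    }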