Commit #2, mostly mechanical changes, splitting creation and reading
niloc132 committed Jun 26, 2024
1 parent c746c34 commit 75a076b
Showing 8 changed files with 233 additions and 195 deletions.
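The refactor described in the commit message splits the factory's single create-and-read call into two steps: the factory now returns a ChunkReader for a column's type, and the reader's read(...) method consumes the stream. The deprecated shim in the first hunk below shows exactly this delegation. The following is a minimal sketch of the new calling pattern, not part of the commit: the helper class and method are hypothetical, and the package of StreamReaderOptions is assumed since it is not shown in this diff.

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions; // package assumed, not shown in this diff

final class ChunkReadSketch {
    // Hypothetical helper: reads one column's chunk using the split creation/reading API.
    static WritableChunk<Values> readOneColumn(
            final StreamReaderOptions options,
            final ChunkReadingFactory.ChunkTypeInfo typeInfo,
            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            final PrimitiveIterator.OfLong bufferInfoIter,
            final DataInput is) throws IOException {
        // Creation: build a ChunkReader for this column's type (conversion factor 1), once.
        final ChunkReader reader =
                DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, 1, typeInfo);
        // Reading: consume the field node and buffer metadata plus the payload stream;
        // passing null for outChunk asks the reader to allocate a new chunk.
        return reader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0);
    }
}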
@@ -197,7 +197,7 @@ static WritableChunk<Values> extractChunkFromInputStream(
}

@Deprecated
static WritableChunk<Values> extractChunkFromInputStream(
private static WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final int factor,
final ChunkType chunkType, final Class<?> type, final Class<?> componentType,
@@ -206,8 +206,8 @@ static WritableChunk<Values> extractChunkFromInputStream(
final DataInput is,
final WritableChunk<Values> outChunk, final int outOffset, final int totalRows) throws IOException {
return DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null), fieldNodeIter,
bufferInfoIter, is, outChunk, outOffset, totalRows);
new ChunkReadingFactory.ChunkTypeInfo(chunkType, type, componentType, null))
.read(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}

/**
@@ -0,0 +1,34 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

/**
* Consumes Flight/Barrage streams and transforms them into WritableChunks.
*/
public interface ChunkReader {
/**
* Reads the next chunk of data from the input stream.
*
* @param fieldNodeIter iterator over the field node metadata for the record batch being read
* @param bufferInfoIter iterator over the buffer lengths for the record batch being read
* @param is the DataInput to read the chunk payload from
* @param outChunk the chunk to fill, or null to allocate a new chunk
* @param outOffset the offset within outChunk at which to begin writing
* @param totalRows the total number of rows to read into the chunk
* @return a WritableChunk containing the deserialized data
*/
WritableChunk<Values> read(final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException;
}
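The intended lifecycle for this interface shows up in the BarrageStreamReader changes near the end of this commit: one ChunkReader is created per column when the Schema message arrives, and those cached readers are reused for every record batch that follows. Below is a condensed, hypothetical sketch of that caching pattern; the class and method names are illustrative, not part of the commit, and the StreamReaderOptions package is assumed as it is not shown in this diff.

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.util.StreamReaderOptions; // package assumed, not shown in this diff
import org.apache.arrow.flatbuf.Schema;

final class PerColumnReaders {
    private final List<ChunkReader> readers = new ArrayList<>();

    // Creation: called once when the Schema header is seen, mirroring the loop
    // added to BarrageStreamReader in this commit.
    void buildReaders(
            final StreamReaderOptions options,
            final ChunkReadingFactory factory,
            final ChunkType[] columnChunkTypes,
            final Class<?>[] columnTypes,
            final Class<?>[] componentTypes,
            final Schema schema) throws IOException {
        for (int i = 0; i < schema.fieldsLength(); i++) {
            readers.add(factory.extractChunkFromInputStream(options,
                    new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[i], columnTypes[i],
                            componentTypes[i], schema.fields(i))));
        }
    }

    // Reading: called per column for each record batch, reusing the cached reader.
    WritableChunk<Values> readColumn(
            final int ci,
            final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            final PrimitiveIterator.OfLong bufferInfoIter,
            final DataInput is,
            final WritableChunk<Values> outChunk,
            final int outOffset,
            final int totalRows) throws IOException {
        return readers.get(ci).read(fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
    }
}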
@@ -67,50 +67,24 @@ public Field componentArrowField() {
* @param options the stream reader options that control how data is deserialized
* @param factor the conversion factor to apply to values read from the stream
* @param typeInfo the type information for the chunk to be read
* @param fieldNodeIter
* @param bufferInfoIter
* @param is
* @param outChunk
* @param outOffset
* @param totalRows
* @return a ChunkReader suitable for reading chunks of the described type
* @throws IOException if an error occurs while setting up the reader
*/
WritableChunk<Values> extractChunkFromInputStream(
ChunkReader extractChunkFromInputStream(
final StreamReaderOptions options,
final int factor,
final ChunkTypeInfo typeInfo,
final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException;
final ChunkTypeInfo typeInfo) throws IOException;

/**
* Same as above, but with a conversion factor of 1.
*
* @param options the stream reader options that control how data is deserialized
* @param typeInfo the type information for the chunk to be read
* @param fieldNodeIter
* @param bufferInfoIter
* @param is
* @param outChunk
* @param offset
* @param totalRows
* @return a ChunkReader suitable for reading chunks of the described type
* @throws IOException if an error occurs while setting up the reader
*/
default WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final ChunkTypeInfo typeInfo,
final Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int offset,
final int totalRows) throws IOException {
return extractChunkFromInputStream(options, 1, typeInfo, fieldNodeIter, bufferInfoIter, is, outChunk, offset,
totalRows);
default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo)
throws IOException {
return extractChunkFromInputStream(options, 1, typeInfo);
}

}

Large diffs are not rendered by default.

@@ -244,10 +244,6 @@ static <T> WritableObjectChunk<T, Values> extractChunkFromInputStream(
final int totalRows,
ChunkReadingFactory chunkReadingFactory) throws IOException {

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long offsetsBuffer = bufferInfoIter.nextLong();

final Class<?> componentType = typeInfo.type().getComponentType();
final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;

@@ -259,13 +255,18 @@ static <T> WritableObjectChunk<T, Values> extractChunkFromInputStream(
chunkType = ChunkType.fromElementType(componentType);
}

ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
typeInfo.componentArrowField()));

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long offsetsBuffer = bufferInfoIter.nextLong();

if (nodeInfo.numElements == 0) {
try (final WritableChunk<Values> ignored = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
typeInfo.componentArrowField()),
fieldNodeIter,
bufferInfoIter, is, null, 0, 0)) {
try (final WritableChunk<Values> ignored =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements);
}
}
@@ -303,11 +304,8 @@ static <T> WritableObjectChunk<T, Values> extractChunkFromInputStream(
}

final ArrayExpansionKernel kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);
try (final WritableChunk<Values> inner = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
typeInfo.componentArrowField()),
fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
try (final WritableChunk<Values> inner =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);

long nextValid = 0;
@@ -244,22 +244,22 @@ static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
final int totalRows,
ChunkReadingFactory chunkReadingFactory) throws IOException {

final Class<?> componentType =
VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
final ChunkType chunkType = ChunkType.fromElementType(componentType);
ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
typeInfo.componentArrowField()));

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long offsetsBuffer = bufferInfoIter.nextLong();

final Class<?> componentType =
VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
final ChunkType chunkType = ChunkType.fromElementType(componentType);

if (nodeInfo.numElements == 0) {
try (final WritableChunk<Values> ignored = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
typeInfo.componentArrowField()),
fieldNodeIter, bufferInfoIter,
is,
null, 0, 0)) {
try (final WritableChunk<Values> ignored =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
if (outChunk != null) {
return outChunk.asWritableObjectChunk();
}
@@ -300,13 +300,8 @@ static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
}

final VectorExpansionKernel kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType);
try (final WritableChunk<Values> inner = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
typeInfo.componentArrowField()),
fieldNodeIter, bufferInfoIter,
is,
null, 0, 0)) {
try (final WritableChunk<Values> inner =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);

long nextValid = 0;
@@ -13,6 +13,7 @@
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
import io.deephaven.extensions.barrage.table.BarrageTable;
@@ -198,11 +199,10 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
msg.addColumnData[ci].data = new ArrayList<>();
final int factor = (columnConversionFactors == null) ? 1 : columnConversionFactors[ci];
try {
acd.data.add(DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
ChunkReader reader = DefaultChunkReadingFactory.INSTANCE.extractChunkFromInputStream(options, factor,
new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci], columnTypes[ci], componentTypes[ci],
schema.fields(ci)),
fieldNodeIter,
bufferInfoIter, mi.inputStream, null, 0, 0));
schema.fields(ci)));
acd.data.add(reader.read(fieldNodeIter, bufferInfoIter, mi.inputStream, null, 0, 0));
} catch (final IOException unexpected) {
throw new UncheckedDeephavenException(unexpected);
}
@@ -19,12 +19,14 @@
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkReadingFactory;
import io.deephaven.extensions.barrage.chunk.DefaultChunkReadingFactory;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ChunkType;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import org.apache.arrow.flatbuf.Field;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.RecordBatch;
@@ -37,6 +39,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.function.LongConsumer;

@@ -54,10 +57,10 @@ public class BarrageStreamReader implements StreamReader {
private long numModRowsRead = 0;
private long numModRowsTotal = 0;

private Schema schema;
private BarrageMessage msg = null;

private final ChunkReadingFactory chunkReadingFactory = DefaultChunkReadingFactory.INSTANCE;
private final List<ChunkReader> readers = new ArrayList<>();

public BarrageStreamReader(final LongConsumer deserializeTmConsumer) {
this.deserializeTmConsumer = deserializeTmConsumer;
@@ -244,12 +247,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
}

// fill the chunk with data and assign back into the array
acd.data.set(lastChunkIndex,
chunkReadingFactory.extractChunkFromInputStream(options,
new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci],
columnTypes[ci], componentTypes[ci], schema.fields(ci)),
fieldNodeIter, bufferInfoIter, ois,
chunk, chunk.size(), (int) batch.length()));
acd.data.set(lastChunkIndex, readers.get(ci).read(fieldNodeIter, bufferInfoIter, ois, chunk,
chunk.size(), (int) batch.length()));
chunk.setSize(chunk.size() + (int) batch.length());
}
numAddRowsRead += batch.length();
Expand Down Expand Up @@ -277,12 +276,8 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,
}

// fill the chunk with data and assign back into the array
mcd.data.set(lastChunkIndex,
chunkReadingFactory.extractChunkFromInputStream(options,
new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[ci],
columnTypes[ci], componentTypes[ci], null),
fieldNodeIter, bufferInfoIter, ois,
chunk, chunk.size(), numRowsToRead));
mcd.data.set(lastChunkIndex, readers.get(ci).read(fieldNodeIter, bufferInfoIter, ois, chunk,
chunk.size(), numRowsToRead));
chunk.setSize(chunk.size() + numRowsToRead);
}
numModRowsRead += batch.length();
@@ -292,7 +287,15 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,

if (header != null && header.headerType() == MessageHeader.Schema) {
// there is no body and our clients do not want to see schema messages
this.schema = (Schema) header.header(new Schema());
Schema schema = new Schema();
header.header(schema);
for (int i = 0; i < schema.fieldsLength(); i++) {
Field field = schema.fields(i);
ChunkReader chunkReader = chunkReadingFactory.extractChunkFromInputStream(options,
new ChunkReadingFactory.ChunkTypeInfo(columnChunkTypes[i], columnTypes[i],
componentTypes[i], field));
readers.add(chunkReader);
}
return null;
}

