Skip to content

Commit

Permalink
Share PyArrow table data with the server through ByteBuffer (deephave…
Browse files Browse the repository at this point in the history
…n#4936)

* Use ByteBuffer to receive data from Python

* Batch load arrow record batches

* Sync with changes to JPY

* Add final modifier

* Add warning comments for memory safety risk

* Improve warning comments

* Fix the format problems of the comments
  • Loading branch information
jmao-denver authored Jan 10, 2024
1 parent bfb1644 commit 4e783a1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ public class ArrowToTableConverter {

private volatile boolean completed = false;

private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException {
private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final ByteBuffer bb) throws IOException {
final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo();

final ByteBuffer bb = ByteBuffer.wrap(ipcMessage);
bb.order(ByteOrder.LITTLE_ENDIAN);
final int continuation = bb.getInt();
final int metadata_size = bb.getInt();
Expand All @@ -70,7 +69,10 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip
}

@ScriptApi
public synchronized void setSchema(final byte[] ipcMessage) {
public synchronized void setSchema(final ByteBuffer ipcMessage) {
// The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the
// return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy
// the data out of the input ByteBuffer to use after the return of this method.
if (completed) {
throw new IllegalStateException("Conversion is complete; cannot process additional messages");
}
Expand All @@ -82,7 +84,20 @@ public synchronized void setSchema(final byte[] ipcMessage) {
}

@ScriptApi
public synchronized void addRecordBatch(final byte[] ipcMessage) {
public synchronized void addRecordBatches(final ByteBuffer... ipcMessages) {
// The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the
// return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy
// the data out of the input ByteBuffer to use after the return of this method.
for (final ByteBuffer ipcMessage : ipcMessages) {
addRecordBatch(ipcMessage);
}
}

@ScriptApi
public synchronized void addRecordBatch(final ByteBuffer ipcMessage) {
// The input ByteBuffer instance (especially originated from Python) can't be assumed to be valid after the
// return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to copy
// the data out of the input ByteBuffer to use after the return of this method.
if (completed) {
throw new IllegalStateException("Conversion is complete; cannot process additional messages");
}
Expand Down Expand Up @@ -121,6 +136,9 @@ public synchronized void onCompleted() throws InterruptedException {
}

protected void parseSchema(final Schema header) {
// The Schema instance (especially originated from Python) can't be assumed to be valid after the return
// of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need to make a copy of
// the header to use after the return of this method.
if (resultTable != null) {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Schema evolution not supported");
}
Expand All @@ -139,6 +157,9 @@ protected void parseSchema(final Schema header) {
}

protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, int numColumns) {
// The BarrageProtoUtil.MessageInfo instance (especially originated from Python) can't be assumed to be valid
// after the return of this method. Until https://github.com/jpy-consortium/jpy/issues/126 is resolved, we need
// to make a copy of it to use after the return of this method.
final BarrageMessage msg = new BarrageMessage();
final RecordBatch batch = (RecordBatch) mi.header.header(new RecordBatch());

Expand Down Expand Up @@ -192,7 +213,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
return msg;
}

private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) {
private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) {
final BarrageProtoUtil.MessageInfo mi;
try {
mi = parseArrowIpcMessage(ipcMessage);
Expand All @@ -201,4 +222,6 @@ private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) {
}
return mi;
}


}
7 changes: 2 additions & 5 deletions py/server/deephaven/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,10 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table:
dh_schema = pa.schema(dh_fields)

try:
pa_buffer = dh_schema.serialize()
j_barrage_table_builder.setSchema(dtypes.array(dtypes.byte, pa_buffer))
j_barrage_table_builder.setSchema(jpy.byte_buffer(dh_schema.serialize()))

record_batches = pa_table.to_batches()
for rb in record_batches:
pa_buffer = rb.serialize()
j_barrage_table_builder.addRecordBatch(dtypes.array(dtypes.byte, pa_buffer))
j_barrage_table_builder.addRecordBatches([jpy.byte_buffer(rb.serialize()) for rb in record_batches])
j_barrage_table_builder.onCompleted()

return Table(j_table=j_barrage_table_builder.getResultTable())
Expand Down
1 change: 1 addition & 0 deletions py/server/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from deephaven.table import Table
from tests.testbase import BaseTestCase


class ArrowTestCase(BaseTestCase):
test_table: Table

Expand Down

0 comments on commit 4e783a1

Please sign in to comment.