Skip to content

Commit

Permalink
fix: Correct rollup and tree snapshot code to wait for missed structu…
Browse files Browse the repository at this point in the history
…ral dependencies (deephaven#6395)

Fixes deephaven#6394
  • Loading branch information
rcaudy authored Nov 20, 2024
1 parent ba53da2 commit acdb6bf
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 2,082 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1010,11 +1010,14 @@ private Collection<KeyTableDirective> extractKeyTableNodeDirectives(
getDefaultExpansionNodeKeys().forEach((final Object nodeKey) -> accumulateKeyTableDirective(
nodeKey, Expand, directivesByNodeKey, orderedDirectives));

try (final RowSet prevRowSet = usePrev ? keyTable.getRowSet().copyPrev() : null) {
final RowSequence rowsToExtract = usePrev ? prevRowSet : keyTable.getRowSet();
final ColumnIterator<?> nodeKeyIter = ChunkedColumnIterator.make(nodeKeySource, rowsToExtract);
final ByteColumnIterator actionIter =
actionSource == null ? null : new ChunkedByteColumnIterator(actionSource, rowsToExtract);
final RowSequence rowsToExtract = usePrev
? keyTable.getRowSet().prev()
: keyTable.getRowSet();
// @formatter:off
try (final ColumnIterator<?> nodeKeyIter = ChunkedColumnIterator.make(nodeKeySource, rowsToExtract);
final ByteColumnIterator actionIter =
actionSource == null ? null : new ChunkedByteColumnIterator(actionSource, rowsToExtract)) {
// @formatter:on
// If no action source is supplied, we default to "Expand"
final Supplier<VisitAction> nextAction = actionIter == null
? () -> Expand
Expand Down Expand Up @@ -1237,11 +1240,11 @@ private void visitExpandedNode(
numChildDirectives = 0;
}

final RowSet prevRows = snapshotState.usePrev ? forExpansion.getRowSet().copyPrev() : null;
final RowSet rowsToVisit = prevRows != null ? prevRows : forExpansion.getRowSet();
final RowSet rowsToVisit = snapshotState.usePrev
? forExpansion.getRowSet().prev()
: forExpansion.getRowSet();
// @formatter:off
try (final SafeCloseable ignored = prevRows;
final RowSequence.Iterator rowsToVisitIter = rowsToVisit.getRowSequenceIterator();
try (final RowSequence.Iterator rowsToVisitIter = rowsToVisit.getRowSequenceIterator();
final NodeFillContext filler = filling
? new NodeFillContext(snapshotState, nodeTableState, rowsToVisit.size())
: null;
Expand Down Expand Up @@ -1275,7 +1278,8 @@ private void visitExpandedNode(
// We're at the start of a contracted range; we need to consume the contracted range
// and expand the first row after if there is one.
final long lastContractedPositionInRange = contractedRowPositionsIter.currentRangeEnd();
final long rangeSize = lastContractedPositionInRange - nextContractedPosition + 1;
final int rangeSize = Math.toIntExact(
lastContractedPositionInRange - nextContractedPosition + 1);
cdi += rangeSize;
nextChildDirective = cdi < numChildDirectives ? childDirectives.get(cdi) : null;
if (rowsToVisit.size() > lastContractedPositionInRange + 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,6 @@ NotificationStepSource[] getSourceDependencies() {

@Override
void maybeWaitForStructuralSatisfaction() {
// NB: It's sufficient to wait for the root node, which is done at the beginning of traversal.
maybeWaitForSatisfaction(getRoot());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ public MemoizedOperationKey getMemoKey() {
/**
* The row identifier source from {@link #source}.
*/
private final ColumnSource idSource;
private final ColumnSource<?> idSource;

/**
* The parent identifier source from {@link #source}.
*/
private final ColumnSource parentIdSource;
private final ColumnSource<?> parentIdSource;

/**
* The eventual listener that maintains {@link #result}.
Expand Down Expand Up @@ -177,16 +177,15 @@ private TreeTableFilter(@NotNull final TreeTableImpl tree, @NotNull final WhereF
}

private void doInitialFilter(@Nullable final OperationSnapshotControl snapshotControl, final boolean usePrev) {
try (final RowSet sourcePrevRows = usePrev ? source.getRowSet().copyPrev() : null) {
final RowSet sourceRows = usePrev ? sourcePrevRows : source.getRowSet();
final RowSet sourceRows = usePrev
? source.getRowSet().prev()
: source.getRowSet();
matchedSourceRows = filterValues(usePrev, sourceRows, sourceRows);
parentIdToChildRows = new HashMap<>(matchedSourceRows.intSize("parentReferences"));
ancestorSourceRows = computeParents(usePrev, matchedSourceRows);
resultRows = matchedSourceRows.union(ancestorSourceRows).toTracking();

matchedSourceRows = filterValues(usePrev, sourceRows, sourceRows);
parentIdToChildRows = new HashMap<>(matchedSourceRows.intSize("parentReferences"));
ancestorSourceRows = computeParents(usePrev, matchedSourceRows);
resultRows = matchedSourceRows.union(ancestorSourceRows).toTracking();

validateState(usePrev, sourceRows);
}
validateState(usePrev, sourceRows);

result = source.getSubTable(resultRows);
if (snapshotControl != null) {
Expand Down Expand Up @@ -262,6 +261,7 @@ private void validateState(final boolean usePrev, @NotNull final RowSet sourceRo
});
}
}
// noinspection resource
childRowsToProcess = newParentKeys.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ boolean nodeIdExpandable(@NotNull final SnapshotStateImpl snapshotState, final l

@Override
NotificationStepSource[] getSourceDependencies() {
// NB: The reverse lookup may be derived from an unfiltered parent of our source, hence we need to treat it as a
// The reverse lookup may be derived from an unfiltered parent of our source, hence we need to treat it as a
// separate dependency if we're filtered.
return filtered
? new NotificationStepSource[] {source, sourceRowLookup}
Expand All @@ -423,9 +423,12 @@ NotificationStepSource[] getSourceDependencies() {

@Override
void maybeWaitForStructuralSatisfaction() {
// NB: Our root is just a node in the tree (which is a partitioned table of constituent nodes), so waiting for
// Our root is just a node in the tree (which is a partitioned table of constituent nodes), so waiting for
// satisfaction of the root would be insufficient (and unnecessary if we're waiting for the tree).
maybeWaitForSatisfaction(tree);
if (!filtered) {
// The row lookup aggregation must also be satisfied if we aren't using it as a source dependency.
maybeWaitForSatisfaction(sourceRowLookup);
}
}

}
Loading

0 comments on commit acdb6bf

Please sign in to comment.