Skip to content

Commit

Permalink
Fixes for jsapi and HierarchicalTable
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 15, 2024
1 parent 44cdf93 commit d549d79
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace(
final long clockStep =
callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot);
final BarrageMessage snapshot = snapshotMsg.getValue();
snapshot.step = snapshot.firstSeq = snapshot.lastSeq = clockStep;
snapshot.firstSeq = snapshot.lastSeq = clockStep;
return snapshot;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static class AddColumnData {

public long firstSeq = -1;
public long lastSeq = -1;
public long step = -1;
/** The size of the table after this update. -1 if unknown. */
public long tableSize = -1;

public boolean isSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}

// prepare updates to propagate
final long maxStep = snapshot != null ? snapshot.step : Long.MAX_VALUE;
final long maxStep = snapshot != null ? snapshot.firstSeq : Long.MAX_VALUE;

int deltaSplitIdx = pendingDeltas.size();
for (; deltaSplitIdx > 0; --deltaSplitIdx) {
Expand Down Expand Up @@ -2037,6 +2037,7 @@ final class ColumnInfo {
propagationRowSet.remove(downstream.rowsRemoved);
downstream.shifted.apply(propagationRowSet);
propagationRowSet.insert(downstream.rowsAdded);
downstream.tableSize = propagationRowSet.size();

return downstream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
Expand Down Expand Up @@ -92,7 +93,7 @@ HierarchicalTableViewSubscription create(
// region Guarded by snapshot lock
private BitSet columns;
private RowSet rows;
private long lastExpandedSize;
private final WritableRowSet prevKeyspaceViewportRows = RowSetFactory.empty();
// endregion Guarded by snapshot lock

private enum State {
Expand Down Expand Up @@ -282,16 +283,16 @@ private void process() {
return;
}
try {
lastExpandedSize = buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view,
this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, lastExpandedSize);
buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view,
this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, prevKeyspaceViewportRows);
} catch (Exception e) {
GrpcUtil.safelyError(listener, errorTransformer.transform(e));
state = State.Done;
}
}
}

private static long buildAndSendSnapshot(
private static void buildAndSendSnapshot(
@NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory,
@NotNull final StreamObserver<BarrageStreamGenerator.MessageView> listener,
@NotNull final BarrageSubscriptionOptions subscriptionOptions,
Expand All @@ -300,7 +301,7 @@ private static long buildAndSendSnapshot(
@NotNull final BarragePerformanceLog.WriteMetricsConsumer writeMetricsConsumer,
@NotNull final BitSet columns,
@NotNull final RowSet rows,
final long lastExpandedSize) {
@NotNull final WritableRowSet prevKeyspaceViewportRows) {
// 1. Grab some schema and snapshot information
final List<ColumnDefinition<?>> columnDefinitions =
view.getHierarchicalTable().getAvailableColumnDefinitions();
Expand All @@ -322,18 +323,20 @@ private static long buildAndSendSnapshot(
columns, rows, destinations);
snapshotNanosConsumer.accept(System.nanoTime() - snapshotStartNanos);

// note that keyspace is identical to position space for HierarchicalTableView snapshots
final RowSet snapshotRows = RowSetFactory.flat(expandedSize);
final RowSet keyspaceViewportRows = rows.intersect(snapshotRows);

// 4. Make and populate a BarrageMessage
final BarrageMessage barrageMessage = new BarrageMessage();
// We don't populate firstSeq, or lastSeq debugging information; they are not relevant to this use case.

barrageMessage.isSnapshot = true;
// We don't populate length, snapshotRowSet, snapshotRowSetIsReversed, or snapshotColumns; they are only set by
// the client.
// We don't populate step, firstSeq, or lastSeq debugging information; they are not relevant to this use case.

barrageMessage.rowsAdded = RowSetFactory.flat(expandedSize);
barrageMessage.rowsIncluded = RowSetFactory.fromRange(rows.firstRowKey(),
Math.min(barrageMessage.rowsAdded.lastRowKey(), rows.lastRowKey()));
barrageMessage.rowsRemoved = RowSetFactory.flat(lastExpandedSize);
barrageMessage.rowsAdded = snapshotRows;
barrageMessage.rowsIncluded = keyspaceViewportRows;
barrageMessage.rowsRemoved = RowSetFactory.empty();
barrageMessage.shifted = RowSetShiftData.EMPTY;
barrageMessage.tableSize = expandedSize;

barrageMessage.addColumnData = new BarrageMessage.AddColumnData[numAvailableColumns];
for (int ci = 0, di = 0; ci < numAvailableColumns; ++ci) {
Expand All @@ -357,15 +360,16 @@ private static long buildAndSendSnapshot(
// 5. Send the BarrageMessage
try (final BarrageStreamGenerator streamGenerator =
streamGeneratorFactory.newGenerator(barrageMessage, writeMetricsConsumer)) {
// Note that we're always specifying "isInitialSnapshot=true". This is to provoke the subscription view to
// send the added rows on every snapshot, since (1) our added rows are flat, and thus cheap to send, and
// (2) we're relying on added rows to signal the full expanded size to the client.
GrpcUtil.safelyOnNext(listener,
streamGenerator.getSubView(subscriptionOptions, true, true, rows, false, rows, rows, columns));
// initialSnapshot flag is ignored for non-growing viewports
final boolean initialSnapshot = false;
final boolean isFullSubscription = false;
GrpcUtil.safelyOnNext(listener, streamGenerator.getSubView(
subscriptionOptions, initialSnapshot, isFullSubscription, rows, false,
prevKeyspaceViewportRows, keyspaceViewportRows, columns));
}

// 6. Let the caller know what the expanded size was
return expandedSize;
prevKeyspaceViewportRows.clear();
prevKeyspaceViewportRows.insert(keyspaceViewportRows);
}

public void setViewport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ public static class ViewportImpl extends WebBarrageSubscription {
public ViewportImpl(ClientTableState state, ViewportChangedHandler viewportChangedHandler,
DataChangedHandler dataChangedHandler, WebColumnData[] dataSinks) {
super(state, viewportChangedHandler, dataChangedHandler, dataSinks);
serverViewport = RangeSet.empty();
}

@Override
Expand All @@ -481,8 +482,10 @@ public RangeSet getCurrentRowSet() {

@Override
public void applyUpdates(WebBarrageMessage message) {
final BitSet prevServerColumns = serverColumns == null ? null : (BitSet) serverColumns.clone();
lastTableSize = message.tableSize;

final RangeSet prevServerViewport = serverViewport.copy();
if (message.isSnapshot) {
updateServerViewport(message.snapshotRowSet, message.snapshotColumns, message.snapshotRowSetIsReversed);
viewportChangedHandler.onServerViewportChanged(serverViewport, serverColumns, serverReverseViewport);
Expand All @@ -497,23 +500,30 @@ public void applyUpdates(WebBarrageMessage message) {
currentRowSet.removeRange(new Range(newSize, prevSize - 1));
}

if (!message.rowsAdded.isEmpty() || !message.rowsRemoved.isEmpty()) {
for (int ii = 0; ii < message.addColumnData.length; ii++) {
if (!isSubscribedColumn(ii)) {
continue;
}
for (int ii = 0; ii < message.addColumnData.length; ii++) {
final WebBarrageMessage.AddColumnData column = message.addColumnData[ii];
final boolean prevSubscribed = prevServerColumns == null || prevServerColumns.get(ii);
final boolean currSubscribed = serverColumns == null || serverColumns.get(ii);

final WebBarrageMessage.AddColumnData column = message.addColumnData[ii];
for (int j = 0; j < column.data.size(); j++) {
if (!currSubscribed && prevSubscribed && prevSize > 0) {
destSources[ii].applyUpdate(column.data, RangeSet.empty(), RangeSet.ofRange(0, prevSize - 1));
continue;
}

if (!message.rowsAdded.isEmpty() || !message.rowsRemoved.isEmpty()) {
if (prevSubscribed && currSubscribed) {
destSources[ii].applyUpdate(column.data, message.rowsAdded, message.rowsRemoved);
} else if (currSubscribed) {
// this column is a new subscription
destSources[ii].applyUpdate(column.data, message.rowsAdded, RangeSet.empty());
}
}
}

final BitSet modifiedColumnSet = new BitSet(numColumns());
for (int ii = 0; ii < message.modColumnData.length; ii++) {
WebBarrageMessage.ModColumnData column = message.modColumnData[ii];
if (column.rowsModified.isEmpty()) {
if (!isSubscribedColumn(ii) || column.rowsModified.isEmpty()) {
continue;
}

Expand All @@ -525,11 +535,14 @@ public void applyUpdates(WebBarrageMessage message) {
}
}

assert message.tableSize >= 0;
state.setSize(message.tableSize);
dataChangedHandler.onDataChanged(
RangeSet.ofRange(0, currentRowSet.size()),
RangeSet.ofRange(0, prevSize),
RangeSet.empty(), new ShiftedRange[0], modifiedColumnSet);
serverViewport.copy(),
prevServerViewport.copy(),
RangeSet.empty(),
new ShiftedRange[0],
serverColumns == null ? null : (BitSet) serverColumns.clone());
}

@Override
Expand Down

0 comments on commit d549d79

Please sign in to comment.