From d549d794df20affca14645ce47547da2383d1a79 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 15 Nov 2024 13:11:50 -0700 Subject: [PATCH] Fixes for jsapi and HierarchicalTable --- .../table/impl/remote/ConstructSnapshot.java | 2 +- .../table/impl/util/BarrageMessage.java | 2 +- .../barrage/BarrageMessageProducer.java | 3 +- .../HierarchicalTableViewSubscription.java | 44 ++++++++++--------- .../barrage/data/WebBarrageSubscription.java | 35 ++++++++++----- 5 files changed, 52 insertions(+), 34 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index d2f8468c089..88d05fdbf92 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -618,7 +618,7 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace( final long clockStep = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot); final BarrageMessage snapshot = snapshotMsg.getValue(); - snapshot.step = snapshot.firstSeq = snapshot.lastSeq = clockStep; + snapshot.firstSeq = snapshot.lastSeq = clockStep; return snapshot; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java index 6b60a57310e..6041925aa6a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/BarrageMessage.java @@ -45,7 +45,7 @@ public static class AddColumnData { public long firstSeq = -1; public long lastSeq = -1; - public long step = -1; + /** The size of the table after this update. -1 if unknown. */ public long tableSize = -1; public boolean isSnapshot; diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index ecf5d73e289..fd86e5b7860 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -1413,7 +1413,7 @@ private void updateSubscriptionsSnapshotAndPropagate() { } // prepare updates to propagate - final long maxStep = snapshot != null ? snapshot.step : Long.MAX_VALUE; + final long maxStep = snapshot != null ? snapshot.firstSeq : Long.MAX_VALUE; int deltaSplitIdx = pendingDeltas.size(); for (; deltaSplitIdx > 0; --deltaSplitIdx) { @@ -2037,6 +2037,7 @@ final class ColumnInfo { propagationRowSet.remove(downstream.rowsRemoved); downstream.shifted.apply(propagationRowSet); propagationRowSet.insert(downstream.rowsAdded); + downstream.tableSize = propagationRowSet.size(); return downstream; } diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java index fb631c8fbbc..9bcfbc9e142 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java @@ -14,6 +14,7 @@ import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; +import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.TableUpdateListener; @@ -92,7 +93,7 @@ HierarchicalTableViewSubscription create( // region Guarded by snapshot lock private BitSet columns; private RowSet rows; - private long lastExpandedSize; + private final WritableRowSet prevKeyspaceViewportRows = RowSetFactory.empty(); // endregion Guarded by snapshot lock private enum State { @@ -282,8 +283,8 @@ private void process() { return; } try { - lastExpandedSize = buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view, - this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, lastExpandedSize); + buildAndSendSnapshot(streamGeneratorFactory, listener, subscriptionOptions, view, + this::recordSnapshotNanos, this::recordWriteMetrics, columns, rows, prevKeyspaceViewportRows); } catch (Exception e) { GrpcUtil.safelyError(listener, errorTransformer.transform(e)); state = State.Done; @@ -291,7 +292,7 @@ private void process() { } } - private static long buildAndSendSnapshot( + private static void buildAndSendSnapshot( @NotNull final BarrageStreamGenerator.Factory streamGeneratorFactory, @NotNull final StreamObserver listener, @NotNull final BarrageSubscriptionOptions subscriptionOptions, @@ -300,7 +301,7 @@ private static long buildAndSendSnapshot( @NotNull final BarragePerformanceLog.WriteMetricsConsumer writeMetricsConsumer, @NotNull final BitSet columns, @NotNull final RowSet rows, - final long lastExpandedSize) { + @NotNull final WritableRowSet prevKeyspaceViewportRows) { // 1. Grab some schema and snapshot information final List> columnDefinitions = view.getHierarchicalTable().getAvailableColumnDefinitions(); @@ -322,18 +323,20 @@ private static long buildAndSendSnapshot( columns, rows, destinations); snapshotNanosConsumer.accept(System.nanoTime() - snapshotStartNanos); + // note that keyspace is identical to position space for HierarchicalTableView snapshots + final RowSet snapshotRows = RowSetFactory.flat(expandedSize); + final RowSet keyspaceViewportRows = rows.intersect(snapshotRows); + // 4. Make and populate a BarrageMessage final BarrageMessage barrageMessage = new BarrageMessage(); + // We don't populate firstSeq, or lastSeq debugging information; they are not relevant to this use case. + barrageMessage.isSnapshot = true; - // We don't populate length, snapshotRowSet, snapshotRowSetIsReversed, or snapshotColumns; they are only set by - // the client. - // We don't populate step, firstSeq, or lastSeq debugging information; they are not relevant to this use case. - - barrageMessage.rowsAdded = RowSetFactory.flat(expandedSize); - barrageMessage.rowsIncluded = RowSetFactory.fromRange(rows.firstRowKey(), - Math.min(barrageMessage.rowsAdded.lastRowKey(), rows.lastRowKey())); - barrageMessage.rowsRemoved = RowSetFactory.flat(lastExpandedSize); + barrageMessage.rowsAdded = snapshotRows; + barrageMessage.rowsIncluded = keyspaceViewportRows; + barrageMessage.rowsRemoved = RowSetFactory.empty(); barrageMessage.shifted = RowSetShiftData.EMPTY; + barrageMessage.tableSize = expandedSize; barrageMessage.addColumnData = new BarrageMessage.AddColumnData[numAvailableColumns]; for (int ci = 0, di = 0; ci < numAvailableColumns; ++ci) { @@ -357,15 +360,16 @@ private static long buildAndSendSnapshot( // 5. Send the BarrageMessage try (final BarrageStreamGenerator streamGenerator = streamGeneratorFactory.newGenerator(barrageMessage, writeMetricsConsumer)) { - // Note that we're always specifying "isInitialSnapshot=true". This is to provoke the subscription view to - // send the added rows on every snapshot, since (1) our added rows are flat, and thus cheap to send, and - // (2) we're relying on added rows to signal the full expanded size to the client. - GrpcUtil.safelyOnNext(listener, - streamGenerator.getSubView(subscriptionOptions, true, true, rows, false, rows, rows, columns)); + // initialSnapshot flag is ignored for non-growing viewports + final boolean initialSnapshot = false; + final boolean isFullSubscription = false; + GrpcUtil.safelyOnNext(listener, streamGenerator.getSubView( + subscriptionOptions, initialSnapshot, isFullSubscription, rows, false, + prevKeyspaceViewportRows, keyspaceViewportRows, columns)); } - // 6. Let the caller know what the expanded size was - return expandedSize; + prevKeyspaceViewportRows.clear(); + prevKeyspaceViewportRows.insert(keyspaceViewportRows); } public void setViewport( diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/data/WebBarrageSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/data/WebBarrageSubscription.java index 97ba53ed071..c7609e25f53 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/data/WebBarrageSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/data/WebBarrageSubscription.java @@ -467,6 +467,7 @@ public static class ViewportImpl extends WebBarrageSubscription { public ViewportImpl(ClientTableState state, ViewportChangedHandler viewportChangedHandler, DataChangedHandler dataChangedHandler, WebColumnData[] dataSinks) { super(state, viewportChangedHandler, dataChangedHandler, dataSinks); + serverViewport = RangeSet.empty(); } @Override @@ -481,8 +482,10 @@ public RangeSet getCurrentRowSet() { @Override public void applyUpdates(WebBarrageMessage message) { + final BitSet prevServerColumns = serverColumns == null ? null : (BitSet) serverColumns.clone(); lastTableSize = message.tableSize; + final RangeSet prevServerViewport = serverViewport.copy(); if (message.isSnapshot) { updateServerViewport(message.snapshotRowSet, message.snapshotColumns, message.snapshotRowSetIsReversed); viewportChangedHandler.onServerViewportChanged(serverViewport, serverColumns, serverReverseViewport); @@ -497,15 +500,22 @@ public void applyUpdates(WebBarrageMessage message) { currentRowSet.removeRange(new Range(newSize, prevSize - 1)); } - if (!message.rowsAdded.isEmpty() || !message.rowsRemoved.isEmpty()) { - for (int ii = 0; ii < message.addColumnData.length; ii++) { - if (!isSubscribedColumn(ii)) { - continue; - } + for (int ii = 0; ii < message.addColumnData.length; ii++) { + final WebBarrageMessage.AddColumnData column = message.addColumnData[ii]; + final boolean prevSubscribed = prevServerColumns == null || prevServerColumns.get(ii); + final boolean currSubscribed = serverColumns == null || serverColumns.get(ii); - final WebBarrageMessage.AddColumnData column = message.addColumnData[ii]; - for (int j = 0; j < column.data.size(); j++) { + if (!currSubscribed && prevSubscribed && prevSize > 0) { + destSources[ii].applyUpdate(column.data, RangeSet.empty(), RangeSet.ofRange(0, prevSize - 1)); + continue; + } + + if (!message.rowsAdded.isEmpty() || !message.rowsRemoved.isEmpty()) { + if (prevSubscribed && currSubscribed) { destSources[ii].applyUpdate(column.data, message.rowsAdded, message.rowsRemoved); + } else if (currSubscribed) { + // this column is a new subscription + destSources[ii].applyUpdate(column.data, message.rowsAdded, RangeSet.empty()); } } } @@ -513,7 +523,7 @@ public void applyUpdates(WebBarrageMessage message) { final BitSet modifiedColumnSet = new BitSet(numColumns()); for (int ii = 0; ii < message.modColumnData.length; ii++) { WebBarrageMessage.ModColumnData column = message.modColumnData[ii]; - if (column.rowsModified.isEmpty()) { + if (!isSubscribedColumn(ii) || column.rowsModified.isEmpty()) { continue; } @@ -525,11 +535,14 @@ public void applyUpdates(WebBarrageMessage message) { } } + assert message.tableSize >= 0; state.setSize(message.tableSize); dataChangedHandler.onDataChanged( - RangeSet.ofRange(0, currentRowSet.size()), - RangeSet.ofRange(0, prevSize), - RangeSet.empty(), new ShiftedRange[0], modifiedColumnSet); + serverViewport.copy(), + prevServerViewport.copy(), + RangeSet.empty(), + new ShiftedRange[0], + serverColumns == null ? null : (BitSet) serverColumns.clone()); } @Override