From c52a7395a00b196e828b638d1b1a66e660faac0c Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Fri, 22 Dec 2023 16:51:54 -0700 Subject: [PATCH 1/2] Flight: Disable Batching Static Tables --- .../extensions/barrage/util/BarrageUtil.java | 24 +++++++++---------- .../client/DeephavenFlightSessionTest.java | 13 ++++++---- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index b246c4fa6b4..5ca0e20d027 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -8,6 +8,7 @@ import com.google.protobuf.ByteStringAccess; import com.google.rpc.Code; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.base.ArrayUtil; import io.deephaven.base.ClassUtil; import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; @@ -78,9 +79,10 @@ public class BarrageUtil { Configuration.getInstance().getDoubleForClassWithDefault(BarrageUtil.class, "targetSnapshotPercentage", 0.25); + // TODO (deephaven-core#188): drop this default to 50k once the jsapi can handle many batches public static final long MIN_SNAPSHOT_CELL_COUNT = Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class, - "minSnapshotCellCount", 50000); + "minSnapshotCellCount", Long.MAX_VALUE); public static final long MAX_SNAPSHOT_CELL_COUNT = Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class, "maxSnapshotCellCount", Long.MAX_VALUE); @@ -704,22 +706,18 @@ public static void createAndSendStaticSnapshot( try (final RowSequence.Iterator rsIt = targetViewport.getRowSequenceIterator()) { while (rsIt.hasMore()) { // compute the next range to snapshot - final long cellCount = - Math.max(MIN_SNAPSHOT_CELL_COUNT, - Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT)); + final long cellCount = Math.max( + MIN_SNAPSHOT_CELL_COUNT, Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT)); + final long numRows = Math.min(Math.max(1, cellCount / columnCount), ArrayUtil.MAX_ARRAY_SIZE); - final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(cellCount); + final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(numRows); // add these ranges to the running total - snapshotPartialViewport.forEachRowKeyRange((start, end) -> { - snapshotViewport.insertRange(start, end); - return true; - }); + snapshotPartialViewport.forAllRowKeyRanges(snapshotViewport::insertRange); // grab the snapshot and measure elapsed time for next projections long start = System.nanoTime(); - final BarrageMessage msg = - ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(log, table, - columns, snapshotPartialViewport, null); + final BarrageMessage msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + log, table, columns, snapshotPartialViewport, null); msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data for DoGet long elapsed = System.nanoTime() - start; // accumulate snapshot time in the metrics @@ -741,7 +739,7 @@ public static void createAndSendStaticSnapshot( } } - if (msg.rowsIncluded.size() > 0) { + if (!msg.rowsIncluded.isEmpty()) { // very simplistic logic to take the last snapshot and extrapolate max // number of rows that will not exceed the target UGP processing time // percentage diff --git a/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java b/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java index 53035744866..143ce4a6e04 100644 --- a/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java +++ b/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java @@ -29,13 +29,13 @@ import static org.assertj.core.api.Assertions.assertThat; public class DeephavenFlightSessionTest extends DeephavenFlightSessionTestBase { - public static > T i32768(TableCreator c) { - return c.emptyTable(32768).view("I=i"); + public static > T i132768(TableCreator c) { + return c.emptyTable(132768).view("I=i"); } @Test public void getSchema() throws Exception { - final TableSpec table = i32768(TableCreatorImpl.INSTANCE); + final TableSpec table = i132768(TableCreatorImpl.INSTANCE); try (final TableHandle handle = flightSession.session().execute(table)) { final Schema schema = flightSession.schema(handle.export()); final Schema expected = new Schema(Collections.singletonList( @@ -46,14 +46,17 @@ public void getSchema() throws Exception { @Test public void getStream() throws Exception { - final TableSpec table = i32768(TableCreatorImpl.INSTANCE); + final TableSpec table = i132768(TableCreatorImpl.INSTANCE); try (final TableHandle handle = flightSession.session().execute(table); final FlightStream stream = flightSession.stream(handle)) { int numRows = 0; + int flightCount = 0; while (stream.next()) { + ++flightCount; numRows += stream.getRoot().getRowCount(); } - Assert.assertEquals(32768, numRows); + Assert.assertEquals(1, flightCount); + Assert.assertEquals(132768, numRows); } } From 100e24b9fe1f889ef0d03b3b0bdf06b1607d1696 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 9 Jan 2024 14:39:14 -0600 Subject: [PATCH 2/2] JS API should error if it receives a smaller snapshot than expected --- .../TableViewportSubscription.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java index 043ec165e5f..2087542f90e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java @@ -21,7 +21,6 @@ import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageWrapper; import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSnapshotOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSnapshotRequest; -import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageUpdateMetadata; import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.ColumnConversionMode; import io.deephaven.web.client.api.Callbacks; @@ -36,6 +35,7 @@ import io.deephaven.web.client.api.barrage.stream.BiDiStream; import io.deephaven.web.client.fu.JsLog; import io.deephaven.web.client.state.ClientTableState; +import io.deephaven.web.shared.data.Range; import io.deephaven.web.shared.data.TableSnapshot; import jsinterop.annotations.JsMethod; import jsinterop.annotations.JsNullable; @@ -45,6 +45,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.Iterator; import static io.deephaven.web.client.api.barrage.WebBarrageUtils.makeUint8ArrayFromBitset; import static io.deephaven.web.client.api.barrage.WebBarrageUtils.serializeRanges; @@ -320,14 +321,14 @@ public Promise snapshot(JsRangeSet rows, Column[] columns) { new FlightData()); Builder doGetRequest = new Builder(1024); - double columnsOffset = BarrageSubscriptionRequest.createColumnsVector(doGetRequest, + double columnsOffset = BarrageSnapshotRequest.createColumnsVector(doGetRequest, makeUint8ArrayFromBitset(columnBitset)); - double viewportOffset = BarrageSubscriptionRequest.createViewportVector(doGetRequest, serializeRanges( + double viewportOffset = BarrageSnapshotRequest.createViewportVector(doGetRequest, serializeRanges( Collections.singleton(rows.getRange()))); double serializationOptionsOffset = BarrageSnapshotOptions .createBarrageSnapshotOptions(doGetRequest, ColumnConversionMode.Stringify, true, 0, 0); double tableTicketOffset = - BarrageSubscriptionRequest.createTicketVector(doGetRequest, state.getHandle().getTicket()); + BarrageSnapshotRequest.createTicketVector(doGetRequest, state.getHandle().getTicket()); BarrageSnapshotRequest.startBarrageSnapshotRequest(doGetRequest); BarrageSnapshotRequest.addTicket(doGetRequest, tableTicketOffset); BarrageSnapshotRequest.addColumns(doGetRequest, columnsOffset); @@ -365,7 +366,24 @@ public Promise snapshot(JsRangeSet rows, Column[] columns) { WebBarrageUtils.typedArrayToLittleEndianByteBuffer(flightData.getDataBody_asU8()), update, true, columnTypes); - callback.onSuccess(snapshot); + + // TODO deephaven-core(#188) this check no longer makes sense + Iterator rangeIterator = rows.getRange().rangeIterator(); + long expectedCount = 0; + while (rangeIterator.hasNext()) { + Range range = rangeIterator.next(); + if (range.getFirst() >= snapshot.getTableSize()) { + break; + } + long end = Math.min(range.getLast(), snapshot.getTableSize()); + expectedCount += end - range.getFirst() + 1; + } + if (expectedCount != snapshot.getIncludedRows().size()) { + callback.onFailure("Server did not send expected number of rows, expected " + expectedCount + + ", actual " + snapshot.getIncludedRows().size()); + } else { + callback.onSuccess(snapshot); + } }); stream.onStatus(status -> { if (!status.isOk()) {