Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flight: Disable Batching Static Tables #4985

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
Expand Down Expand Up @@ -78,9 +79,10 @@ public class BarrageUtil {
Configuration.getInstance().getDoubleForClassWithDefault(BarrageUtil.class,
"targetSnapshotPercentage", 0.25);

// TODO (deephaven-core#188): drop this default to 50k once the jsapi can handle many batches
public static final long MIN_SNAPSHOT_CELL_COUNT =
Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class,
"minSnapshotCellCount", 50000);
"minSnapshotCellCount", Long.MAX_VALUE);
public static final long MAX_SNAPSHOT_CELL_COUNT =
Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class,
"maxSnapshotCellCount", Long.MAX_VALUE);
Expand Down Expand Up @@ -704,22 +706,18 @@ public static void createAndSendStaticSnapshot(
try (final RowSequence.Iterator rsIt = targetViewport.getRowSequenceIterator()) {
while (rsIt.hasMore()) {
// compute the next range to snapshot
final long cellCount =
Math.max(MIN_SNAPSHOT_CELL_COUNT,
Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT));
final long cellCount = Math.max(
MIN_SNAPSHOT_CELL_COUNT, Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT));
final long numRows = Math.min(Math.max(1, cellCount / columnCount), ArrayUtil.MAX_ARRAY_SIZE);

final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(cellCount);
final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(numRows);
// add these ranges to the running total
snapshotPartialViewport.forEachRowKeyRange((start, end) -> {
snapshotViewport.insertRange(start, end);
return true;
});
snapshotPartialViewport.forAllRowKeyRanges(snapshotViewport::insertRange);

// grab the snapshot and measure elapsed time for next projections
long start = System.nanoTime();
final BarrageMessage msg =
ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(log, table,
columns, snapshotPartialViewport, null);
final BarrageMessage msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(
log, table, columns, snapshotPartialViewport, null);
msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data for DoGet
long elapsed = System.nanoTime() - start;
// accumulate snapshot time in the metrics
Expand All @@ -741,7 +739,7 @@ public static void createAndSendStaticSnapshot(
}
}

if (msg.rowsIncluded.size() > 0) {
if (!msg.rowsIncluded.isEmpty()) {
// very simplistic logic to take the last snapshot and extrapolate max
// number of rows that will not exceed the target UGP processing time
// percentage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import static org.assertj.core.api.Assertions.assertThat;

public class DeephavenFlightSessionTest extends DeephavenFlightSessionTestBase {
public static <T extends TableOperations<T, T>> T i32768(TableCreator<T> c) {
return c.emptyTable(32768).view("I=i");
public static <T extends TableOperations<T, T>> T i132768(TableCreator<T> c) {
return c.emptyTable(132768).view("I=i");
}

@Test
public void getSchema() throws Exception {
final TableSpec table = i32768(TableCreatorImpl.INSTANCE);
final TableSpec table = i132768(TableCreatorImpl.INSTANCE);
try (final TableHandle handle = flightSession.session().execute(table)) {
final Schema schema = flightSession.schema(handle.export());
final Schema expected = new Schema(Collections.singletonList(
Expand All @@ -46,14 +46,17 @@ public void getSchema() throws Exception {

@Test
public void getStream() throws Exception {
final TableSpec table = i32768(TableCreatorImpl.INSTANCE);
final TableSpec table = i132768(TableCreatorImpl.INSTANCE);
try (final TableHandle handle = flightSession.session().execute(table);
final FlightStream stream = flightSession.stream(handle)) {
int numRows = 0;
int flightCount = 0;
while (stream.next()) {
++flightCount;
numRows += stream.getRoot().getRowCount();
}
Assert.assertEquals(32768, numRows);
Assert.assertEquals(1, flightCount);
Assert.assertEquals(132768, numRows);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSnapshotOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.ColumnConversionMode;
import io.deephaven.web.client.api.Callbacks;
Expand All @@ -36,6 +35,7 @@
import io.deephaven.web.client.api.barrage.stream.BiDiStream;
import io.deephaven.web.client.fu.JsLog;
import io.deephaven.web.client.state.ClientTableState;
import io.deephaven.web.shared.data.Range;
import io.deephaven.web.shared.data.TableSnapshot;
import jsinterop.annotations.JsMethod;
import jsinterop.annotations.JsNullable;
Expand All @@ -45,6 +45,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;

import static io.deephaven.web.client.api.barrage.WebBarrageUtils.makeUint8ArrayFromBitset;
import static io.deephaven.web.client.api.barrage.WebBarrageUtils.serializeRanges;
Expand Down Expand Up @@ -320,14 +321,14 @@ public Promise<TableData> snapshot(JsRangeSet rows, Column[] columns) {
new FlightData());

Builder doGetRequest = new Builder(1024);
double columnsOffset = BarrageSubscriptionRequest.createColumnsVector(doGetRequest,
double columnsOffset = BarrageSnapshotRequest.createColumnsVector(doGetRequest,
makeUint8ArrayFromBitset(columnBitset));
double viewportOffset = BarrageSubscriptionRequest.createViewportVector(doGetRequest, serializeRanges(
double viewportOffset = BarrageSnapshotRequest.createViewportVector(doGetRequest, serializeRanges(
Collections.singleton(rows.getRange())));
double serializationOptionsOffset = BarrageSnapshotOptions
.createBarrageSnapshotOptions(doGetRequest, ColumnConversionMode.Stringify, true, 0, 0);
double tableTicketOffset =
BarrageSubscriptionRequest.createTicketVector(doGetRequest, state.getHandle().getTicket());
BarrageSnapshotRequest.createTicketVector(doGetRequest, state.getHandle().getTicket());
BarrageSnapshotRequest.startBarrageSnapshotRequest(doGetRequest);
BarrageSnapshotRequest.addTicket(doGetRequest, tableTicketOffset);
BarrageSnapshotRequest.addColumns(doGetRequest, columnsOffset);
Expand Down Expand Up @@ -365,7 +366,24 @@ public Promise<TableData> snapshot(JsRangeSet rows, Column[] columns) {
WebBarrageUtils.typedArrayToLittleEndianByteBuffer(flightData.getDataBody_asU8()), update,
true,
columnTypes);
callback.onSuccess(snapshot);

// TODO deephaven-core(#188) this check no longer makes sense
Iterator<Range> rangeIterator = rows.getRange().rangeIterator();
long expectedCount = 0;
while (rangeIterator.hasNext()) {
Range range = rangeIterator.next();
if (range.getFirst() >= snapshot.getTableSize()) {
break;
}
long end = Math.min(range.getLast(), snapshot.getTableSize());
expectedCount += end - range.getFirst() + 1;
}
if (expectedCount != snapshot.getIncludedRows().size()) {
callback.onFailure("Server did not send expected number of rows, expected " + expectedCount
+ ", actual " + snapshot.getIncludedRows().size());
} else {
callback.onSuccess(snapshot);
}
});
stream.onStatus(status -> {
if (!status.isOk()) {
Expand Down
Loading