Skip to content

Commit

Permalink
Flight: Disable Batching Static Tables
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 22, 2023
1 parent 01b6cf4 commit c52a739
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
Expand Down Expand Up @@ -78,9 +79,10 @@ public class BarrageUtil {
Configuration.getInstance().getDoubleForClassWithDefault(BarrageUtil.class,
"targetSnapshotPercentage", 0.25);

// TODO (deephaven-core#188): drop this default to 50k once the jsapi can handle many batches
public static final long MIN_SNAPSHOT_CELL_COUNT =
Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class,
"minSnapshotCellCount", 50000);
"minSnapshotCellCount", Long.MAX_VALUE);
public static final long MAX_SNAPSHOT_CELL_COUNT =
Configuration.getInstance().getLongForClassWithDefault(BarrageUtil.class,
"maxSnapshotCellCount", Long.MAX_VALUE);
Expand Down Expand Up @@ -704,22 +706,18 @@ public static void createAndSendStaticSnapshot(
try (final RowSequence.Iterator rsIt = targetViewport.getRowSequenceIterator()) {
while (rsIt.hasMore()) {
// compute the next range to snapshot
final long cellCount =
Math.max(MIN_SNAPSHOT_CELL_COUNT,
Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT));
final long cellCount = Math.max(
MIN_SNAPSHOT_CELL_COUNT, Math.min(snapshotTargetCellCount, MAX_SNAPSHOT_CELL_COUNT));
final long numRows = Math.min(Math.max(1, cellCount / columnCount), ArrayUtil.MAX_ARRAY_SIZE);

final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(cellCount);
final RowSequence snapshotPartialViewport = rsIt.getNextRowSequenceWithLength(numRows);
// add these ranges to the running total
snapshotPartialViewport.forEachRowKeyRange((start, end) -> {
snapshotViewport.insertRange(start, end);
return true;
});
snapshotPartialViewport.forAllRowKeyRanges(snapshotViewport::insertRange);

// grab the snapshot and measure elapsed time for next projections
long start = System.nanoTime();
final BarrageMessage msg =
ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(log, table,
columns, snapshotPartialViewport, null);
final BarrageMessage msg = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace(
log, table, columns, snapshotPartialViewport, null);
msg.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS; // no mod column data for DoGet
long elapsed = System.nanoTime() - start;
// accumulate snapshot time in the metrics
Expand All @@ -741,7 +739,7 @@ public static void createAndSendStaticSnapshot(
}
}

if (msg.rowsIncluded.size() > 0) {
if (!msg.rowsIncluded.isEmpty()) {
// very simplistic logic to take the last snapshot and extrapolate max
// number of rows that will not exceed the target UGP processing time
// percentage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import static org.assertj.core.api.Assertions.assertThat;

public class DeephavenFlightSessionTest extends DeephavenFlightSessionTestBase {
public static <T extends TableOperations<T, T>> T i32768(TableCreator<T> c) {
return c.emptyTable(32768).view("I=i");
public static <T extends TableOperations<T, T>> T i132768(TableCreator<T> c) {
return c.emptyTable(132768).view("I=i");
}

@Test
public void getSchema() throws Exception {
final TableSpec table = i32768(TableCreatorImpl.INSTANCE);
final TableSpec table = i132768(TableCreatorImpl.INSTANCE);
try (final TableHandle handle = flightSession.session().execute(table)) {
final Schema schema = flightSession.schema(handle.export());
final Schema expected = new Schema(Collections.singletonList(
Expand All @@ -46,14 +46,17 @@ public void getSchema() throws Exception {

@Test
public void getStream() throws Exception {
final TableSpec table = i32768(TableCreatorImpl.INSTANCE);
final TableSpec table = i132768(TableCreatorImpl.INSTANCE);
try (final TableHandle handle = flightSession.session().execute(table);
final FlightStream stream = flightSession.stream(handle)) {
int numRows = 0;
int flightCount = 0;
while (stream.next()) {
++flightCount;
numRows += stream.getRoot().getRowCount();
}
Assert.assertEquals(32768, numRows);
Assert.assertEquals(1, flightCount);
Assert.assertEquals(132768, numRows);
}
}

Expand Down

0 comments on commit c52a739

Please sign in to comment.