Skip to content

Commit

Permalink
jsAPI mostly complete; looking for tree table issue
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Nov 15, 2024
1 parent 6653ca6 commit 44cdf93
Show file tree
Hide file tree
Showing 26 changed files with 729 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,7 @@ private static boolean snapshotAllTable(
@Nullable final RowSet keysToSnapshot) {

snapshot.rowsAdded = (usePrev ? table.getRowSet().prev() : table.getRowSet()).copy();
snapshot.tableSize = snapshot.rowsAdded.size();
snapshot.rowsRemoved = RowSetFactory.empty();
snapshot.addColumnData = new BarrageMessage.AddColumnData[table.getColumnSources().size()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static class AddColumnData {
public long firstSeq = -1;
public long lastSeq = -1;
public long step = -1;
public long tableSize = -1;

public boolean isSnapshot;
public RowSet snapshotRowSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public int maxMessageSize() {

@Override
@Default
public int previewListLengthLimit() {
public long previewListLengthLimit() {
return 0;
}

Expand Down Expand Up @@ -111,7 +111,7 @@ default Builder columnConversionMode(ColumnConversionMode columnConversionMode)
* @param previewListLengthLimit the magnitude of the number of elements to include in a preview list
* @return this builder
*/
Builder previewListLengthLimit(int previewListLengthLimit);
Builder previewListLengthLimit(long previewListLengthLimit);

/**
* @return a new BarrageSnapshotOptions instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ private ByteBuffer getSubscriptionMetadata() throws IOException {
BarrageUpdateMetadata.addAddedRowsIncluded(metadata, addedRowsIncludedOffset);
BarrageUpdateMetadata.addModColumnNodes(metadata, nodesOffset);
BarrageUpdateMetadata.addEffectiveReverseViewport(metadata, reverseViewport);
BarrageUpdateMetadata.addTableSize(metadata, message.tableSize);
metadata.finish(BarrageUpdateMetadata.endBarrageUpdateMetadata(metadata));

final FlatBufferBuilder header = new FlatBufferBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public int maxMessageSize() {

@Override
@Default
public int previewListLengthLimit() {
public long previewListLengthLimit() {
return 0;
}

Expand Down Expand Up @@ -162,7 +162,7 @@ default Builder columnConversionMode(ColumnConversionMode columnConversionMode)
* @param previewListLengthLimit the magnitude of the number of elements to include in a preview list
* @return this builder
*/
Builder previewListLengthLimit(int previewListLengthLimit);
Builder previewListLengthLimit(long previewListLengthLimit);

/**
* @return a new BarrageSubscriptionOptions instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public BarrageMessage safelyParseFrom(final StreamReaderOptions options,

msg.firstSeq = metadata.firstSeq();
msg.lastSeq = metadata.lastSeq();
msg.tableSize = metadata.tableSize();
msg.rowsAdded = extractIndex(metadata.addedRowsAsByteBuffer());
msg.rowsRemoved = extractIndex(metadata.removedRowsAsByteBuffer());
final ByteBuffer shiftData = metadata.shiftDataAsByteBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ default boolean columnsAsList() {
* @return the maximum length of any list / array to encode; zero means no limit; negative values indicate to treat
* the limit as a tail instead of a head
*/
default int previewListLengthLimit() {
default long previewListLengthLimit() {
return 0;
}
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ commons-text = "1.12.0"
confluent = "7.6.0"
confluent-kafka-clients = "7.6.0-ccs"
dagger = "2.52"
deephaven-barrage = "0.7.0"
deephaven-barrage = "0.7.2"
deephaven-csv = "0.15.0"
deephaven-hash = "0.1.0"
deephaven-suan-shu = "0.1.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static class AddColumnData {
public long firstSeq = -1;
public long lastSeq = -1;
public long step = -1;
public long tableSize = -1;

public boolean isSnapshot;
public RangeSet snapshotRowSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.util.FlatBufferIteratorAdapter;
Expand Down Expand Up @@ -103,9 +104,11 @@ public WebBarrageMessage parseFrom(

msg.firstSeq = metadata.firstSeq();
msg.lastSeq = metadata.lastSeq();
msg.tableSize = metadata.tableSize();
msg.rowsAdded = extractIndex(metadata.addedRowsAsByteBuffer());
msg.rowsRemoved = extractIndex(metadata.removedRowsAsByteBuffer());
msg.shifted = extractIndexShiftData(metadata.shiftDataAsByteBuffer());
final ByteBuffer shiftData = metadata.shiftDataAsByteBuffer();
msg.shifted = shiftData != null ? extractIndexShiftData(shiftData) : new ShiftedRange[0];

final ByteBuffer rowsIncluded = metadata.addedRowsIncludedAsByteBuffer();
msg.rowsIncluded = rowsIncluded != null ? extractIndex(rowsIncluded) : msg.rowsAdded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.web.client.api.barrage.WebBarrageMessage;
import io.deephaven.web.client.api.barrage.def.InitialTableDefinition;
import io.deephaven.web.client.api.subscription.SubscriptionType;
import io.deephaven.web.client.state.ClientTableState;
import io.deephaven.web.shared.data.Range;
import io.deephaven.web.shared.data.RangeSet;
Expand Down Expand Up @@ -37,8 +38,11 @@ public abstract class WebBarrageSubscription {
public static final int MAX_MESSAGE_SIZE = 10_000_000;
public static final int BATCH_SIZE = 100_000;

public static WebBarrageSubscription subscribe(ClientTableState cts, ViewportChangedHandler viewportChangedHandler,
DataChangedHandler dataChangedHandler) {
public static WebBarrageSubscription subscribe(
final SubscriptionType subscriptionType,
final ClientTableState cts,
final ViewportChangedHandler viewportChangedHandler,
final DataChangedHandler dataChangedHandler) {

WebColumnData[] dataSinks = new WebColumnData[cts.columnTypes().length];
ChunkType[] chunkTypes = cts.chunkTypes();
Expand Down Expand Up @@ -75,8 +79,11 @@ public static WebBarrageSubscription subscribe(ClientTableState cts, ViewportCha

if (cts.getTableDef().getAttributes().isBlinkTable()) {
return new BlinkImpl(cts, viewportChangedHandler, dataChangedHandler, dataSinks);
} else if (subscriptionType == SubscriptionType.FULL_SUBSCRIPTION) {
return new RedirectedImpl(cts, viewportChangedHandler, dataChangedHandler, dataSinks);
} else {
return new ViewportImpl(cts, viewportChangedHandler, dataChangedHandler, dataSinks);
}
return new RedirectedImpl(cts, viewportChangedHandler, dataChangedHandler, dataSinks);
}

public interface ViewportChangedHandler {
Expand Down Expand Up @@ -109,6 +116,13 @@ protected WebBarrageSubscription(ClientTableState state, ViewportChangedHandler

public abstract void applyUpdates(WebBarrageMessage message);

/**
* @return the current size of the table
*/
public long getCurrentSize() {
return currentRowSet.size();
}

protected void updateServerViewport(RangeSet viewport, BitSet columns, boolean reverseViewport) {
serverViewport = viewport;
serverColumns = columns == null || columns.cardinality() == numColumns() ? null : columns;
Expand Down Expand Up @@ -447,6 +461,90 @@ private void freeRows(RangeSet removed) {
}
}

public static class ViewportImpl extends WebBarrageSubscription {
private long lastTableSize = -1;

public ViewportImpl(ClientTableState state, ViewportChangedHandler viewportChangedHandler,
DataChangedHandler dataChangedHandler, WebColumnData[] dataSinks) {
super(state, viewportChangedHandler, dataChangedHandler, dataSinks);
}

@Override
public long getCurrentSize() {
return lastTableSize;
}

@Override
public RangeSet getCurrentRowSet() {
return RangeSet.ofRange(0, lastTableSize - 1);
}

@Override
public void applyUpdates(WebBarrageMessage message) {
lastTableSize = message.tableSize;

if (message.isSnapshot) {
updateServerViewport(message.snapshotRowSet, message.snapshotColumns, message.snapshotRowSetIsReversed);
viewportChangedHandler.onServerViewportChanged(serverViewport, serverColumns, serverReverseViewport);
}

// Update the currentRowSet; we're guaranteed to be flat
final long prevSize = currentRowSet.size();
final long newSize = prevSize - message.rowsRemoved.size() + message.rowsAdded.size();
if (prevSize < newSize) {
currentRowSet.addRange(new Range(prevSize, newSize - 1));
} else if (prevSize > newSize) {
currentRowSet.removeRange(new Range(newSize, prevSize - 1));
}

if (!message.rowsAdded.isEmpty() || !message.rowsRemoved.isEmpty()) {
for (int ii = 0; ii < message.addColumnData.length; ii++) {
if (!isSubscribedColumn(ii)) {
continue;
}

final WebBarrageMessage.AddColumnData column = message.addColumnData[ii];
for (int j = 0; j < column.data.size(); j++) {
destSources[ii].applyUpdate(column.data, message.rowsAdded, message.rowsRemoved);
}
}
}

final BitSet modifiedColumnSet = new BitSet(numColumns());
for (int ii = 0; ii < message.modColumnData.length; ii++) {
WebBarrageMessage.ModColumnData column = message.modColumnData[ii];
if (column.rowsModified.isEmpty()) {
continue;
}

modifiedColumnSet.set(ii);

for (int j = 0; j < column.data.size(); j++) {
Chunk<Values> chunk = column.data.get(j);
destSources[ii].fillChunk(chunk, column.rowsModified.indexIterator());
}
}

state.setSize(message.tableSize);
dataChangedHandler.onDataChanged(
RangeSet.ofRange(0, currentRowSet.size()),
RangeSet.ofRange(0, prevSize),
RangeSet.empty(), new ShiftedRange[0], modifiedColumnSet);
}

@Override
public Any getData(long key, int col) {
if (!isSubscribedColumn(col)) {
throw new NoSuchElementException("No column at index " + col);
}
long pos = serverViewport.find(key);
if (pos < 0) {
return null;
}
return this.destSources[col].get(pos);
}
}

/**
* Helper to avoid appending many times when modifying indexes. The append() method should be called for each key
* <i>in order</i> to ensure that addRange/removeRange isn't called excessively. When no more items will be added,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,23 @@
// @formatter:off
package io.deephaven.web.client.api.barrage.data;

import elemental2.core.JsArray;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.QueryConstants;
import io.deephaven.web.shared.data.Range;
import io.deephaven.web.shared.data.RangeSet;
import jsinterop.base.Any;
import jsinterop.base.Js;

import java.util.Iterator;
import java.util.List;
import java.util.PrimitiveIterator;

public class WebByteColumnData extends WebColumnData {
private JsArray<Any> tmpStorage;

@Override
public void fillChunk(Chunk<?> data, PrimitiveIterator.OfLong destIterator) {
ByteChunk<?> byteChunk = data.asByteChunk();
Expand All @@ -24,4 +33,62 @@ public void fillChunk(Chunk<?> data, PrimitiveIterator.OfLong destIterator) {
arr.setAt((int) destIterator.nextLong(), value == QueryConstants.NULL_BYTE ? null : Js.asAny(value));
}
}

@Override
public void applyUpdate(
final List<Chunk<Values>> data,
final RangeSet added,
final RangeSet removed) {
// ensure tmpStorage exists
if (tmpStorage == null) {
tmpStorage = new JsArray<>();
}
final int newLength = (int) (length - removed.size() + added.size());

int destOffset = 0;
int retainSourceOffset = 0;
int chunkSourceOffset = 0;
final Iterator<Range> addIter = added.rangeIterator();
final Iterator<Range> removeIter = removed.rangeIterator();
final Iterator<Chunk<Values>> dataIter = data.iterator();

Range nextAdd = addIter.hasNext() ? addIter.next() : null;
Range nextRemove = removeIter.hasNext() ? removeIter.next() : null;
ByteChunk<Values> byteChunk = dataIter.hasNext() ? dataIter.next().asByteChunk() : null;
while (destOffset < newLength) {
if (nextRemove != null && nextRemove.getFirst() == retainSourceOffset) {
// skip the range from the source chunk
retainSourceOffset += (int) nextRemove.size();
nextRemove = removeIter.hasNext() ? removeIter.next() : null;
} else if (nextAdd != null && nextAdd.getFirst() == destOffset) {
// copy the range from the source chunk
long size = nextAdd.size();
for (long ii = 0; ii < size; ++ii) {
while (byteChunk != null && chunkSourceOffset == byteChunk.size()) {
byteChunk = dataIter.hasNext() ? dataIter.next().asByteChunk() : null;
chunkSourceOffset = 0;
}
assert byteChunk != null;
byte value = byteChunk.get(chunkSourceOffset++);
tmpStorage.setAt(destOffset++, value == QueryConstants.NULL_BYTE ? null : Js.asAny(value));
}
nextAdd = addIter.hasNext() ? addIter.next() : null;
} else {
// copy the range from the source chunk
long size = (nextRemove == null ? length : nextRemove.getFirst()) - retainSourceOffset;
if (nextAdd != null) {
size = Math.min(size, nextAdd.getFirst() - destOffset);
}
for (long ii = 0; ii < size; ++ii) {
tmpStorage.setAt(destOffset++, arr.getAt(retainSourceOffset++));
}
}
}

// swap arrays to avoid copying and garbage collection
JsArray<Any> tmp = arr;
arr = tmpStorage;
tmpStorage = tmp;
length = newLength;
}
}
Loading

0 comments on commit 44cdf93

Please sign in to comment.