Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: JS subscriptions should return correct size while updating (#6463) #6464

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public JsLayoutHints getLayoutHints() {
@JsProperty
public double getSize() {
TableViewportSubscription subscription = subscriptions.get(getHandle());
if (subscription != null && subscription.getStatus() == TableViewportSubscription.Status.ACTIVE) {
if (subscription != null && subscription.hasValidSize()) {
// only ask the viewport for the size if it is alive and ticking
return subscription.size();
}
Expand Down Expand Up @@ -705,7 +705,7 @@ public TableViewportSubscription setViewport(double firstRow, double lastRow,
Column[] columnsCopy = columns != null ? Js.uncheckedCast(columns.slice()) : state().getColumns();
ClientTableState currentState = state();
TableViewportSubscription activeSubscription = subscriptions.get(getHandle());
if (activeSubscription != null && activeSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
if (activeSubscription != null && !activeSubscription.isClosed()) {
// hasn't finished, lets reuse it
activeSubscription.setInternalViewport(firstRow, lastRow, columnsCopy, updateIntervalMs, isReverseViewport);
return activeSubscription;
Expand Down Expand Up @@ -1583,8 +1583,7 @@ public void setState(final ClientTableState state) {
if (!isClosed() && was != null && was != state()) {
// if we held a subscription
TableViewportSubscription existingSubscription = subscriptions.remove(was.getHandle());
if (existingSubscription != null
&& existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
if (existingSubscription != null && !existingSubscription.isClosed()) {
JsLog.debug("closing old viewport", state(), existingSubscription.state());
// with the replacement state successfully running, we can shut down the old viewport (unless
// something external retained it)
Expand Down Expand Up @@ -1715,7 +1714,7 @@ public void setSize(double s) {
this.size = s;

TableViewportSubscription subscription = subscriptions.get(getHandle());
if (changed && (subscription == null || subscription.getStatus() == TableViewportSubscription.Status.DONE)) {
if (changed && (subscription == null || !subscription.hasValidSize())) {
// If the size changed, and we have no subscription active, fire. Otherwise, we want to let the
// subscription itself manage this, so that the size changes are synchronized with data changes,
// and consumers won't be confused by the table size not matching data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ public abstract class AbstractTableSubscription extends HasEventHandling {
*/
public static final String EVENT_UPDATED = "updated";

public enum Status {
protected enum Status {
/** Waiting for some prerequisite before we can use it for the first time. */
STARTING,
/** All prerequisites are met, waiting for the first snapshot to be returned. */
SUBSCRIPTION_REQUESTED,
/** Successfully created, not waiting for any messages to be accurate. */
ACTIVE,
/** Waiting for an update to return from being active to being active again. */
Expand Down Expand Up @@ -117,7 +119,11 @@ protected void revive() {
WebBarrageSubscription.ViewportChangedHandler viewportChangedHandler = this::onViewportChange;
WebBarrageSubscription.DataChangedHandler dataChangedHandler = this::onDataChanged;

status = Status.ACTIVE;
status = Status.SUBSCRIPTION_REQUESTED;

// In order to create the subscription, we need to already have the table resolved, so we know if it
// is a blink table or not. In turn, we can't be prepared to handle any messages from the server until
// we know this, so we can't race messages with this design.
this.barrageSubscription = WebBarrageSubscription.subscribe(
subscriptionType, state, viewportChangedHandler, dataChangedHandler);

Expand All @@ -139,10 +145,6 @@ protected void revive() {
JsRunnable.doNothing());
}

public Status getStatus() {
return status;
}

protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSet columns,
@Nullable RangeSet viewport,
BarrageSubscriptionOptions options, boolean isReverseViewport) {
Expand All @@ -168,16 +170,19 @@ protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSe

protected abstract void sendFirstSubscriptionRequest();

protected void sendBarrageSubscriptionRequest(RangeSet viewport, JsArray<Column> columns, Double updateIntervalMs,
protected void sendBarrageSubscriptionRequest(@Nullable RangeSet viewport, JsArray<Column> columns,
Double updateIntervalMs,
boolean isReverseViewport) {
if (status == Status.DONE) {
if (isClosed()) {
if (failMsg == null) {
throw new IllegalStateException("Can't change subscription, already closed");
} else {
throw new IllegalStateException("Can't change subscription, already failed: " + failMsg);
}
}
status = Status.PENDING_UPDATE;
if (status == Status.ACTIVE) {
status = Status.PENDING_UPDATE;
}
this.columns = columns;
this.viewportRowSet = viewport;
this.columnBitSet = makeColumnBitset(columns);
Expand Down Expand Up @@ -214,15 +219,39 @@ protected WorkerConnection connection() {
return connection;
}

/**
* True if the subscription is in the ACTIVE state, meaning that the server and client are in sync with the state of
* the subscription.
*/
protected boolean isSubscriptionReady() {
return status == Status.ACTIVE;
}

/**
* Returns true if the subscription is closed and cannot be used again, false if it is actively listening for more
* data.
*/
public boolean isClosed() {
return status == Status.DONE;
}

/**
* Returns true if the subscription is in a state where it can be used to read data, false if still waiting for the
* server to send the first snapshot or if the subscription has been closed.
*
* @return true if the {@link #size()} method will return data based on the subscription, false if some other source
* of the table's size will be used.
*/
public boolean hasValidSize() {
return status == Status.ACTIVE || status == Status.PENDING_UPDATE;
}


public double size() {
if (status == Status.ACTIVE) {
if (hasValidSize()) {
return barrageSubscription.getCurrentSize();
}
if (status == Status.DONE) {
if (isClosed()) {
throw new IllegalStateException("Can't read size when already closed");
}
return state.getSize();
Expand Down Expand Up @@ -505,7 +534,7 @@ private void onFlightData(FlightData data) {
}

protected void onStreamEnd(ResponseStreamWrapper.Status status) {
if (this.status == Status.DONE) {
if (isClosed()) {
return;
}
if (status.isTransportError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ public void setViewport(double firstRow, double lastRow, @JsOptional @JsNullable

public void setInternalViewport(double firstRow, double lastRow, Column[] columns, Double updateIntervalMs,
Boolean isReverseViewport) {
// Until we've created the stream, we just cache the requested viewport
if (status == Status.STARTING) {
this.firstRow = firstRow;
this.lastRow = lastRow;
Expand Down Expand Up @@ -281,7 +282,7 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column
*/
@JsMethod
public void close() {
if (status == Status.DONE) {
if (isClosed()) {
JsLog.warn("TableViewportSubscription.close called on subscription that's already done.");
}
retained = false;
Expand All @@ -300,14 +301,12 @@ public void internalClose() {

reconnectSubscription.remove();

if (retained || status == Status.DONE) {
if (retained || isClosed()) {
// the JsTable has indicated it is no longer interested in this viewport, but other calling
// code has retained it, keep it open for now.
return;
}

status = Status.DONE;

super.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,35 @@ public void testViewportOnStaticTable() {
.then(this::finish).catch_(this::report);
}

public void testChangePendingViewport() {
connect(tables)
.then(table("staticTable"))
.then(table -> {
delayTestFinish(5001);

table.setViewport(0, 25, null);
assertEquals(100.0, table.getSize());
return Promise.resolve(table);
})
.then(table -> {
assertEquals(100.0, table.getSize());
table.setViewport(5, 30, null);
assertEquals(100.0, table.getSize());
return assertUpdateReceived(table, viewport -> {
assertEquals(100.0, table.getSize());

assertEquals(5d, viewport.getOffset());
assertEquals(26, viewport.getRows().length);
}, 2500);
})
.then(table -> {
assertEquals(100.0, table.getSize());
table.close();
return null;
})
.then(this::finish).catch_(this::report);
}

// TODO: https://deephaven.atlassian.net/browse/DH-11196
public void ignore_testViewportOnGrowingTable() {
connect(tables)
Expand Down