Skip to content

Commit

Permalink
fix: JS subscriptions should return correct size while updating (#6463)
Browse files Browse the repository at this point in the history
Fixes #6423
  • Loading branch information
niloc132 committed Dec 6, 2024
1 parent 303fd8e commit de68812
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public JsLayoutHints getLayoutHints() {
@JsProperty
public double getSize() {
TableViewportSubscription subscription = subscriptions.get(getHandle());
if (subscription != null && subscription.getStatus() == TableViewportSubscription.Status.ACTIVE) {
if (subscription != null && subscription.hasValidSize()) {
// only ask the viewport for the size if it is alive and ticking
return subscription.size();
}
Expand Down Expand Up @@ -705,7 +705,7 @@ public TableViewportSubscription setViewport(double firstRow, double lastRow,
Column[] columnsCopy = columns != null ? Js.uncheckedCast(columns.slice()) : state().getColumns();
ClientTableState currentState = state();
TableViewportSubscription activeSubscription = subscriptions.get(getHandle());
if (activeSubscription != null && activeSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
if (activeSubscription != null && !activeSubscription.isClosed()) {
// hasn't finished, lets reuse it
activeSubscription.setInternalViewport(firstRow, lastRow, columnsCopy, updateIntervalMs, isReverseViewport);
return activeSubscription;
Expand Down Expand Up @@ -1583,8 +1583,7 @@ public void setState(final ClientTableState state) {
if (!isClosed() && was != null && was != state()) {
// if we held a subscription
TableViewportSubscription existingSubscription = subscriptions.remove(was.getHandle());
if (existingSubscription != null
&& existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
if (existingSubscription != null && !existingSubscription.isClosed()) {
JsLog.debug("closing old viewport", state(), existingSubscription.state());
// with the replacement state successfully running, we can shut down the old viewport (unless
// something external retained it)
Expand Down Expand Up @@ -1715,7 +1714,7 @@ public void setSize(double s) {
this.size = s;

TableViewportSubscription subscription = subscriptions.get(getHandle());
if (changed && (subscription == null || subscription.getStatus() == TableViewportSubscription.Status.DONE)) {
if (changed && (subscription == null || !subscription.hasValidSize())) {
// If the size changed, and we have no subscription active, fire. Otherwise, we want to let the
// subscription itself manage this, so that the size changes are synchronized with data changes,
// and consumers won't be confused by the table size not matching data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class AbstractTableSubscription extends HasEventHandling {
*/
public static final String EVENT_UPDATED = "updated";

public enum Status {
protected enum Status {
/** Waiting for some prerequisite before we can use it for the first time. */
STARTING,
/** Successfully created, not waiting for any messages to be accurate. */
Expand Down Expand Up @@ -139,10 +139,6 @@ protected void revive() {
JsRunnable.doNothing());
}

public Status getStatus() {
return status;
}

protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSet columns,
@Nullable RangeSet viewport,
BarrageSubscriptionOptions options, boolean isReverseViewport) {
Expand Down Expand Up @@ -170,7 +166,7 @@ protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSe

protected void sendBarrageSubscriptionRequest(RangeSet viewport, JsArray<Column> columns, Double updateIntervalMs,
boolean isReverseViewport) {
if (status == Status.DONE) {
if (isClosed()) {
if (failMsg == null) {
throw new IllegalStateException("Can't change subscription, already closed");
} else {
Expand Down Expand Up @@ -214,15 +210,39 @@ protected WorkerConnection connection() {
return connection;
}

/**
* True if the subscription is in the ACTIVE state, meaning that the server and client are in sync with the state of
* the subscription.
*/
protected boolean isSubscriptionReady() {
return status == Status.ACTIVE;
}

/**
* Returns true if the subscription is closed and cannot be used again, false if it is actively listening for more
* data.
*/
public boolean isClosed() {
return status == Status.DONE;
}

/**
* Returns true if the subscription is in a state where it can be used to read data, false if still waiting for the
* server to send the first snapshot or if the subscription has been closed.
*
* @return true if the {@link #size()} method will return data based on the subscription, false if some other source
* of the table's size will be used.
*/
public boolean hasValidSize() {
return status == Status.ACTIVE || status == Status.PENDING_UPDATE;
}


public double size() {
if (status == Status.ACTIVE) {
if (hasValidSize()) {
return barrageSubscription.getCurrentSize();
}
if (status == Status.DONE) {
if (isClosed()) {
throw new IllegalStateException("Can't read size when already closed");
}
return state.getSize();
Expand Down Expand Up @@ -505,7 +525,7 @@ private void onFlightData(FlightData data) {
}

protected void onStreamEnd(ResponseStreamWrapper.Status status) {
if (this.status == Status.DONE) {
if (isClosed()) {
return;
}
if (status.isTransportError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column
*/
@JsMethod
public void close() {
if (status == Status.DONE) {
if (isClosed()) {
JsLog.warn("TableViewportSubscription.close called on subscription that's already done.");
}
retained = false;
Expand All @@ -300,14 +300,12 @@ public void internalClose() {

reconnectSubscription.remove();

if (retained || status == Status.DONE) {
if (retained || isClosed()) {
// the JsTable has indicated it is no longer interested in this viewport, but other calling
// code has retained it, keep it open for now.
return;
}

status = Status.DONE;

super.close();
}

Expand Down

0 comments on commit de68812

Please sign in to comment.