Skip to content

Commit

Permalink
Remove unnecessary update() functions from BarrageMessageProducer (
Browse files Browse the repository at this point in the history
…#2090)

* removed unneeded update function from BarrageMessageProducer
  • Loading branch information
lbooker42 authored Mar 14, 2022
1 parent cdf2ddd commit 26cdd2b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import io.deephaven.barrage.flatbuf.BarrageMessageType;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.liveness.SingletonLivenessManager;
import io.deephaven.engine.rowset.RowSet;
Expand Down Expand Up @@ -294,8 +292,6 @@ public interface Factory {
private final String myPrefix;
private final SessionState session;

private boolean isViewport;

private final StreamObserver<BarrageStreamGenerator.View> listener;

private boolean isClosed = false;
Expand Down Expand Up @@ -651,9 +647,9 @@ private synchronized void onExportResolved(final SessionState.ExportObject<Objec
final BitSet columns =
hasColumns ? BitSet.valueOf(subscriptionRequest.columnsAsByteBuffer()) : null;

isViewport = subscriptionRequest.viewportVector() != null;
final boolean hasViewport = subscriptionRequest.viewportVector() != null;
final RowSet viewport =
isViewport ? BarrageProtoUtil.toRowSet(subscriptionRequest.viewportAsByteBuffer()) : null;
hasViewport ? BarrageProtoUtil.toRowSet(subscriptionRequest.viewportAsByteBuffer()) : null;

final boolean reverseViewport = subscriptionRequest.reverseViewport();

Expand All @@ -676,22 +672,16 @@ private synchronized void onExportResolved(final SessionState.ExportObject<Objec
private void apply(final BarrageSubscriptionRequest subscriptionRequest) {
final boolean hasColumns = subscriptionRequest.columnsVector() != null;
final BitSet columns =
hasColumns ? BitSet.valueOf(subscriptionRequest.columnsAsByteBuffer()) : new BitSet();
hasColumns ? BitSet.valueOf(subscriptionRequest.columnsAsByteBuffer()) : null;

final boolean hasViewport = subscriptionRequest.viewportVector() != null;
final RowSet viewport =
isViewport ? BarrageProtoUtil.toRowSet(subscriptionRequest.viewportAsByteBuffer()) : null;

final boolean subscriptionFound;
if (isViewport && hasColumns && hasViewport) {
subscriptionFound = bmp.updateViewportAndColumns(listener, viewport, columns);
} else if (isViewport && hasViewport) {
subscriptionFound = bmp.updateViewport(listener, viewport);
} else if (hasColumns) {
subscriptionFound = bmp.updateSubscription(listener, columns);
} else {
subscriptionFound = true;
}
hasViewport ? BarrageProtoUtil.toRowSet(subscriptionRequest.viewportAsByteBuffer()) : null;

final boolean reverseViewport = subscriptionRequest.reverseViewport();


final boolean subscriptionFound = bmp.updateSubscription(listener, viewport, columns, reverseViewport);

if (!subscriptionFound) {
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "Subscription was not found.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,52 +532,29 @@ private boolean findAndUpdateSubscription(final StreamObserver<MessageView> list
}

public boolean updateSubscription(final StreamObserver<MessageView> listener,
final BitSet newSubscribedColumns) {
return findAndUpdateSubscription(listener, sub -> {
sub.pendingColumns = (BitSet) newSubscribedColumns.clone();
if (sub.isViewport() && sub.pendingViewport == null) {
sub.pendingViewport = sub.viewport.copy();
}
log.info().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for column updates.").endl();
});
}

public boolean updateViewport(final StreamObserver<MessageView> listener,
final RowSet newViewport) {
return updateViewport(listener, newViewport, false);
final @Nullable RowSet newViewport, final @Nullable BitSet columnsToSubscribe) {
// assume forward viewport when not specified
return updateSubscription(listener, newViewport, columnsToSubscribe, false);
}

public boolean updateViewport(final StreamObserver<MessageView> listener, final RowSet newViewport,
final boolean newReverseViewport) {
public boolean updateSubscription(final StreamObserver<MessageView> listener, final @Nullable RowSet newViewport,
final @Nullable BitSet columnsToSubscribe, final boolean newReverseViewport) {
return findAndUpdateSubscription(listener, sub -> {
if (sub.pendingViewport != null) {
sub.pendingViewport.close();
}
sub.pendingViewport = newViewport.copy();
sub.pendingReverseViewport = newReverseViewport;
if (sub.pendingColumns == null) {
sub.pendingColumns = (BitSet) sub.subscribedColumns.clone();
}
log.info().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for viewport updates.").endl();
});
}

public boolean updateViewportAndColumns(final StreamObserver<MessageView> listener,
final RowSet newViewport, final BitSet columnsToSubscribe) {
return updateViewportAndColumns(listener, newViewport, columnsToSubscribe);
}

public boolean updateViewportAndColumns(final StreamObserver<MessageView> listener, final RowSet newViewport,
final BitSet columnsToSubscribe, final boolean newReverseViewport) {
return findAndUpdateSubscription(listener, sub -> {
if (sub.pendingViewport != null) {
sub.pendingViewport.close();
final BitSet cols;
if (columnsToSubscribe == null) {
cols = new BitSet(sourceColumns.length);
cols.set(0, sourceColumns.length);
} else {
cols = (BitSet) columnsToSubscribe.clone();
}
sub.pendingViewport = newViewport.copy();
sub.pendingReverseViewport = newReverseViewport;
sub.pendingColumns = (BitSet) columnsToSubscribe.clone();

sub.pendingColumns = cols;
log.info().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for viewport and column updates.").endl();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,15 @@ public void setViewport(final RowSet newViewport, final boolean newReverseViewpo
viewport = newViewport;
reverseViewport = newReverseViewport;

barrageMessageProducer.updateViewport(dummyObserver, viewport, reverseViewport);
// maintain the existing subscribedColumns set
barrageMessageProducer.updateSubscription(dummyObserver, viewport, subscribedColumns, reverseViewport);
}

public void setSubscribedColumns(final BitSet newColumns) {
subscribedColumns = newColumns;
barrageMessageProducer.updateSubscription(dummyObserver, newColumns);

// maintain the existing viewport and viewport direction
barrageMessageProducer.updateSubscription(dummyObserver, viewport, subscribedColumns, reverseViewport);
}

public void setViewportAndColumns(final RowSet newViewport, final BitSet newColumns) {
Expand All @@ -333,7 +336,7 @@ public void setViewportAndColumns(final RowSet newViewport, final BitSet newColu
viewport = newViewport;
reverseViewport = newReverseViewport;
subscribedColumns = newColumns;
barrageMessageProducer.updateViewportAndColumns(dummyObserver, viewport, subscribedColumns);
barrageMessageProducer.updateSubscription(dummyObserver, viewport, subscribedColumns, newReverseViewport);
}
}

Expand Down

0 comments on commit 26cdd2b

Please sign in to comment.