Skip to content

Commit

Permalink
BaseTable: Remove Swap Listener and Add Atomic addUpdateListener (dee…
Browse files Browse the repository at this point in the history
…phaven#4652)

- Adds `TableAlreadyFailedException` and `SnapshotUnsuccessfulException` to fix initialization race condition in `ExportedTableUpdateListener`

Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
nbauernfeind and rcaudy authored Oct 18, 2023
1 parent efc3bc8 commit 2bc0192
Show file tree
Hide file tree
Showing 28 changed files with 501 additions and 488 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ public Table apply(final Table wholeTable) {
// TODO restore this to support non-QueryTable types
// if (wholeTable instanceof BaseTable) {
// BaseTable baseTable = (BaseTable) wholeTable;
// final SwapListener swapListener =
// baseTable.createSwapListenerIfRefreshing(SwapListener::new);
// final OperationSnapshotControl snapshotControl =
// baseTable.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
//
// final Mutable<QueryTable> result = new MutableObject<>();
//
// baseTable.initializeWithSnapshot("downsample", swapListener, (prevRequested, beforeClock) -> {
// baseTable.initializeWithSnapshot("downsample", snapshotControl, (prevRequested, beforeClock) -> {
// final boolean usePrev = prevRequested && baseTable.isRefreshing();
// final WritableRowSet rowSetToUse = usePrev ? baseTable.build().copyPrev() : baseTable.build();
//
Expand All @@ -136,20 +136,20 @@ public Table apply(final Table wholeTable) {
}

private static Table makeDownsampledQueryTable(final QueryTable wholeQueryTable, final DownsampleKey memoKey) {
final SwapListener swapListener =
wholeQueryTable.createSwapListenerIfRefreshing(SwapListener::new);
final OperationSnapshotControl snapshotControl =
wholeQueryTable.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);

final Mutable<Table> result = new MutableObject<>();

BaseTable.initializeWithSnapshot("downsample", swapListener, (prevRequested, beforeClock) -> {
BaseTable.initializeWithSnapshot("downsample", snapshotControl, (prevRequested, beforeClock) -> {
final boolean usePrev = prevRequested && wholeQueryTable.isRefreshing();

final DownsamplerListener downsampleListener = DownsamplerListener.of(wholeQueryTable, memoKey);
downsampleListener.init(usePrev);
result.setValue(downsampleListener.resultTable);

if (swapListener != null) {
swapListener.setListenerAndResult(downsampleListener, downsampleListener.resultTable);
if (snapshotControl != null) {
snapshotControl.setListenerAndResult(downsampleListener, downsampleListener.resultTable);
downsampleListener.resultTable.addParentReference(downsampleListener);
}

Expand Down Expand Up @@ -694,7 +694,7 @@ private void performRescans(final DownsampleChunkContext context) {
/**
* Indicates that a change has probably happened and we should notify the result table. The contents of the
* change will be our state map (i.e. there is
*
*
* @param upstream the change that happened upstream
* @param lastRowSet the base rowSet to use when considering what items to tell the result table changed. if
* this.rowSet, then update it normally, otherwise this.rowSet must be empty and this.rowSet should be
Expand Down
10 changes: 10 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,16 @@ RollupTable rollup(Collection<? extends Aggregation> aggregations, boolean inclu
*/
void addUpdateListener(TableUpdateListener listener);

/**
* Subscribe for updates to this table if its last notification step matches {@code requiredLastNotificationStep}.
* {@code listener} will be invoked via the {@link NotificationQueue} associated with this Table.
*
* @param listener listener for updates
* @param requiredLastNotificationStep the expected last notification step to match
* @return true if the listener was added, false if the last notification step requirement was not met
*/
boolean addUpdateListener(final TableUpdateListener listener, final long requiredLastNotificationStep);

/**
* Unsubscribe the supplied listener.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

/**
* This exception is thrown when {@link io.deephaven.engine.table.impl.remote.ConstructSnapshot} fails to successfully
* execute the data snapshot function in an otherwise consistent state.
*/
public class SnapshotUnsuccessfulException extends UncheckedDeephavenException {
public SnapshotUnsuccessfulException(@NotNull final String message) {
super(message);
}

public SnapshotUnsuccessfulException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.exceptions;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

/**
* This exception is thrown when an {@link io.deephaven.engine.table.TableUpdateListener update listener} cannot be
* added to a {@link io.deephaven.engine.table.Table} because it has already failed.
*/
public class TableAlreadyFailedException extends UncheckedDeephavenException {
public TableAlreadyFailedException(@NotNull final String message) {
super(message);
}

public TableAlreadyFailedException(@NotNull final String message, @NotNull final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ public static Table toBlink(@NotNull final Table table) {
final MutableObject<QueryTable> resultHolder = new MutableObject<>();
final MutableObject<AddOnlyToBlinkListener> listenerHolder = new MutableObject<>();
final BaseTable<?> coalesced = (BaseTable<?>) table.coalesce();
final SwapListener swapListener = coalesced.createSwapListenerIfRefreshing(SwapListener::new);
final OperationSnapshotControl snapshotControl =
coalesced.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);

// noinspection DataFlowIssue swapListener cannot be null here, since we know the table is refreshing
ConstructSnapshot.callDataSnapshotFunction("addOnlyToBlink", swapListener.makeSnapshotControl(),
ConstructSnapshot.callDataSnapshotFunction("addOnlyToBlink", snapshotControl,
(final boolean usePrev, final long beforeClockValue) -> {
// Start with the same rows as the original table
final TrackingRowSet resultRowSet = usePrev
Expand All @@ -69,7 +70,7 @@ public static Table toBlink(@NotNull final Table table) {
final AddOnlyToBlinkListener listener = new AddOnlyToBlinkListener(recorder, result);
recorder.setMergedListener(listener);
result.addParentReference(listener);
swapListener.setListenerAndResult(recorder, result);
snapshotControl.setListenerAndResult(recorder, result);

listenerHolder.setValue(listener);
resultHolder.setValue(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.TableAlreadyFailedException;
import io.deephaven.engine.exceptions.UpdateGraphConflictException;
import io.deephaven.engine.table.impl.util.StepUpdater;
import io.deephaven.engine.updategraph.NotificationQueue;
Expand Down Expand Up @@ -557,9 +558,9 @@ public void addUpdateListener(final ShiftObliviousListener listener, final boole
}

@Override
public void addUpdateListener(final TableUpdateListener listener) {
public void addUpdateListener(@NotNull final TableUpdateListener listener) {
if (isFailed) {
throw new IllegalStateException("Can not listen to failed table " + description);
throw new TableAlreadyFailedException("Can not listen to failed table " + description);
}
if (isRefreshing()) {
// ensure that listener is in the same update graph if applicable
Expand All @@ -570,14 +571,38 @@ public void addUpdateListener(final TableUpdateListener listener) {
}
}

@Override
public boolean addUpdateListener(
@NotNull final TableUpdateListener listener, final long requiredLastNotificationStep) {
if (isFailed) {
throw new TableAlreadyFailedException("Can not listen to failed table " + description);
}

if (!isRefreshing()) {
return false;
}

synchronized (this) {
if (this.lastNotificationStep != requiredLastNotificationStep) {
return false;
}

// ensure that listener is in the same update graph if applicable
if (listener instanceof NotificationQueue.Dependency) {
getUpdateGraph((NotificationQueue.Dependency) listener);
}
ensureChildListenerReferences().add(listener);

return true;
}
}

private SimpleReferenceManager<TableUpdateListener, ? extends SimpleReference<TableUpdateListener>> ensureChildListenerReferences() {
// noinspection unchecked
return FieldUtils.ensureField(this, CHILD_LISTENER_REFERENCES_UPDATER, EMPTY_CHILD_LISTENER_REFERENCES,
() -> new SimpleReferenceManager<>((final TableUpdateListener tableUpdateListener) -> {
if (tableUpdateListener instanceof LegacyListenerAdapter) {
return (LegacyListenerAdapter) tableUpdateListener;
} else if (tableUpdateListener instanceof SwapListener) {
return ((SwapListener) tableUpdateListener).getReferenceForSource();
} else {
return new WeakSimpleReference<>(tableUpdateListener);
}
Expand Down Expand Up @@ -701,12 +726,14 @@ public final void notifyListeners(final TableUpdate update) {
validateUpdateOverlaps(update);
}

lastNotificationStep = currentStep;

// notify children
final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach(
(listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
synchronized (this) {
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach(
(listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
}

update.release();
}
Expand Down Expand Up @@ -813,11 +840,14 @@ public final void notifyListenersOnError(final Throwable e, @Nullable final Tabl

isFailed = true;
maybeSignal();
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
.addNotification(listener.getErrorNotification(e, sourceEntry)));
synchronized (this) {
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
.addNotification(listener.getErrorNotification(e, sourceEntry)));
}
}

/**
Expand Down Expand Up @@ -1261,34 +1291,35 @@ public Table setTotalsTable(String directive) {
}

public static void initializeWithSnapshot(
String logPrefix, SwapListener swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) {
if (swapListener == null) {
@NotNull final String logPrefix,
@Nullable final ConstructSnapshot.SnapshotControl snapshotControl,
@NotNull final ConstructSnapshot.SnapshotFunction snapshotFunction) {
if (snapshotControl == null) {
snapshotFunction.call(false, LogicalClock.NULL_CLOCK_VALUE);
return;
}
ConstructSnapshot.callDataSnapshotFunction(logPrefix, swapListener.makeSnapshotControl(), snapshotFunction);
ConstructSnapshot.callDataSnapshotFunction(logPrefix, snapshotControl, snapshotFunction);
}

public interface SwapListenerFactory<T extends SwapListener> {
T newListener(BaseTable<?> sourceTable);
public interface SnapshotControlFactory<T extends ConstructSnapshot.SnapshotControl> {
T newControl(BaseTable<?> sourceTable);
}

/**
* If we are a refreshing table, then we should create a swap listener that listens for updates to this table.
*
* If we are a refreshing table, then we should create a snapshot control to validate the snapshot.
* <p>
* Otherwise, we return null.
*
* @return a swap listener for this table (or null)
* @return a snapshot control to snapshot this table (or null)
*/
@Nullable
public <T extends SwapListener> T createSwapListenerIfRefreshing(final SwapListenerFactory<T> factory) {
public <T extends OperationSnapshotControl> T createSnapshotControlIfRefreshing(
final SnapshotControlFactory<T> factory) {
if (!isRefreshing()) {
return null;
}

final T swapListener = factory.newListener(this);
swapListener.subscribeForUpdates();
return swapListener;
return factory.newControl(this);
}

// ------------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ private static Table internalBlinkToAppendOnly(final Table blinkTable, long size
}

final BaseTable<?> baseBlinkTable = (BaseTable<?>) blinkTable.coalesce();
final SwapListener swapListener = baseBlinkTable.createSwapListenerIfRefreshing(SwapListener::new);
final OperationSnapshotControl snapshotControl =
baseBlinkTable.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
// blink tables must tick
Assert.neqNull(swapListener, "swapListener");
Assert.neqNull(snapshotControl, "snapshotControl");

final Mutable<QueryTable> resultHolder = new MutableObject<>();

ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", swapListener.makeSnapshotControl(),
ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", snapshotControl,
(boolean usePrev, long beforeClockValue) -> {
final Map<String, WritableColumnSource<?>> columns = new LinkedHashMap<>();
final Map<String, ? extends ColumnSource<?>> columnSourceMap =
Expand Down Expand Up @@ -119,7 +120,7 @@ private static Table internalBlinkToAppendOnly(final Table blinkTable, long size

Assert.leq(result.size(), "result.size()", sizeLimit, "sizeLimit");

swapListener.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly",
snapshotControl.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly",
baseBlinkTable, result) {
@Override
public void onUpdate(TableUpdate upstream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.deephaven.engine.table.impl;

/**
* Used by {@link SwapListener swap listeners} to set the notification step of elements in our DAG.
* Used by {@link OperationSnapshotControl} to set the notification step of elements in our DAG.
*/
public interface NotificationStepReceiver {

Expand Down
Loading

0 comments on commit 2bc0192

Please sign in to comment.