Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplayTable: Only Fail Table (not the server) on Error #5020

Merged
merged 9 commits into from
Jan 10, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.table.impl.util.DelayedErrorNotifier;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.util.annotations.ReferentialIntegrity;
import org.jetbrains.annotations.NotNull;

Expand All @@ -16,19 +17,26 @@ public abstract class InstrumentedTableUpdateSource extends InstrumentedUpdateSo
@ReferentialIntegrity
private Runnable delayedErrorReference;

public InstrumentedTableUpdateSource(final BaseTable<?> table, final String description) {
super(table.getUpdateGraph(), description);
public InstrumentedTableUpdateSource(
final UpdateSourceRegistrar updateSourceRegistrar,
final BaseTable<?> table,
final String description) {
super(updateSourceRegistrar, description);
tableReference = new WeakReference<>(table);
}

public InstrumentedTableUpdateSource(final BaseTable<?> table, final String description) {
this(table.getUpdateGraph(), table, description);
}

@Override
protected final void onRefreshError(@NotNull final Exception error) {
final BaseTable<?> table = tableReference.get();
if (table == null) {
return;
}

if (table.satisfied(updateGraph.clock().currentStep())) {
if (table.satisfied(updateSourceRegistrar.getUpdateGraph().clock().currentStep())) {
// If the result is already satisfied (because it managed to send its notification, or was otherwise
// satisfied) we should not send our error notification on this cycle.
if (!table.isFailed()) {
Expand All @@ -37,6 +45,7 @@ protected final void onRefreshError(@NotNull final Exception error) {
}
} else {
table.notifyListenersOnError(error, entry);
table.forceReferenceCountToZero();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;
import java.util.Objects;

public abstract class InstrumentedUpdateSource implements Runnable {

protected final UpdateGraph updateGraph;
protected final UpdateSourceRegistrar updateSourceRegistrar;
@Nullable
protected final PerformanceEntry entry;

public InstrumentedUpdateSource(final UpdateGraph updateGraph, final String description) {
this.updateGraph = updateGraph;
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(updateGraph, description);
public InstrumentedUpdateSource(
@NotNull final UpdateSourceRegistrar updateSourceRegistrar,
@Nullable final String description) {
this.updateSourceRegistrar = Objects.requireNonNull(updateSourceRegistrar);
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(
updateSourceRegistrar.getUpdateGraph(), description);
}

@Override
Expand All @@ -28,7 +33,7 @@ public final void run() {
try {
instrumentedRefresh();
} catch (final Exception error) {
updateGraph.removeSource(this);
updateSourceRegistrar.removeSource(this);
onRefreshError(error);
} finally {
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private class LocationChangePoller extends InstrumentedTableUpdateSource {
private final TableLocationSubscriptionBuffer locationBuffer;

private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer locationBuffer) {
super(SourceTable.this, description + ".rowSetUpdateSource");
super(SourceTable.this.updateSourceRegistrar, SourceTable.this, description + ".rowSetUpdateSource");
this.locationBuffer = locationBuffer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void run() {
private class SourceRefresher extends InstrumentedTableUpdateSource {

public SourceRefresher() {
super(TimeTable.this, name);
super(registrar, TimeTable.this, name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public Replayer(Instant startTime, Instant endTime) {
@Override
public void start() {
deltaNanos = DateTimeUtils.millisToNanos(System.currentTimeMillis()) - DateTimeUtils.epochNanos(startTime);
// Note: ReplayTables are allowed to run in parallel with this Replayer. However, we may wish to use an
// UpdateSourceCombiner to ensure that all of these tables are refreshed as a unit.
updateGraph.addSource(sourceRefresher);
for (ReplayTableBase currentTable : currentTables) {
currentTable.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public abstract class BaseUpdateGraph implements UpdateGraph, LogOutputAppendabl
*/
@Nullable
public static PerformanceEntry createUpdatePerformanceEntry(
final UpdateGraph updateGraph,
final String description) {
@Nullable final UpdateGraph updateGraph,
@Nullable final String description) {
if (updateGraph instanceof BaseUpdateGraph) {
final BaseUpdateGraph bug = (BaseUpdateGraph) updateGraph;
if (bug.updatePerformanceTracker != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import io.deephaven.util.locks.AwareFunctionalLock;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

/**
Expand All @@ -27,7 +27,7 @@ public class CapturingUpdateGraph implements UpdateGraph {

private final ExecutionContext context;

private final List<Runnable> sources = new ArrayList<>();
private final Queue<Runnable> sources = new ConcurrentLinkedQueue<>();

public CapturingUpdateGraph(@NotNull final ControlledUpdateGraph delegate) {
this.delegate = delegate;
Expand All @@ -52,6 +52,10 @@ void refreshSources() {
sources.forEach(Runnable::run);
}

void markSourcesRefreshedForUnitTests() {
delegate.markSourcesRefreshedForUnitTests();
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("CapturingUpdateGraph of ").append(delegate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public void testAddAndRemoveLocations() {

allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
updateGraph.markSourcesRefreshedForUnitTests();
registrar.run();
}), errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0),
TableLocationRemovedException.class).isPresent());
}, false), errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0), TableLocationRemovedException.class).isPresent());
getUpdateErrors().clear();

assertEquals(1, partitionTable.size());
Expand Down Expand Up @@ -147,10 +147,10 @@ public void testAddAndRemoveLocations() {

allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
updateGraph.markSourcesRefreshedForUnitTests();
registrar.run();
}), errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0),
TableLocationRemovedException.class).isPresent());
}, false), errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0), TableLocationRemovedException.class).isPresent());
getUpdateErrors().clear();

assertEquals(2, partitionTable.size());
Expand All @@ -163,10 +163,11 @@ public void testAddAndRemoveLocations() {
// The TableBackedTableLocation has a copy() of the p3 table which is itself a leaf. Erroring P3 will
// cause one error to come from the copied table, and one from the merged() table. We just need to validate
// that the exceptions we see are a ConstituentTableException and an ISE
allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(
() -> p3.notifyListenersOnError(new IllegalStateException("This is a test error"), null)),
errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0), IllegalStateException.class).isPresent());
allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
p3.notifyListenersOnError(new IllegalStateException("This is a test error"), null);
updateGraph.markSourcesRefreshedForUnitTests();
}, false), errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0), IllegalStateException.class).isPresent());
}

/**
Expand Down Expand Up @@ -197,10 +198,10 @@ public void testRemoveAndFail() {
allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
// This should process the pending update from the refresh above.
updateGraph.refreshSources();
updateGraph.markSourcesRefreshedForUnitTests();
registrar.run();
}),
errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0), TableDataException.class).isPresent());
}, false), errors -> errors.size() == 1 &&
FindExceptionCause.isOrCausedBy(errors.get(0), TableDataException.class).isPresent());
getUpdateErrors().clear();

// Then delete it for real
Expand All @@ -210,8 +211,9 @@ public void testRemoveAndFail() {
// We should NOT get an error here because the failed table should have removed itself from the registrar.
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
updateGraph.markSourcesRefreshedForUnitTests();
registrar.run();
});
}, false);

assertEquals(1, partitionTable.size());
try (final CloseableIterator<Table> tableIt = partitionTable.columnIterator("LocationTable")) {
Expand Down Expand Up @@ -240,8 +242,9 @@ public void testCantReadPrev() {

allowingError(() -> updateGraph.runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
updateGraph.markSourcesRefreshedForUnitTests();
registrar.run();
}), errors -> errors.stream().anyMatch(e -> FindExceptionCause.isOrCausedBy(e,
}, false), errors -> errors.stream().anyMatch(e -> FindExceptionCause.isOrCausedBy(e,
InvalidatedRegionException.class).isPresent()) &&
errors.stream().anyMatch(e -> FindExceptionCause.isOrCausedBy(e,
TableLocationRemovedException.class).isPresent()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,10 @@ public Object invoke(Invocation invocation) {

errorNotification.reset();
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
updateGraph.runWithinUnitTestCycle(SUT::refresh);
updateGraph.runWithinUnitTestCycle(() -> {
SUT.refresh();
updateGraph.markSourcesRefreshedForUnitTests();
}, false);
assertIsSatisfied();
errorNotification.assertInvoked();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public <T> T allowingError(Supplier<T> function, Predicate<List<Throwable>> erro
} finally {
setExpectError(original);
}
if (!errorsAcceptable.test(errors)) {
if (errors == null && !errorsAcceptable.test(errors)) {
TestCase.fail("Unacceptable errors: " + errors);
}
return retval;
Expand Down
Loading