Skip to content

Commit

Permalink
fix: DH-18482: EventDrivenUpdateGraph: Allow two concurrent threads t…
Browse files Browse the repository at this point in the history
…o safely requestRefresh(), and ensure clean errors for nested refresh attempts (#6603)
  • Loading branch information
rcaudy authored Jan 28, 2025
1 parent 24f1d83 commit f79fe67
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ private void flushTerminalNotifications() {
runNotification(notificationForThisThread);
}

// We can not proceed until all of the terminal notifications have executed.
// We can not proceed until all the terminal notifications have executed.
notificationProcessor.doAllWork();
}

Expand Down Expand Up @@ -836,6 +836,10 @@ private void computeStatsAndLogCycle(final long cycleTimeNanos) {
}
}

void reportLockWaitNanos(final long lockWaitNanos) {
currentCycleLockWaitTotalNanos += lockWaitNanos;
}

/**
* Is the provided cycle time on budget?
*
Expand Down Expand Up @@ -907,7 +911,7 @@ void refreshAllTables() {
private void doRefresh(@NotNull final Runnable refreshFunction) {
final long lockStartTimeNanos = System.nanoTime();
exclusiveLock().doLocked(() -> {
currentCycleLockWaitTotalNanos += System.nanoTime() - lockStartTimeNanos;
reportLockWaitNanos(System.nanoTime() - lockStartTimeNanos);
if (!running) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,22 @@ public int parallelismFactor() {
*/
@Override
public void requestRefresh() {
maybeStart();
// do the work to refresh everything, on this thread
isUpdateThread.set(true);
try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
refreshAllTables();
} finally {
isUpdateThread.remove();
if (isUpdateThread.get()) {
throw new IllegalStateException("Cannot request a refresh from an update thread");
}
maybeStart();
// Do the work to refresh everything, driven by this thread. Note that we acquire the lock "early" in order to
// avoid any inconsistencies w.r.t. assumptions about clock, lock, and update-thread state.
final long lockStartTimeNanos = System.nanoTime();
exclusiveLock().doLocked(() -> {
reportLockWaitNanos(System.nanoTime() - lockStartTimeNanos);
isUpdateThread.set(true);
try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
refreshAllTables();
} finally {
isUpdateThread.remove();
}
});
final long nowNanos = System.nanoTime();
synchronized (this) {
maybeFlushUpdatePerformance(nowNanos, nowNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.engine.table.impl.sources.LongSingleValueSource;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.updategraph.TerminalNotification;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.TableTools;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReflexiveUse;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.*;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

import static io.deephaven.engine.context.TestExecutionContext.OPERATION_INITIALIZATION;
import static io.deephaven.engine.util.TableTools.*;
Expand Down Expand Up @@ -165,6 +170,57 @@ public void testSimpleModify() {
}
}

@Test
public void testRefreshRace() throws ExecutionException, InterruptedException, TimeoutException {
final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build();
final List<Runnable> retainedReferences = new ArrayList<>();

final MutableInt sourceRefreshCount = new MutableInt(0);
final Runnable sleepingSource = () -> {
try {
Thread.sleep(100);
sourceRefreshCount.increment();
} catch (InterruptedException e) {
Assert.fail("Interrupted while sleeping");
}
};
retainedReferences.add(sleepingSource);
eventDrivenUpdateGraph.addSource(sleepingSource);

final int numConcurrentRefreshes = 10;
final Future<?>[] refreshFutures = new Future[numConcurrentRefreshes];
final ExecutorService executor = Executors.newFixedThreadPool(numConcurrentRefreshes);
try {
for (int cri = 0; cri < numConcurrentRefreshes; ++cri) {
refreshFutures[cri] = executor.submit(eventDrivenUpdateGraph::requestRefresh);
Thread.sleep(10);
}
for (final Future<?> refreshFuture : refreshFutures) {
refreshFuture.get(10, TimeUnit.SECONDS);
}
} finally {
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
}

Assert.assertEquals(numConcurrentRefreshes, sourceRefreshCount.intValue());
Assert.assertEquals(sleepingSource, retainedReferences.get(0));
}

@Test
public void testIllegalRefresh() {
final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build();

eventDrivenUpdateGraph.addNotification(new TerminalNotification() {
@Override
public void run() {
Assert.assertThrows(IllegalStateException.class, eventDrivenUpdateGraph::requestRefresh);
}
});

eventDrivenUpdateGraph.requestRefresh();
}

@Test
public void testUpdatePerformanceTracker() {
final Table upt = UpdatePerformanceTracker.getQueryTable();
Expand Down

0 comments on commit f79fe67

Please sign in to comment.