diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateSource.java new file mode 100644 index 00000000000..40ec78383ed --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateSource.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl; + +import io.deephaven.engine.table.impl.util.DelayedErrorNotifier; +import io.deephaven.engine.updategraph.UpdateSourceRegistrar; +import io.deephaven.util.annotations.ReferentialIntegrity; +import org.jetbrains.annotations.NotNull; + +import java.lang.ref.WeakReference; + +public abstract class InstrumentedTableUpdateSource extends InstrumentedUpdateSource { + + private final WeakReference> tableReference; + + @ReferentialIntegrity + private Runnable delayedErrorReference; + + public InstrumentedTableUpdateSource( + final UpdateSourceRegistrar updateSourceRegistrar, + final BaseTable table, + final String description) { + super(updateSourceRegistrar, description); + // verify that the updateSourceRegistrar's update graph is the same as the table's update graph (if applicable) + updateSourceRegistrar.getUpdateGraph(table); + tableReference = new WeakReference<>(table); + } + + public InstrumentedTableUpdateSource(final BaseTable table, final String description) { + this(table.getUpdateGraph(), table, description); + } + + @Override + protected final void onRefreshError(@NotNull final Exception error) { + final BaseTable table = tableReference.get(); + if (table == null) { + return; + } + + if (table.satisfied(updateSourceRegistrar.getUpdateGraph().clock().currentStep())) { + // If the result is already satisfied (because it managed to send its notification, or was otherwise + // satisfied) we should not send our error notification on this cycle. + if (!table.isFailed()) { + // If the result isn't failed, we need to mark it as such on the next cycle. + delayedErrorReference = new DelayedErrorNotifier(error, entry, table); + } + } else { + table.notifyListenersOnError(error, entry); + table.forceReferenceCountToZero(); + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedUpdateSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedUpdateSource.java index 86010c32349..015c218d84a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedUpdateSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedUpdateSource.java @@ -4,18 +4,25 @@ package io.deephaven.engine.table.impl; import io.deephaven.engine.table.impl.perf.PerformanceEntry; -import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.updategraph.UpdateSourceRegistrar; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import org.jetbrains.annotations.NotNull; import javax.annotation.Nullable; +import java.util.Objects; public abstract class InstrumentedUpdateSource implements Runnable { + protected final UpdateSourceRegistrar updateSourceRegistrar; @Nullable protected final PerformanceEntry entry; - public InstrumentedUpdateSource(final UpdateGraph updateGraph, final String description) { - this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(updateGraph, description); + public InstrumentedUpdateSource( + @NotNull final UpdateSourceRegistrar updateSourceRegistrar, + @Nullable final String description) { + this.updateSourceRegistrar = Objects.requireNonNull(updateSourceRegistrar); + this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry( + updateSourceRegistrar.getUpdateGraph(), description); } @Override @@ -25,6 +32,9 @@ public final void run() { } try { instrumentedRefresh(); + } catch (final Exception error) { + updateSourceRegistrar.removeSource(this); + onRefreshError(error); } finally { if (entry != null) { entry.onUpdateEnd(); @@ -33,4 +43,6 @@ public final void run() { } protected abstract void instrumentedRefresh(); + + protected abstract void onRefreshError(Exception error); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index b3245f7a2f4..9b8d7eccd87 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -134,8 +134,8 @@ private UnderlyingTableMaintainer( IntrusiveDoublyLinkedNode.Adapter.getInstance()); readyLocationStates = new IntrusiveDoublyLinkedQueue<>( IntrusiveDoublyLinkedNode.Adapter.getInstance()); - processNewLocationsUpdateRoot = new InstrumentedUpdateSource( - result.getUpdateGraph(), + processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource( + result, SourcePartitionedTable.class.getSimpleName() + '[' + tableLocationProvider + ']' + "-processPendingLocations") { @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index f2870f10631..2e42da20a2a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -209,46 +209,39 @@ private WritableRowSet refreshLocationSizes() { } } - private class LocationChangePoller extends InstrumentedUpdateSource { + private class LocationChangePoller extends InstrumentedTableUpdateSource { private final TableLocationSubscriptionBuffer locationBuffer; private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer locationBuffer) { - super(updateGraph, description + ".rowSetUpdateSource"); + super(SourceTable.this.updateSourceRegistrar, SourceTable.this, description + ".rowSetUpdateSource"); this.locationBuffer = locationBuffer; } @Override protected void instrumentedRefresh() { - try { - final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending(); - final ImmutableTableLocationKey[] removedKeys = - maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); - if (removedKeys.length > 0) { - throw new TableLocationRemovedException("Source table does not support removed locations", - removedKeys); - } - maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); - - // NB: This class previously had functionality to notify "location listeners", but it was never used. - // Resurrect from git history if needed. - if (!locationSizesInitialized) { - // We don't want to start polling size changes until the initial RowSet has been computed. - return; - } - - final RowSet added = refreshLocationSizes(); - if (added.isEmpty()) { - return; - } + final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending(); + final ImmutableTableLocationKey[] removedKeys = + maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys()); + if (removedKeys.length > 0) { + throw new TableLocationRemovedException("Source table does not support removed locations", + removedKeys); + } + maybeAddLocations(locationUpdate.getPendingAddedLocationKeys()); - rowSet.insert(added); - notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty()); - } catch (Exception e) { - updateSourceRegistrar.removeSource(this); + // NB: This class previously had functionality to notify "location listeners", but it was never used. + // Resurrect from git history if needed. + if (!locationSizesInitialized) { + // We don't want to start polling size changes until the initial RowSet has been computed. + return; + } - // Notify listeners to the SourceTable when we had an issue refreshing available locations. - notifyListenersOnError(e, null); + final RowSet added = refreshLocationSizes(); + if (added.isEmpty()) { + return; } + + rowSet.insert(added); + notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty()); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java index c10120559e1..728cd45068d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java @@ -158,10 +158,10 @@ public void run() { refresh(true); } - private class SourceRefresher extends InstrumentedUpdateSource { + private class SourceRefresher extends InstrumentedTableUpdateSource { public SourceRefresher() { - super(updateGraph, name); + super(registrar, TimeTable.this, name); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java index b6066f83c58..dcb3c795781 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/QueryReplayGroupedTable.java @@ -3,26 +3,21 @@ */ package io.deephaven.engine.table.impl.replay; - -import io.deephaven.base.verify.Require; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.TrackingRowSet; import io.deephaven.engine.table.impl.indexer.RowSetIndexer; -import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.sources.RedirectedColumnSource; import io.deephaven.engine.table.TupleSource; import io.deephaven.engine.table.impl.TupleSourceFactory; import io.deephaven.engine.table.impl.util.*; +import org.jetbrains.annotations.NotNull; import java.time.Instant; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.PriorityQueue; +import java.util.*; -public abstract class QueryReplayGroupedTable extends QueryTable implements Runnable { +public abstract class QueryReplayGroupedTable extends ReplayTableBase implements Runnable { protected final WritableRowRedirection rowRedirection; @@ -39,7 +34,7 @@ private static Map> getResultSources(Map { + protected static class IteratorsAndNextTime implements Comparable { private final RowSet.Iterator iterator; private final ColumnSource columnSource; @@ -74,11 +69,19 @@ public int compareTo(IteratorsAndNextTime o) { } } - protected QueryReplayGroupedTable(TrackingRowSet rowSet, Map> input, - String timeColumn, Replayer replayer, WritableRowRedirection rowRedirection, String[] groupingColumns) { + protected QueryReplayGroupedTable( + @NotNull final String description, + @NotNull final TrackingRowSet rowSet, + @NotNull final Map> input, + @NotNull final String timeColumn, + @NotNull final Replayer replayer, + @NotNull final WritableRowRedirection rowRedirection, + @NotNull final String[] groupingColumns) { - super(RowSetFactory.empty().toTracking(), getResultSources(input, rowRedirection)); + super(description, RowSetFactory.empty().toTracking(), getResultSources(input, rowRedirection)); this.rowRedirection = rowRedirection; + this.replayer = Objects.requireNonNull(replayer, "replayer"); + Map grouping; final ColumnSource[] columnSources = @@ -95,9 +98,6 @@ protected QueryReplayGroupedTable(TrackingRowSet rowSet, Map> input, String timeColumn, Replayer replayer, String groupingColumn) { - super(rowSet, input, timeColumn, replayer, + super("ReplayGroupedFullTable", rowSet, input, timeColumn, replayer, WritableRowRedirection.FACTORY.createRowRedirection((int) rowSet.size()), new String[] {groupingColumn}); redirIndexSize = 0; // We do not modify existing entries in the WritableRowRedirection (we only add at the end), so there's no need - // to - // ask the WritableRowRedirection to track previous values. + // to ask the WritableRowRedirection to track previous values. } @Override @@ -46,7 +45,7 @@ public void run() { } } final RowSet added = rowSetBuilder.build(); - if (added.size() > 0) { + if (!added.isEmpty()) { getRowSet().writableCast().insert(added); notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty()); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayLastByGroupedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayLastByGroupedTable.java index 7ee013bec4e..d49c0d76afa 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayLastByGroupedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayLastByGroupedTable.java @@ -19,8 +19,8 @@ public class ReplayLastByGroupedTable extends QueryReplayGroupedTable { public ReplayLastByGroupedTable(TrackingRowSet rowSet, Map> input, String timeColumn, Replayer replayer, String[] groupingColumns) { - super(rowSet, input, timeColumn, replayer, WritableRowRedirection.FACTORY.createRowRedirection(100), - groupingColumns); + super("ReplayLastByGroupedTable", rowSet, input, timeColumn, replayer, + WritableRowRedirection.FACTORY.createRowRedirection(100), groupingColumns); // noinspection unchecked replayer.registerTimeSource(rowSet, (ColumnSource) input.get(timeColumn)); } @@ -52,7 +52,7 @@ public void run() { } final RowSet added = addedBuilder.build(); final RowSet modified = modifiedBuilder.build(); - if (added.size() > 0 || modified.size() > 0) { + if (!added.isEmpty() || !modified.isEmpty()) { getRowSet().writableCast().insert(added); notifyListeners(added, RowSetFactory.empty(), modified); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java index 91462e7e270..dc5363f90a1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTable.java @@ -6,7 +6,6 @@ import io.deephaven.base.verify.Require; import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; -import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.ColumnSource; import io.deephaven.util.QueryConstants; import org.jetbrains.annotations.NotNull; @@ -14,7 +13,7 @@ import java.time.Instant; import java.util.Map; -public class ReplayTable extends QueryTable implements Runnable { +public class ReplayTable extends ReplayTableBase implements Runnable { /** * Creates a new ReplayTable based on a row set, set of column sources, time column, and a replayer */ @@ -23,7 +22,6 @@ public class ReplayTable extends QueryTable implements Runnable { private final RowSet.Iterator rowSetIterator; private long nextRowKey = RowSequence.NULL_ROW_KEY; - private long currentTimeNanos = QueryConstants.NULL_LONG; private long nextTimeNanos = QueryConstants.NULL_LONG; private boolean done; @@ -32,7 +30,7 @@ public ReplayTable( @NotNull final Map> columns, @NotNull final String timeColumn, @NotNull final Replayer replayer) { - super(RowSetFactory.empty().toTracking(), columns); + super("ReplayTable", RowSetFactory.empty().toTracking(), columns); this.replayer = Require.neqNull(replayer, "replayer"); // NB: This will behave incorrectly if our row set or any data in columns can change. Our source table *must* // be static. We also seem to be assuming that timeSource has no null values in rowSet. It would be nice to use @@ -42,8 +40,6 @@ public ReplayTable( nanoTimeSource = ReinterpretUtils.instantToLongSource(instantSource); rowSetIterator = rowSet.iterator(); - setRefreshing(true); - advanceIterators(); if (!done) { try (final RowSet initial = advanceToCurrentTime()) { @@ -60,7 +56,7 @@ public ReplayTable( private void advanceIterators() { if (rowSetIterator.hasNext()) { nextRowKey = rowSetIterator.nextLong(); - currentTimeNanos = nextTimeNanos; + final long currentTimeNanos = nextTimeNanos; nextTimeNanos = nanoTimeSource.getLong(nextRowKey); if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) { throw new RuntimeException( diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTableBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTableBase.java new file mode 100644 index 00000000000..c876ad0baef --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayTableBase.java @@ -0,0 +1,42 @@ +package io.deephaven.engine.table.impl.replay; + +import io.deephaven.engine.rowset.TrackingRowSet; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.impl.InstrumentedTableUpdateSource; +import io.deephaven.engine.table.impl.QueryTable; +import org.jetbrains.annotations.NotNull; + +import java.util.Map; + +public abstract class ReplayTableBase extends QueryTable implements Runnable { + + private final SourceRefresher sourceRefresher; + + public ReplayTableBase( + @NotNull final String description, + @NotNull final TrackingRowSet rowSet, + @NotNull final Map> columns) { + super(rowSet, columns); + setRefreshing(true); + sourceRefresher = new SourceRefresher(description); + } + + public void start() { + updateGraph.addSource(sourceRefresher); + } + + public void stop() { + updateGraph.removeSource(sourceRefresher); + } + + private class SourceRefresher extends InstrumentedTableUpdateSource { + SourceRefresher(final String description) { + super(ReplayTableBase.this, description); + } + + @Override + protected void instrumentedRefresh() { + ReplayTableBase.this.run(); + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java index 257e1518636..2ab6a6ddb07 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/Replayer.java @@ -6,6 +6,7 @@ import io.deephaven.base.clock.Clock; import io.deephaven.base.verify.Assert; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.impl.InstrumentedUpdateSource; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.table.Table; @@ -15,8 +16,8 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.SafeCloseable; -import java.io.IOException; import java.time.Instant; import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; @@ -32,12 +33,14 @@ public class Replayer implements ReplayerInterface, Runnable { protected Instant startTime; protected Instant endTime; private long deltaNanos = Long.MAX_VALUE; - private CopyOnWriteArrayList currentTables = new CopyOnWriteArrayList<>(); + private CopyOnWriteArrayList currentTables = new CopyOnWriteArrayList<>(); private volatile boolean done; private boolean lastLap; + private final ReplayerHandle handle = () -> Replayer.this; private final UpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph(); + private final SourceRefresher sourceRefresher = new SourceRefresher(); // Condition variable for use with PeriodicUpdateGraph lock - the object monitor is no longer used private final Condition ugpCondition = updateGraph.exclusiveLock().newCondition(); @@ -51,7 +54,6 @@ public class Replayer implements ReplayerInterface, Runnable { public Replayer(Instant startTime, Instant endTime) { this.endTime = endTime; this.startTime = startTime; - currentTables.add(this); } /** @@ -60,8 +62,11 @@ public Replayer(Instant startTime, Instant endTime) { @Override public void start() { deltaNanos = DateTimeUtils.millisToNanos(System.currentTimeMillis()) - DateTimeUtils.epochNanos(startTime); - for (Runnable currentTable : currentTables) { - updateGraph.addSource(currentTable); + // Note: ReplayTables are allowed to run in parallel with this Replayer. However, we may wish to use an + // UpdateSourceCombiner to ensure that all of these tables are refreshed as a unit. + updateGraph.addSource(sourceRefresher); + for (ReplayTableBase currentTable : currentTables) { + currentTable.start(); } } @@ -77,17 +82,18 @@ public boolean isDone() { /** * Shuts down the replayer. - * - * @throws IOException problem shutting down the replayer. */ @Override - public void shutdown() throws IOException { + public void shutdown() { Clock clock = clock(); endTime = clock.instantNanos(); if (done) { return; } - updateGraph.removeSources(currentTables); + updateGraph.removeSource(sourceRefresher); + for (ReplayTableBase currentTable : currentTables) { + currentTable.stop(); + } currentTables = null; if (updateGraph.exclusiveLock().isHeldByCurrentThread()) { shutdownInternal(); @@ -190,11 +196,13 @@ public Table replay(Table dataSource, String timeColumn) { if (dataSource.isRefreshing()) { dataSource = dataSource.snapshot(); } - final ReplayTable result = - new ReplayTable(dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this); + final ReplayTable result; + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + result = new ReplayTable(dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this); + } currentTables.add(result); if (deltaNanos < Long.MAX_VALUE) { - updateGraph.addSource(result); + result.start(); } return result; } @@ -210,11 +218,14 @@ public Table replay(Table dataSource, String timeColumn) { */ @Override public Table replayGrouped(Table dataSource, String timeColumn, String groupingColumn) { - final ReplayGroupedFullTable result = new ReplayGroupedFullTable(dataSource.getRowSet(), - dataSource.getColumnSourceMap(), timeColumn, this, groupingColumn); + final ReplayGroupedFullTable result; + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + result = new ReplayGroupedFullTable( + dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this, groupingColumn); + } currentTables.add(result); if (deltaNanos < Long.MAX_VALUE) { - updateGraph.addSource(result); + result.start(); } return result; } @@ -229,11 +240,14 @@ public Table replayGrouped(Table dataSource, String timeColumn, String groupingC */ @Override public Table replayGroupedLastBy(Table dataSource, String timeColumn, String... groupingColumns) { - final ReplayLastByGroupedTable result = new ReplayLastByGroupedTable(dataSource.getRowSet(), - dataSource.getColumnSourceMap(), timeColumn, this, groupingColumns); + final ReplayLastByGroupedTable result; + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + result = new ReplayLastByGroupedTable( + dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this, groupingColumns); + } currentTables.add(result); if (deltaNanos < Long.MAX_VALUE) { - updateGraph.addSource(result); + result.start(); } return result; } @@ -260,11 +274,7 @@ public void run() { } if (lastLap) { - try { - shutdown(); - } catch (IOException e) { - e.printStackTrace(); - } + shutdown(); } Clock clock = clock(); if (clock.instantNanos().compareTo(endTime) >= 0) { @@ -272,6 +282,23 @@ public void run() { } } + private class SourceRefresher extends InstrumentedUpdateSource { + SourceRefresher() { + super(Replayer.this.updateGraph, "Replayer"); + } + + @Override + protected void instrumentedRefresh() { + Replayer.this.run(); + } + + @Override + protected void onRefreshError(Exception error) { + log.error().append("Unexpected Error Refreshing Replayer: ").append(error).endl(); + shutdown(); + } + } + private static class PeriodicTask { private final TimerTask task; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayerInterface.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayerInterface.java index 562e047ef47..4f81a4c5817 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayerInterface.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/replay/ReplayerInterface.java @@ -27,10 +27,8 @@ public interface ReplayerInterface { /** * Shuts down the replayer. - * - * @throws IOException problem shutting down the replayer. */ - void shutdown() throws IOException; + void shutdown(); /** * Wait a specified interval for the replayer to complete. If the replayer has not completed by the diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/DelayedErrorNotifier.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/DelayedErrorNotifier.java new file mode 100644 index 00000000000..4328088fdb0 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/DelayedErrorNotifier.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.util; + +import io.deephaven.engine.table.TableListener; +import io.deephaven.engine.table.impl.BaseTable; +import io.deephaven.engine.updategraph.UpdateGraph; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.lang.ref.WeakReference; + +/** + * Re-usable tool for scheduling delayed error notifications to be enqueued on the next update graph cycle. This is used + * when an error is detected, but delivering the error would violate single-notification guarantees. + */ +public final class DelayedErrorNotifier implements Runnable { + + private final Throwable error; + private final TableListener.Entry entry; + private final UpdateGraph updateGraph; + private final WeakReference> tableReference; + + public DelayedErrorNotifier( + @NotNull final Throwable error, + @Nullable final TableListener.Entry entry, + @NotNull final BaseTable table) { + this.error = error; + updateGraph = table.getUpdateGraph(); + tableReference = new WeakReference<>(table); + this.entry = entry; + updateGraph.addSource(this); + } + + @Override + public void run() { + updateGraph.removeSource(this); + + final BaseTable table = tableReference.get(); + if (table == null) { + return; + } + + table.notifyListenersOnError(error, entry); + table.forceReferenceCountToZero(); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/FunctionGeneratedTableFactory.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/FunctionGeneratedTableFactory.java index 226d8ea0d84..3116d72dc96 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/FunctionGeneratedTableFactory.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/FunctionGeneratedTableFactory.java @@ -6,7 +6,6 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.*; -import io.deephaven.engine.table.impl.BaseTable; import io.deephaven.engine.table.impl.ListenerRecorder; import io.deephaven.engine.table.impl.MergedListener; import io.deephaven.engine.table.impl.QueryTable; @@ -17,7 +16,6 @@ import io.deephaven.util.annotations.ReferentialIntegrity; import org.jetbrains.annotations.NotNull; -import java.lang.ref.WeakReference; import java.util.*; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -273,7 +271,7 @@ private void doRefresh() { if (parentListener != null) { parentListener.forceReferenceCountToZero(); } - delayedErrorReference = new DelayedErrorNotifier(e, this); + delayedErrorReference = new DelayedErrorNotifier(e, null, this); } else { notifyListenersOnError(e, null); forceReferenceCountToZero(); @@ -292,32 +290,4 @@ public void destroy() { } } } - - private static final class DelayedErrorNotifier implements Runnable { - - private final Throwable error; - private final UpdateGraph updateGraph; - private final WeakReference> tableReference; - - private DelayedErrorNotifier(@NotNull final Throwable error, - @NotNull final BaseTable table) { - this.error = error; - updateGraph = table.getUpdateGraph(); - tableReference = new WeakReference<>(table); - updateGraph.addSource(this); - } - - @Override - public void run() { - updateGraph.removeSource(this); - - final BaseTable table = tableReference.get(); - if (table == null) { - return; - } - - table.notifyListenersOnError(error, null); - table.forceReferenceCountToZero(); - } - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/BaseUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/BaseUpdateGraph.java index e2d369f8012..656ae518ba9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/BaseUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/BaseUpdateGraph.java @@ -57,8 +57,8 @@ public abstract class BaseUpdateGraph implements UpdateGraph, LogOutputAppendabl */ @Nullable public static PerformanceEntry createUpdatePerformanceEntry( - final UpdateGraph updateGraph, - final String description) { + @Nullable final UpdateGraph updateGraph, + @Nullable final String description) { if (updateGraph instanceof BaseUpdateGraph) { final BaseUpdateGraph bug = (BaseUpdateGraph) updateGraph; if (bug.updatePerformanceTracker != null) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java index 63e98610be9..7201e58b196 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/CapturingUpdateGraph.java @@ -12,9 +12,9 @@ import io.deephaven.util.locks.AwareFunctionalLock; import org.jetbrains.annotations.NotNull; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; /** @@ -27,7 +27,7 @@ public class CapturingUpdateGraph implements UpdateGraph { private final ExecutionContext context; - private final List sources = new ArrayList<>(); + private final Queue sources = new ConcurrentLinkedQueue<>(); public CapturingUpdateGraph(@NotNull final ControlledUpdateGraph delegate) { this.delegate = delegate; @@ -52,6 +52,10 @@ void refreshSources() { sources.forEach(Runnable::run); } + void markSourcesRefreshedForUnitTests() { + delegate.markSourcesRefreshedForUnitTests(); + } + @Override public LogOutput append(LogOutput logOutput) { return logOutput.append("CapturingUpdateGraph of ").append(delegate); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java index 6fad0d53e52..4762e4f49f1 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java @@ -116,10 +116,10 @@ public void testAddAndRemoveLocations() { allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> { updateGraph.refreshSources(); + updateGraph.markSourcesRefreshedForUnitTests(); registrar.run(); - }), errors -> errors.size() == 1 && - FindExceptionCause.isOrCausedBy(errors.get(0), - TableLocationRemovedException.class).isPresent()); + }, false), errors -> errors.size() == 1 && + FindExceptionCause.isOrCausedBy(errors.get(0), TableLocationRemovedException.class).isPresent()); getUpdateErrors().clear(); assertEquals(1, partitionTable.size()); @@ -147,10 +147,10 @@ public void testAddAndRemoveLocations() { allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> { updateGraph.refreshSources(); + updateGraph.markSourcesRefreshedForUnitTests(); registrar.run(); - }), errors -> errors.size() == 1 && - FindExceptionCause.isOrCausedBy(errors.get(0), - TableLocationRemovedException.class).isPresent()); + }, false), errors -> errors.size() == 1 && + FindExceptionCause.isOrCausedBy(errors.get(0), TableLocationRemovedException.class).isPresent()); getUpdateErrors().clear(); assertEquals(2, partitionTable.size()); @@ -163,10 +163,11 @@ public void testAddAndRemoveLocations() { // The TableBackedTableLocation has a copy() of the p3 table which is itself a leaf. Erroring P3 will // cause one error to come from the copied table, and one from the merged() table. We just need to validate // that the exceptions we see are a ConstituentTableException and an ISE - allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle( - () -> p3.notifyListenersOnError(new IllegalStateException("This is a test error"), null)), - errors -> errors.size() == 1 && - FindExceptionCause.isOrCausedBy(errors.get(0), IllegalStateException.class).isPresent()); + allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> { + p3.notifyListenersOnError(new IllegalStateException("This is a test error"), null); + updateGraph.markSourcesRefreshedForUnitTests(); + }, false), errors -> errors.size() == 1 && + FindExceptionCause.isOrCausedBy(errors.get(0), IllegalStateException.class).isPresent()); } /** @@ -197,10 +198,10 @@ public void testRemoveAndFail() { allowingError(() -> updateGraph.getDelegate().runWithinUnitTestCycle(() -> { // This should process the pending update from the refresh above. updateGraph.refreshSources(); + updateGraph.markSourcesRefreshedForUnitTests(); registrar.run(); - }), - errors -> errors.size() == 1 && - FindExceptionCause.isOrCausedBy(errors.get(0), TableDataException.class).isPresent()); + }, false), errors -> errors.size() == 1 && + FindExceptionCause.isOrCausedBy(errors.get(0), TableDataException.class).isPresent()); getUpdateErrors().clear(); // Then delete it for real @@ -210,8 +211,9 @@ public void testRemoveAndFail() { // We should NOT get an error here because the failed table should have removed itself from the registrar. updateGraph.getDelegate().runWithinUnitTestCycle(() -> { updateGraph.refreshSources(); + updateGraph.markSourcesRefreshedForUnitTests(); registrar.run(); - }); + }, false); assertEquals(1, partitionTable.size()); try (final CloseableIterator tableIt = partitionTable.columnIterator("LocationTable")) { @@ -240,8 +242,9 @@ public void testCantReadPrev() { allowingError(() -> updateGraph.runWithinUnitTestCycle(() -> { updateGraph.refreshSources(); + updateGraph.markSourcesRefreshedForUnitTests(); registrar.run(); - }), errors -> errors.stream().anyMatch(e -> FindExceptionCause.isOrCausedBy(e, + }, false), errors -> errors.stream().anyMatch(e -> FindExceptionCause.isOrCausedBy(e, InvalidatedRegionException.class).isPresent()) && errors.stream().anyMatch(e -> FindExceptionCause.isOrCausedBy(e, TableLocationRemovedException.class).isPresent())); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java index d162aca1717..229ea31a34b 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java @@ -387,7 +387,10 @@ public Object invoke(Invocation invocation) { errorNotification.reset(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); - updateGraph.runWithinUnitTestCycle(SUT::refresh); + updateGraph.runWithinUnitTestCycle(() -> { + SUT.refresh(); + updateGraph.markSourcesRefreshedForUnitTests(); + }, false); assertIsSatisfied(); errorNotification.assertInvoked(); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java index 6989d7119ba..813cd115d5e 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java @@ -136,7 +136,7 @@ public T allowingError(Supplier function, Predicate> erro } finally { setExpectError(original); } - if (!errorsAcceptable.test(errors)) { + if (errors != null && !errorsAcceptable.test(errors)) { TestCase.fail("Unacceptable errors: " + errors); } return retval; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index af16945c7ba..8622c70b472 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -11,7 +11,7 @@ import io.deephaven.chunk.util.pools.ChunkPoolConstants; import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.table.impl.InstrumentedUpdateSource; +import io.deephaven.engine.table.impl.InstrumentedTableUpdateSource; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; @@ -266,10 +266,10 @@ private synchronized void tryToDeliverErrorToCallback(final Throwable err) { } } - private class SourceRefresher extends InstrumentedUpdateSource { + private class SourceRefresher extends InstrumentedTableUpdateSource { SourceRefresher() { - super(updateGraph, "BarrageTable(" + System.identityHashCode(BarrageTable.this) + super(BarrageTable.this, "BarrageTable(" + System.identityHashCode(BarrageTable.this) + (stats != null ? ") " + stats.tableKey : ")")); } @@ -282,13 +282,9 @@ protected void instrumentedRefresh() { } catch (Throwable err) { beginLog(LogLevel.ERROR).append(": Failure during BarrageTable instrumentedRefresh: ") .append(err).endl(); - notifyListenersOnError(err, null); - tryToDeliverErrorToCallback(err); - if (err instanceof Error) { - // rethrow if this was an error (which should not be swallowed) - throw err; - } + // rethrow for the caller to handle + throw err; } } }