Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplayTable: Only Fail Table (not the server) on Error #5020

Merged
merged 9 commits into from
Jan 10, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.util.QueryConstants;
import org.jetbrains.annotations.NotNull;

Expand All @@ -19,21 +20,23 @@ public class ReplayTable extends QueryTable implements Runnable {
* Creates a new ReplayTable based on a row set, set of column sources, time column, and a replayer
*/
private final Replayer replayer;
private final UpdateSourceRegistrar updateSourceRegistrar;
private final ColumnSource<Long> nanoTimeSource;
private final RowSet.Iterator rowSetIterator;

private long nextRowKey = RowSequence.NULL_ROW_KEY;
private long currentTimeNanos = QueryConstants.NULL_LONG;
private long nextTimeNanos = QueryConstants.NULL_LONG;
private boolean done;

public ReplayTable(
@NotNull final RowSet rowSet,
@NotNull final Map<String, ? extends ColumnSource<?>> columns,
@NotNull final String timeColumn,
@NotNull final Replayer replayer) {
@NotNull final Replayer replayer,
@NotNull final UpdateSourceRegistrar updateSourceRegistrar) {
super(RowSetFactory.empty().toTracking(), columns);
this.replayer = Require.neqNull(replayer, "replayer");
this.updateSourceRegistrar = updateSourceRegistrar;
// NB: This will behave incorrectly if our row set or any data in columns can change. Our source table *must*
// be static. We also seem to be assuming that timeSource has no null values in rowSet. It would be nice to use
// a column iterator for this, but that would upset unit tests by keeping pooled chunks across cycles.
Expand All @@ -60,7 +63,7 @@ public ReplayTable(
private void advanceIterators() {
if (rowSetIterator.hasNext()) {
nextRowKey = rowSetIterator.nextLong();
currentTimeNanos = nextTimeNanos;
final long currentTimeNanos = nextTimeNanos;
nextTimeNanos = nanoTimeSource.getLong(nextRowKey);
if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) {
throw new RuntimeException(
Expand Down Expand Up @@ -91,12 +94,19 @@ public void run() {
if (done) {
return;
}
final RowSet added = advanceToCurrentTime();
if (added.isNonempty()) {
getRowSet().writableCast().insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
} else {
added.close();
try {
final RowSet added = advanceToCurrentTime();
if (added.isNonempty()) {
getRowSet().writableCast().insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
} else {
added.close();
}
} catch (final Exception err) {
updateSourceRegistrar.removeSource(this);

// propagate the error to our listeners
notifyListenersOnError(err, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ public Table replay(Table dataSource, String timeColumn) {
if (dataSource.isRefreshing()) {
dataSource = dataSource.snapshot();
}
final ReplayTable result =
new ReplayTable(dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this);
final ReplayTable result = new ReplayTable(
dataSource.getRowSet(), dataSource.getColumnSourceMap(), timeColumn, this, updateGraph);
currentTables.add(result);
if (deltaNanos < Long.MAX_VALUE) {
updateGraph.addSource(result);
Expand Down
Loading