Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplayTable: Only Fail Table (not the server) on Error #5020

Merged
merged 9 commits into from
Jan 10, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.table.impl;

import io.deephaven.engine.table.impl.util.DelayedErrorNotifier;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.util.annotations.ReferentialIntegrity;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.WeakReference;

public abstract class InstrumentedTableUpdateSource extends InstrumentedUpdateSource {

private final WeakReference<BaseTable<?>> tableReference;

@ReferentialIntegrity
private Runnable delayedErrorReference;

public InstrumentedTableUpdateSource(
final UpdateSourceRegistrar updateSourceRegistrar,
final BaseTable<?> table,
final String description) {
super(updateSourceRegistrar, description);
tableReference = new WeakReference<>(table);
}

public InstrumentedTableUpdateSource(final BaseTable<?> table, final String description) {
this(table.getUpdateGraph(), table, description);
}

@Override
protected final void onRefreshError(@NotNull final Exception error) {
final BaseTable<?> table = tableReference.get();
if (table == null) {
return;
}

if (table.satisfied(updateSourceRegistrar.getUpdateGraph().clock().currentStep())) {
// If the result is already satisfied (because it managed to send its notification, or was otherwise
// satisfied) we should not send our error notification on this cycle.
if (!table.isFailed()) {
// If the result isn't failed, we need to mark it as such on the next cycle.
delayedErrorReference = new DelayedErrorNotifier(error, entry, table);
}
} else {
table.notifyListenersOnError(error, entry);
table.forceReferenceCountToZero();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,25 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;
import java.util.Objects;

public abstract class InstrumentedUpdateSource implements Runnable {

protected final UpdateSourceRegistrar updateSourceRegistrar;
@Nullable
protected final PerformanceEntry entry;

public InstrumentedUpdateSource(final UpdateGraph updateGraph, final String description) {
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(updateGraph, description);
public InstrumentedUpdateSource(
@NotNull final UpdateSourceRegistrar updateSourceRegistrar,
@Nullable final String description) {
this.updateSourceRegistrar = Objects.requireNonNull(updateSourceRegistrar);
this.entry = PeriodicUpdateGraph.createUpdatePerformanceEntry(
updateSourceRegistrar.getUpdateGraph(), description);
}

@Override
Expand All @@ -25,6 +32,9 @@ public final void run() {
}
try {
instrumentedRefresh();
} catch (final Exception error) {
updateSourceRegistrar.removeSource(this);
onRefreshError(error);
} finally {
if (entry != null) {
entry.onUpdateEnd();
Expand All @@ -33,4 +43,6 @@ public final void run() {
}

protected abstract void instrumentedRefresh();

protected abstract void onRefreshError(Exception error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ private UnderlyingTableMaintainer(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
processNewLocationsUpdateRoot = new InstrumentedUpdateSource(
result.getUpdateGraph(),
processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource(
result,
SourcePartitionedTable.class.getSimpleName() + '[' + tableLocationProvider + ']'
+ "-processPendingLocations") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,46 +209,39 @@ private WritableRowSet refreshLocationSizes() {
}
}

private class LocationChangePoller extends InstrumentedUpdateSource {
private class LocationChangePoller extends InstrumentedTableUpdateSource {
private final TableLocationSubscriptionBuffer locationBuffer;

private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer locationBuffer) {
super(updateGraph, description + ".rowSetUpdateSource");
super(SourceTable.this.updateSourceRegistrar, SourceTable.this, description + ".rowSetUpdateSource");
this.locationBuffer = locationBuffer;
}

@Override
protected void instrumentedRefresh() {
try {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
final ImmutableTableLocationKey[] removedKeys =
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
if (removedKeys.length > 0) {
throw new TableLocationRemovedException("Source table does not support removed locations",
removedKeys);
}
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());

// NB: This class previously had functionality to notify "location listeners", but it was never used.
// Resurrect from git history if needed.
if (!locationSizesInitialized) {
// We don't want to start polling size changes until the initial RowSet has been computed.
return;
}

final RowSet added = refreshLocationSizes();
if (added.isEmpty()) {
return;
}
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
final ImmutableTableLocationKey[] removedKeys =
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
if (removedKeys.length > 0) {
throw new TableLocationRemovedException("Source table does not support removed locations",
removedKeys);
}
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());

rowSet.insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
} catch (Exception e) {
updateSourceRegistrar.removeSource(this);
// NB: This class previously had functionality to notify "location listeners", but it was never used.
// Resurrect from git history if needed.
if (!locationSizesInitialized) {
// We don't want to start polling size changes until the initial RowSet has been computed.
return;
}

// Notify listeners to the SourceTable when we had an issue refreshing available locations.
notifyListenersOnError(e, null);
final RowSet added = refreshLocationSizes();
if (added.isEmpty()) {
return;
}

rowSet.insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ public void run() {
refresh(true);
}

private class SourceRefresher extends InstrumentedUpdateSource {
private class SourceRefresher extends InstrumentedTableUpdateSource {

public SourceRefresher() {
super(updateGraph, name);
super(registrar, TimeTable.this, name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,21 @@
*/
package io.deephaven.engine.table.impl.replay;


import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.impl.indexer.RowSetIndexer;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.sources.RedirectedColumnSource;
import io.deephaven.engine.table.TupleSource;
import io.deephaven.engine.table.impl.TupleSourceFactory;
import io.deephaven.engine.table.impl.util.*;
import org.jetbrains.annotations.NotNull;

import java.time.Instant;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.*;

public abstract class QueryReplayGroupedTable extends QueryTable implements Runnable {
public abstract class QueryReplayGroupedTable extends ReplayTableBase implements Runnable {


protected final WritableRowRedirection rowRedirection;
Expand All @@ -39,7 +34,7 @@ private static Map<String, ColumnSource<?>> getResultSources(Map<String, ? exten
return result;
}

static class IteratorsAndNextTime implements Comparable<IteratorsAndNextTime> {
protected static class IteratorsAndNextTime implements Comparable<IteratorsAndNextTime> {

private final RowSet.Iterator iterator;
private final ColumnSource<Instant> columnSource;
Expand Down Expand Up @@ -74,11 +69,19 @@ public int compareTo(IteratorsAndNextTime o) {
}
}

protected QueryReplayGroupedTable(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> input,
String timeColumn, Replayer replayer, WritableRowRedirection rowRedirection, String[] groupingColumns) {
protected QueryReplayGroupedTable(
@NotNull final String description,
@NotNull final TrackingRowSet rowSet,
@NotNull final Map<String, ? extends ColumnSource<?>> input,
@NotNull final String timeColumn,
@NotNull final Replayer replayer,
@NotNull final WritableRowRedirection rowRedirection,
@NotNull final String[] groupingColumns) {

super(RowSetFactory.empty().toTracking(), getResultSources(input, rowRedirection));
super(description, RowSetFactory.empty().toTracking(), getResultSources(input, rowRedirection));
this.rowRedirection = rowRedirection;
this.replayer = Objects.requireNonNull(replayer, "replayer");

Map<Object, RowSet> grouping;

final ColumnSource<?>[] columnSources =
Expand All @@ -95,9 +98,6 @@ protected QueryReplayGroupedTable(TrackingRowSet rowSet, Map<String, ? extends C
allIterators.add(new IteratorsAndNextTime(iterator, timeSource, pos++));
}
}
Require.requirement(replayer != null, "replayer != null");
setRefreshing(true);
this.replayer = replayer;
run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ public class ReplayGroupedFullTable extends QueryReplayGroupedTable {
public ReplayGroupedFullTable(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> input,
String timeColumn,
Replayer replayer, String groupingColumn) {
super(rowSet, input, timeColumn, replayer,
super("ReplayGroupedFullTable", rowSet, input, timeColumn, replayer,
WritableRowRedirection.FACTORY.createRowRedirection((int) rowSet.size()),
new String[] {groupingColumn});
redirIndexSize = 0;
// We do not modify existing entries in the WritableRowRedirection (we only add at the end), so there's no need
// to
// ask the WritableRowRedirection to track previous values.
// to ask the WritableRowRedirection to track previous values.
}

@Override
Expand All @@ -46,7 +45,7 @@ public void run() {
}
}
final RowSet added = rowSetBuilder.build();
if (added.size() > 0) {
if (!added.isEmpty()) {
getRowSet().writableCast().insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public class ReplayLastByGroupedTable extends QueryReplayGroupedTable {
public ReplayLastByGroupedTable(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> input,
String timeColumn,
Replayer replayer, String[] groupingColumns) {
super(rowSet, input, timeColumn, replayer, WritableRowRedirection.FACTORY.createRowRedirection(100),
groupingColumns);
super("ReplayLastByGroupedTable", rowSet, input, timeColumn, replayer,
WritableRowRedirection.FACTORY.createRowRedirection(100), groupingColumns);
// noinspection unchecked
replayer.registerTimeSource(rowSet, (ColumnSource<Instant>) input.get(timeColumn));
}
Expand Down Expand Up @@ -52,7 +52,7 @@ public void run() {
}
final RowSet added = addedBuilder.build();
final RowSet modified = modifiedBuilder.build();
if (added.size() > 0 || modified.size() > 0) {
if (!added.isEmpty() || !modified.isEmpty()) {
getRowSet().writableCast().insert(added);
notifyListeners(added, RowSetFactory.empty(), modified);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.QueryConstants;
import org.jetbrains.annotations.NotNull;

import java.time.Instant;
import java.util.Map;

public class ReplayTable extends QueryTable implements Runnable {
public class ReplayTable extends ReplayTableBase implements Runnable {
/**
* Creates a new ReplayTable based on a row set, set of column sources, time column, and a replayer
*/
Expand All @@ -23,7 +22,6 @@ public class ReplayTable extends QueryTable implements Runnable {
private final RowSet.Iterator rowSetIterator;

private long nextRowKey = RowSequence.NULL_ROW_KEY;
private long currentTimeNanos = QueryConstants.NULL_LONG;
private long nextTimeNanos = QueryConstants.NULL_LONG;
private boolean done;

Expand All @@ -32,7 +30,7 @@ public ReplayTable(
@NotNull final Map<String, ? extends ColumnSource<?>> columns,
@NotNull final String timeColumn,
@NotNull final Replayer replayer) {
super(RowSetFactory.empty().toTracking(), columns);
super("ReplayTable", RowSetFactory.empty().toTracking(), columns);
this.replayer = Require.neqNull(replayer, "replayer");
// NB: This will behave incorrectly if our row set or any data in columns can change. Our source table *must*
// be static. We also seem to be assuming that timeSource has no null values in rowSet. It would be nice to use
Expand All @@ -42,8 +40,6 @@ public ReplayTable(
nanoTimeSource = ReinterpretUtils.instantToLongSource(instantSource);
rowSetIterator = rowSet.iterator();

setRefreshing(true);

advanceIterators();
if (!done) {
try (final RowSet initial = advanceToCurrentTime()) {
Expand All @@ -60,7 +56,7 @@ public ReplayTable(
private void advanceIterators() {
if (rowSetIterator.hasNext()) {
nextRowKey = rowSetIterator.nextLong();
currentTimeNanos = nextTimeNanos;
final long currentTimeNanos = nextTimeNanos;
nextTimeNanos = nanoTimeSource.getLong(nextRowKey);
if (nextTimeNanos == QueryConstants.NULL_LONG || nextTimeNanos < currentTimeNanos) {
throw new RuntimeException(
Expand Down
Loading
Loading