diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 862cb131855..e45a86b33c6 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -565,6 +565,9 @@ public final static class ExportObject extends LivenessArtifact { /** used to detect when this object is ready for export (is visible for atomic int field updater) */ private volatile int dependentCount = -1; + /** our first parent that was already released prior to having dependencies set if one exists */ + private ExportObject alreadyDeadParent; + @SuppressWarnings("unchecked") private static final AtomicIntegerFieldUpdater> DEPENDENT_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater((Class>) (Class) ExportObject.class, @@ -648,7 +651,14 @@ private synchronized void setDependencies(final List> parents) { this.parents = parents; dependentCount = parents.size(); - parents.stream().filter(Objects::nonNull).forEach(this::tryManage); + for (final ExportObject parent : parents) { + if (parent != null && !tryManage(parent)) { + // we've failed; let's cleanup already managed parents + forceReferenceCountToZero(); + alreadyDeadParent = parent; + break; + } + } if (log.isDebugEnabled()) { final Exception e = new RuntimeException(); @@ -678,18 +688,24 @@ private synchronized void setWork( throw new IllegalStateException("export object can only be defined once"); } hasHadWorkSet = true; - if (queryPerformanceRecorder != null && queryPerformanceRecorder.getState() == QueryState.RUNNING) { // transfer ownership of the qpr to the export before it can be resumed by the scheduler queryPerformanceRecorder.suspendQuery(); } this.requiresSerialQueue = requiresSerialQueue; + // we defer this type of failure until setWork for consistency in error handling + if (alreadyDeadParent != null) { + onDependencyFailure(alreadyDeadParent); + alreadyDeadParent = null; + } + if (isExportStateTerminal(state)) { // The following scenarios cause us to get into this state: // - this export object was released/cancelled // - the session expiration propagated to this export object - // Note that already failed dependencies will be handled in the onResolveOne method below. + // - a parent export was released/dead prior to `setDependencies` + // Note that failed dependencies will be handled in the onResolveOne method below. // since this is the first we know of the errorHandler, it could not have been invoked yet if (errorHandler != null) { @@ -891,44 +907,12 @@ private void onResolveOne(@Nullable final ExportObject parent) { return; } - // is this a cascading failure? - if (parent != null && isExportStateTerminal(parent.state)) { - synchronized (this) { - errorId = parent.errorId; - if (parent.caughtException instanceof StatusRuntimeException) { - caughtException = parent.caughtException; - } - ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED; - - if (errorId == null) { - final String errorDetails; - switch (parent.state) { - case RELEASED: - terminalState = ExportNotification.State.DEPENDENCY_RELEASED; - errorDetails = "dependency released by user."; - break; - case CANCELLED: - terminalState = ExportNotification.State.DEPENDENCY_CANCELLED; - errorDetails = "dependency cancelled by user."; - break; - default: - // Note: the other error states should have non-null errorId - errorDetails = "dependency does not have its own error defined " + - "and is in an unexpected state: " + parent.state; - break; - } - - maybeAssignErrorId(); - failedDependencyLogIdentity = parent.logIdentity; - if (!(caughtException instanceof StatusRuntimeException)) { - log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails) - .endl(); - } - } - - setState(terminalState); - return; - } + // Is this a cascading failure? Note that we manage the parents in `setDependencies` which + // keeps the parent results live until this child been exported. This means that the parent is allowed to + // be in a RELEASED state, but is not allowed to be in a failure state. + if (parent != null && isExportStateFailure(parent.state)) { + onDependencyFailure(parent); + return; } final int newDepCount = DEPENDENT_COUNT_UPDATER.decrementAndGet(this); @@ -1045,6 +1029,42 @@ private void maybeAssignErrorId() { } } + private synchronized void onDependencyFailure(final ExportObject parent) { + errorId = parent.errorId; + if (parent.caughtException instanceof StatusRuntimeException) { + caughtException = parent.caughtException; + } + ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED; + + if (errorId == null) { + final String errorDetails; + switch (parent.state) { + case RELEASED: + terminalState = ExportNotification.State.DEPENDENCY_RELEASED; + errorDetails = "dependency released by user."; + break; + case CANCELLED: + terminalState = ExportNotification.State.DEPENDENCY_CANCELLED; + errorDetails = "dependency cancelled by user."; + break; + default: + // Note: the other error states should have non-null errorId + errorDetails = "dependency does not have its own error defined " + + "and is in an unexpected state: " + parent.state; + break; + } + + maybeAssignErrorId(); + failedDependencyLogIdentity = parent.logIdentity; + if (!(caughtException instanceof StatusRuntimeException)) { + log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails) + .endl(); + } + } + + setState(terminalState); + } + /** * Sets the final result for this export. * diff --git a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java index 8a14ea1c675..6ee2cb0b4af 100644 --- a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java +++ b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java @@ -579,11 +579,14 @@ public void testNewExportRequiresPositiveId() { @Test public void testDependencyAlreadyReleased() { - final SessionState.ExportObject e1 = session.newExport(nextExportId++).submit(() -> { - }); - scheduler.runUntilQueueEmpty(); - e1.release(); - Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); + final SessionState.ExportObject e1; + try (final SafeCloseable ignored = LivenessScopeStack.open()) { + e1 = session.newExport(nextExportId++).submit(() -> { + }); + scheduler.runUntilQueueEmpty(); + e1.release(); + Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); + } final MutableBoolean errored = new MutableBoolean(); final MutableBoolean success = new MutableBoolean();