From c78cae3c1d5c9ccab8eb0f4f3a6850074ee4ddd9 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Mon, 8 Jan 2024 12:31:01 -0700 Subject: [PATCH 1/4] Allow ExportObject to be RELEASED When Already Managed --- .../server/session/SessionState.java | 96 +++++++++++-------- .../server/session/SessionStateTest.java | 13 ++- 2 files changed, 63 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 862cb131855..6a29b707014 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -565,6 +565,9 @@ public final static class ExportObject extends LivenessArtifact { /** used to detect when this object is ready for export (is visible for atomic int field updater) */ private volatile int dependentCount = -1; + /** used to detect a parent that was already released prior to having dependencies set */ + private ExportObject alreadyDeadParent; + @SuppressWarnings("unchecked") private static final AtomicIntegerFieldUpdater> DEPENDENT_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater((Class>) (Class) ExportObject.class, @@ -648,7 +651,9 @@ private synchronized void setDependencies(final List> parents) { this.parents = parents; dependentCount = parents.size(); - parents.stream().filter(Objects::nonNull).forEach(this::tryManage); + alreadyDeadParent = parents.stream() + .filter(p -> p != null && !tryManage(p)) + .findFirst().orElse(null); if (log.isDebugEnabled()) { final Exception e = new RuntimeException(); @@ -678,18 +683,23 @@ private synchronized void setWork( throw new IllegalStateException("export object can only be defined once"); } hasHadWorkSet = true; - if (queryPerformanceRecorder != null && queryPerformanceRecorder.getState() == QueryState.RUNNING) { // transfer ownership of the qpr to the export before it can be resumed by the scheduler queryPerformanceRecorder.suspendQuery(); } this.requiresSerialQueue = requiresSerialQueue; + // we defer this type of failure until setWork for consistency in error handling + if (alreadyDeadParent != null) { + onDependencyFailure(alreadyDeadParent); + } + if (isExportStateTerminal(state)) { // The following scenarios cause us to get into this state: // - this export object was released/cancelled // - the session expiration propagated to this export object - // Note that already failed dependencies will be handled in the onResolveOne method below. + // - a parent export was released/dead prior to `setDependencies` + // Note that failed dependencies will be handled in the onResolveOne method below. // since this is the first we know of the errorHandler, it could not have been invoked yet if (errorHandler != null) { @@ -891,44 +901,12 @@ private void onResolveOne(@Nullable final ExportObject parent) { return; } - // is this a cascading failure? - if (parent != null && isExportStateTerminal(parent.state)) { - synchronized (this) { - errorId = parent.errorId; - if (parent.caughtException instanceof StatusRuntimeException) { - caughtException = parent.caughtException; - } - ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED; - - if (errorId == null) { - final String errorDetails; - switch (parent.state) { - case RELEASED: - terminalState = ExportNotification.State.DEPENDENCY_RELEASED; - errorDetails = "dependency released by user."; - break; - case CANCELLED: - terminalState = ExportNotification.State.DEPENDENCY_CANCELLED; - errorDetails = "dependency cancelled by user."; - break; - default: - // Note: the other error states should have non-null errorId - errorDetails = "dependency does not have its own error defined " + - "and is in an unexpected state: " + parent.state; - break; - } - - maybeAssignErrorId(); - failedDependencyLogIdentity = parent.logIdentity; - if (!(caughtException instanceof StatusRuntimeException)) { - log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails) - .endl(); - } - } - - setState(terminalState); - return; - } + // Is this a cascading failure? Note that we increment the parent reference count in `setDependencies` which + // keeps the result live until all children have been exported. This means that the parent is allowed to + // be in a RELEASED state, but is not allowed to be in a failure state. + if (parent != null && isExportStateFailure(parent.state)) { + onDependencyFailure(parent); + return; } final int newDepCount = DEPENDENT_COUNT_UPDATER.decrementAndGet(this); @@ -1045,6 +1023,42 @@ private void maybeAssignErrorId() { } } + private synchronized void onDependencyFailure(final ExportObject parent) { + errorId = parent.errorId; + if (parent.caughtException instanceof StatusRuntimeException) { + caughtException = parent.caughtException; + } + ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED; + + if (errorId == null) { + final String errorDetails; + switch (parent.state) { + case RELEASED: + terminalState = ExportNotification.State.DEPENDENCY_RELEASED; + errorDetails = "dependency released by user."; + break; + case CANCELLED: + terminalState = ExportNotification.State.DEPENDENCY_CANCELLED; + errorDetails = "dependency cancelled by user."; + break; + default: + // Note: the other error states should have non-null errorId + errorDetails = "dependency does not have its own error defined " + + "and is in an unexpected state: " + parent.state; + break; + } + + maybeAssignErrorId(); + failedDependencyLogIdentity = parent.logIdentity; + if (!(caughtException instanceof StatusRuntimeException)) { + log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails) + .endl(); + } + } + + setState(terminalState); + } + /** * Sets the final result for this export. * diff --git a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java index 8a14ea1c675..6ee2cb0b4af 100644 --- a/server/src/test/java/io/deephaven/server/session/SessionStateTest.java +++ b/server/src/test/java/io/deephaven/server/session/SessionStateTest.java @@ -579,11 +579,14 @@ public void testNewExportRequiresPositiveId() { @Test public void testDependencyAlreadyReleased() { - final SessionState.ExportObject e1 = session.newExport(nextExportId++).submit(() -> { - }); - scheduler.runUntilQueueEmpty(); - e1.release(); - Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); + final SessionState.ExportObject e1; + try (final SafeCloseable ignored = LivenessScopeStack.open()) { + e1 = session.newExport(nextExportId++).submit(() -> { + }); + scheduler.runUntilQueueEmpty(); + e1.release(); + Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED); + } final MutableBoolean errored = new MutableBoolean(); final MutableBoolean success = new MutableBoolean(); From e9fc4142816bab9498ea23aebd83b4006093f9a4 Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Tue, 9 Jan 2024 09:17:03 -0700 Subject: [PATCH 2/4] Update server/src/main/java/io/deephaven/server/session/SessionState.java Co-authored-by: Ryan Caudy --- .../main/java/io/deephaven/server/session/SessionState.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index 6a29b707014..abba7f053db 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -901,8 +901,8 @@ private void onResolveOne(@Nullable final ExportObject parent) { return; } - // Is this a cascading failure? Note that we increment the parent reference count in `setDependencies` which - // keeps the result live until all children have been exported. This means that the parent is allowed to + // Is this a cascading failure? Note that we manage the parents in `setDependencies` which + // keeps the parent results live until this child been exported. This means that the parent is allowed to // be in a RELEASED state, but is not allowed to be in a failure state. if (parent != null && isExportStateFailure(parent.state)) { onDependencyFailure(parent); From b5946e1509175fff0cdedee6cc98044088098dce Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 9 Jan 2024 09:21:46 -0700 Subject: [PATCH 3/4] Ryan/Devin Feedback --- .../server/session/SessionState.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index abba7f053db..db63790e0bb 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -565,7 +565,7 @@ public final static class ExportObject extends LivenessArtifact { /** used to detect when this object is ready for export (is visible for atomic int field updater) */ private volatile int dependentCount = -1; - /** used to detect a parent that was already released prior to having dependencies set */ + /** our first parent that was already released prior to having dependencies set if one exists */ private ExportObject alreadyDeadParent; @SuppressWarnings("unchecked") @@ -651,9 +651,23 @@ private synchronized void setDependencies(final List> parents) { this.parents = parents; dependentCount = parents.size(); - alreadyDeadParent = parents.stream() - .filter(p -> p != null && !tryManage(p)) - .findFirst().orElse(null); + for (final ExportObject parent : parents) { + if (parent != null && !tryManage(parent)) { + alreadyDeadParent = parent; + break; + } + } + + if (alreadyDeadParent != null) { + // no point in holding references to other parents any longer; we've already failed + for (final ExportObject parent : parents) { + if (parent == alreadyDeadParent) { + break; + } else if (parent != null) { + unmanage(parent); + } + } + } if (log.isDebugEnabled()) { final Exception e = new RuntimeException(); @@ -692,6 +706,7 @@ private synchronized void setWork( // we defer this type of failure until setWork for consistency in error handling if (alreadyDeadParent != null) { onDependencyFailure(alreadyDeadParent); + alreadyDeadParent = null; } if (isExportStateTerminal(state)) { From 07f687637cab89450648c0ea3b23086178fbc4ea Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 9 Jan 2024 09:39:21 -0700 Subject: [PATCH 4/4] Ryan's feedback --- .../io/deephaven/server/session/SessionState.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index db63790e0bb..e45a86b33c6 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -653,22 +653,13 @@ private synchronized void setDependencies(final List> parents) { dependentCount = parents.size(); for (final ExportObject parent : parents) { if (parent != null && !tryManage(parent)) { + // we've failed; let's cleanup already managed parents + forceReferenceCountToZero(); alreadyDeadParent = parent; break; } } - if (alreadyDeadParent != null) { - // no point in holding references to other parents any longer; we've already failed - for (final ExportObject parent : parents) { - if (parent == alreadyDeadParent) { - break; - } else if (parent != null) { - unmanage(parent); - } - } - } - if (log.isDebugEnabled()) { final Exception e = new RuntimeException(); final LogEntry entry =