Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ExportObject to be RELEASED When Already Managed #5014

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 61 additions & 41 deletions server/src/main/java/io/deephaven/server/session/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ public final static class ExportObject<T> extends LivenessArtifact {

/** used to detect when this object is ready for export (is visible for atomic int field updater) */
private volatile int dependentCount = -1;
/** our first parent that was already released prior to having dependencies set if one exists */
private ExportObject<?> alreadyDeadParent;

@SuppressWarnings("unchecked")
private static final AtomicIntegerFieldUpdater<ExportObject<?>> DEPENDENT_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater((Class<ExportObject<?>>) (Class<?>) ExportObject.class,
Expand Down Expand Up @@ -648,7 +651,14 @@ private synchronized void setDependencies(final List<ExportObject<?>> parents) {

this.parents = parents;
dependentCount = parents.size();
parents.stream().filter(Objects::nonNull).forEach(this::tryManage);
for (final ExportObject<?> parent : parents) {
if (parent != null && !tryManage(parent)) {
// we've failed; let's cleanup already managed parents
forceReferenceCountToZero();
alreadyDeadParent = parent;
break;
}
}

if (log.isDebugEnabled()) {
final Exception e = new RuntimeException();
Expand Down Expand Up @@ -678,18 +688,24 @@ private synchronized void setWork(
throw new IllegalStateException("export object can only be defined once");
}
hasHadWorkSet = true;

if (queryPerformanceRecorder != null && queryPerformanceRecorder.getState() == QueryState.RUNNING) {
// transfer ownership of the qpr to the export before it can be resumed by the scheduler
queryPerformanceRecorder.suspendQuery();
}
this.requiresSerialQueue = requiresSerialQueue;

// we defer this type of failure until setWork for consistency in error handling
if (alreadyDeadParent != null) {
onDependencyFailure(alreadyDeadParent);
alreadyDeadParent = null;
}

if (isExportStateTerminal(state)) {
// The following scenarios cause us to get into this state:
// - this export object was released/cancelled
// - the session expiration propagated to this export object
// Note that already failed dependencies will be handled in the onResolveOne method below.
// - a parent export was released/dead prior to `setDependencies`
// Note that failed dependencies will be handled in the onResolveOne method below.

// since this is the first we know of the errorHandler, it could not have been invoked yet
if (errorHandler != null) {
Expand Down Expand Up @@ -891,44 +907,12 @@ private void onResolveOne(@Nullable final ExportObject<?> parent) {
return;
}

// is this a cascading failure?
if (parent != null && isExportStateTerminal(parent.state)) {
synchronized (this) {
errorId = parent.errorId;
if (parent.caughtException instanceof StatusRuntimeException) {
caughtException = parent.caughtException;
}
ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED;

if (errorId == null) {
final String errorDetails;
switch (parent.state) {
case RELEASED:
terminalState = ExportNotification.State.DEPENDENCY_RELEASED;
errorDetails = "dependency released by user.";
break;
case CANCELLED:
terminalState = ExportNotification.State.DEPENDENCY_CANCELLED;
errorDetails = "dependency cancelled by user.";
break;
default:
// Note: the other error states should have non-null errorId
errorDetails = "dependency does not have its own error defined " +
"and is in an unexpected state: " + parent.state;
break;
}

maybeAssignErrorId();
failedDependencyLogIdentity = parent.logIdentity;
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails)
.endl();
}
}

setState(terminalState);
return;
}
// Is this a cascading failure? Note that we manage the parents in `setDependencies` which
// keeps the parent results live until this child been exported. This means that the parent is allowed to
// be in a RELEASED state, but is not allowed to be in a failure state.
if (parent != null && isExportStateFailure(parent.state)) {
onDependencyFailure(parent);
return;
}

final int newDepCount = DEPENDENT_COUNT_UPDATER.decrementAndGet(this);
Expand Down Expand Up @@ -1045,6 +1029,42 @@ private void maybeAssignErrorId() {
}
}

private synchronized void onDependencyFailure(final ExportObject<?> parent) {
errorId = parent.errorId;
if (parent.caughtException instanceof StatusRuntimeException) {
caughtException = parent.caughtException;
}
ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED;

if (errorId == null) {
final String errorDetails;
switch (parent.state) {
case RELEASED:
terminalState = ExportNotification.State.DEPENDENCY_RELEASED;
errorDetails = "dependency released by user.";
break;
case CANCELLED:
terminalState = ExportNotification.State.DEPENDENCY_CANCELLED;
errorDetails = "dependency cancelled by user.";
break;
default:
// Note: the other error states should have non-null errorId
errorDetails = "dependency does not have its own error defined " +
"and is in an unexpected state: " + parent.state;
break;
}

maybeAssignErrorId();
failedDependencyLogIdentity = parent.logIdentity;
if (!(caughtException instanceof StatusRuntimeException)) {
log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails)
.endl();
}
}

setState(terminalState);
}

/**
* Sets the final result for this export.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,14 @@ public void testNewExportRequiresPositiveId() {

@Test
public void testDependencyAlreadyReleased() {
final SessionState.ExportObject<Object> e1 = session.newExport(nextExportId++).submit(() -> {
});
scheduler.runUntilQueueEmpty();
e1.release();
Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED);
final SessionState.ExportObject<Object> e1;
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
e1 = session.newExport(nextExportId++).submit(() -> {
});
scheduler.runUntilQueueEmpty();
e1.release();
Assert.eq(e1.getState(), "e1.getState()", ExportNotification.State.RELEASED);
}

final MutableBoolean errored = new MutableBoolean();
final MutableBoolean success = new MutableBoolean();
Expand Down
Loading