Skip to content

Commit

Permalink
fix(langgraph): make sure the pregel loop aborts on cancellation (#797)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamincburns authored Jan 23, 2025
1 parent db24751 commit 03d7003
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
8 changes: 7 additions & 1 deletion libs/langgraph/src/pregel/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ export async function _runWithRetry<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
pregelTask: PregelExecutableTask<N, C>,
retryPolicy?: RetryPolicy,
configurable?: Record<string, unknown>
configurable?: Record<string, unknown>,
signal?: AbortSignal
): Promise<{
task: PregelExecutableTask<N, C>;
result: unknown;
Expand All @@ -88,6 +89,11 @@ export async function _runWithRetry<
}
// eslint-disable-next-line no-constant-condition
while (true) {
if (signal?.aborted) {
// no need to throw here - we'll throw from the runner, instead.
// there's just no point in retrying if the user has requested an abort.
break;
}
// Clear any writes from previous attempts
pregelTask.writes.splice(0, pregelTask.writes.length);
error = undefined;
Expand Down
34 changes: 19 additions & 15 deletions libs/langgraph/src/pregel/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ export class PregelRunner {
return task.config?.configurable?.[CONFIG_KEY_SEND]?.(writes) ?? [];
};

if (stepTimeout && signal) {
if ("any" in AbortSignal) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
signal = (AbortSignal as any).any([
signal,
AbortSignal.timeout(stepTimeout),
]);
}
} else if (stepTimeout) {
signal = AbortSignal.timeout(stepTimeout);
}

// don't start tasks if signal is aborted!
if (signal?.aborted) {
// note: don't use throwIfAborted here because it throws a DOMException,
// which isn't consistent with how we throw on abort below.
throw new Error("Abort");
}

// Start tasks
Object.assign(
executingTasksMap,
Expand All @@ -136,21 +155,6 @@ export class PregelRunner {
)
);

if (stepTimeout && signal) {
if ("any" in AbortSignal) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
signal = (AbortSignal as any).any([
signal,
AbortSignal.timeout(stepTimeout),
]);
}
} else if (stepTimeout) {
signal = AbortSignal.timeout(stepTimeout);
}

// Abort if signal is aborted
signal?.throwIfAborted();

let listener: () => void;
const signalPromise = new Promise<never>((_resolve, reject) => {
listener = () => reject(new Error("Abort"));
Expand Down
1 change: 1 addition & 0 deletions libs/langgraph/src/pregel/utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const CONFIG_KEYS = [
"writer",
"interruptBefore",
"interruptAfter",
"signal",
];

const DEFAULT_RECURSION_LIMIT = 25;
Expand Down
3 changes: 3 additions & 0 deletions libs/langgraph/src/tests/pregel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9464,6 +9464,9 @@ graph TD;
config
)
).rejects.toThrow("Aborted");

// Ensure that the `twoCount` has had time to increment before we check it, in case the stream aborted but the graph execution didn't.
await new Promise((resolve) => setTimeout(resolve, 300));
expect(oneCount).toEqual(1);
expect(twoCount).toEqual(0);
});
Expand Down

0 comments on commit 03d7003

Please sign in to comment.