Skip to content

Commit

Permalink
feat(workflow): streamline workflow management with enhanced pause, r…
Browse files Browse the repository at this point in the history
…esume, and stop methods

- Refactored Team class methods (pause, resume, stop) to utilize new workflow management functions directly from the store, improving code clarity and reducing redundancy.
- Updated ReactChampionAgent to track the last feedback message and handle task execution more effectively, including abort handling.
- Introduced new error classes (StopAbortError, PauseAbortError) for better error management during workflow interruptions.
- Enhanced task logging for aborted tasks, capturing relevant statistics and error details for improved debugging.
- Integrated workflow action enums to standardize workflow control actions across the application.
  • Loading branch information
anthonydevs17 committed Jan 16, 2025
1 parent ea3f631 commit 5b15c1f
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 173 deletions.
165 changes: 90 additions & 75 deletions src/agents/reactChampionAgent.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class ReactChampionAgent extends BaseAgent {
};
this.llmConfig = extractedLlmConfig;
}

this.interactionsHistory = new ChatMessageHistory();
this.lastFeedbackMessage = null;
}

async workOnTask(task, inputs, context) {
Expand Down Expand Up @@ -147,7 +147,7 @@ class ReactChampionAgent extends BaseAgent {

return {
executableAgent: chainAgentWithHistory,
initialFeedbackMessage: feedbackMessage,
initialFeedbackMessage: this.lastFeedbackMessage || feedbackMessage,
};
}

Expand All @@ -164,22 +164,20 @@ class ReactChampionAgent extends BaseAgent {
iterations < maxAgentIterations &&
!loopCriticalError
) {
while (
agent.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.PAUSED ||
agent.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.STOPPED
// Save the feedback message as the last feedback message
this.lastFeedbackMessage = feedbackMessage;

// Check workflow status
const workflowStatus = agent.store.getState().teamWorkflowStatus;

if (
workflowStatus === WORKFLOW_STATUS_enum.STOPPED ||
workflowStatus === WORKFLOW_STATUS_enum.STOPPING
) {
if (
agent.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.STOPPED
) {
return {
result: parsedResultWithFinalAnswer,
metadata: { iterations, maxAgentIterations },
};
}
await new Promise((resolve) => setTimeout(resolve, 100)); // Wait until resumed or stopped
return {
result: parsedResultWithFinalAnswer,
metadata: { iterations, maxAgentIterations },
};
}

try {
Expand Down Expand Up @@ -513,63 +511,48 @@ class ReactChampionAgent extends BaseAgent {
}

async executeThinking(agent, task, ExecutableAgent, feedbackMessage) {
const abortController =
agent.store.getState().workflowController.abortController;

return new Promise((resolve, reject) => {
// Check if already aborted
if (abortController.signal.aborted) {
reject(new AbortError());
return;
}

// Use once: true to ensure the listener is removed after firing
abortController.signal.addEventListener(
'abort',
() => {
reject(new AbortError());
},
{ once: true }
);
const promiseObj = {};
let rejectFn; // Declare reject function outside Promise
// Create an AbortController for this invocation
const abortController = new AbortController();
const thinkingPromise = new Promise((resolve, reject) => {
rejectFn = reject; // Capture the reject function

ExecutableAgent.invoke(
{ feedbackMessage },
{
configurable: {
sessionId: 'foo-bar-baz',
signal: abortController.signal,
},
configurable: { sessionId: task.id },
callbacks: [
{
handleChatModelStart: (llm, messages) => {
if (abortController.signal.aborted) return;
agent
.handleThinkingStart({ agent, task, messages })
.catch((error) => {
reject(error);
});
handleChatModelStart: async (llm, messages) => {
await agent.handleThinkingStart({ agent, task, messages });
},

handleLLMEnd: async (output) => {
if (abortController.signal.aborted) return;
agent
.handleThinkingEnd({ agent, task, output })
.then((thinkingResult) => resolve(thinkingResult))
.catch((error) => {
reject(error);
});
if (
this.store.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.PAUSED
) {
return;
}
const result = await agent.handleThinkingEnd({
agent,
task,
output,
});
resolve(result);
},
},
],
], // Add the signal to the options
signal: abortController.signal,
}
).catch((error) => {
logger.error(
`LLM_INVOCATION_ERROR: Error during LLM API call for Agent: ${agent.name}, Task: ${task.id}. Details:`,
error
);
if (error.name === 'AbortError' || abortController.signal.aborted) {
reject(new AbortError());
if (error.name === 'AbortError') {
reject(new AbortError('Task was cancelled'));
} else {
logger.error(
`LLM_INVOCATION_ERROR: Error during LLM API call for Agent: ${agent.name}, Task: ${task.id}. Details:`,
error
);
reject(
new LLMInvocationError(
`LLM API Error during executeThinking for Agent: ${agent.name}, Task: ${task.id}`,
Expand All @@ -579,6 +562,35 @@ class ReactChampionAgent extends BaseAgent {
}
});
});

// Assign both the promise and the captured reject function
Object.assign(promiseObj, {
promise: thinkingPromise,
// reject: rejectFn,
reject: (e) => {
abortController.abort();
rejectFn(e);
},
});

// Track promise in store
this.store.getState().trackPromise(this.id, promiseObj);

try {
return await thinkingPromise;
} catch (error) {
// Ensure we properly handle and rethrow the error
if (error instanceof AbortError) {
throw error; // Rethrow AbortError
}
// Wrap unexpected errors
throw new LLMInvocationError(
`LLM API Error during executeThinking for Agent: ${agent.name}, Task: ${task.id}`,
error
);
} finally {
this.store.getState().removePromise(this.id, promiseObj);
}
}

handleIssuesParsingLLMOutput({ agent, task, output }) {
Expand Down Expand Up @@ -684,28 +696,29 @@ class ReactChampionAgent extends BaseAgent {
}

async executeUsingTool({ agent, task, parsedLLMOutput, tool }) {
const abortController =
agent.store.getState().workflowController.abortController;

const toolInput = parsedLLMOutput.actionInput;
agent.handleUsingToolStart({ agent, task, tool, input: toolInput });

try {
const toolResult = await Promise.race([
tool.call(toolInput),
new Promise((_, reject) => {
abortController.signal.addEventListener('abort', () => {
reject(new AbortError());
});
}),
]);
const promiseObj = {};
let rejectFn; // Declare reject function outside Promise

agent.handleUsingToolEnd({ agent, task, tool, output: toolResult });
const toolPromise = new Promise((resolve, reject) => {
rejectFn = reject; // Capture the reject function
tool.call(toolInput).then(resolve).catch(reject);
});

// Track promise in store
Object.assign(promiseObj, { promise: toolPromise, reject: rejectFn });

this.store.getState().trackPromise(this.id, promiseObj);

try {
const result = await toolPromise;
agent.handleUsingToolEnd({ agent, task, tool, output: result });
return this.promptTemplates.TOOL_RESULT_FEEDBACK({
agent,
task,
toolResult,
toolResult: result,
parsedLLMOutput,
});
} catch (error) {
Expand All @@ -719,6 +732,8 @@ class ReactChampionAgent extends BaseAgent {
tool,
error,
});
} finally {
this.store.getState().removePromise(this.id, promiseObj);
}
}

Expand Down
21 changes: 3 additions & 18 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,7 @@ class Team {
* @returns {void}
*/
pause() {
const currentStatus = this.store.getState().teamWorkflowStatus;
if (currentStatus !== WORKFLOW_STATUS_enum.RUNNING) {
throw new Error('Cannot pause workflow unless it is running');
}
this.store.setState({ teamWorkflowStatus: WORKFLOW_STATUS_enum.PAUSED });
return this.store.getState().pauseWorkflow();
}

/**
Expand All @@ -176,26 +172,15 @@ class Team {
* @returns {void}
*/
resume() {
const currentStatus = this.store.getState().teamWorkflowStatus;
if (currentStatus !== WORKFLOW_STATUS_enum.PAUSED) {
throw new Error('Cannot resume workflow unless it is paused');
}
this.store.setState({ teamWorkflowStatus: WORKFLOW_STATUS_enum.RESUMED });
return this.store.getState().resumeWorkflow();
}
/**
* Stops the team's workflow.
* This method stops the workflow, preventing any further task execution.
* @returns {void}
*/
stop() {
const currentStatus = this.store.getState().teamWorkflowStatus;
if (
currentStatus !== WORKFLOW_STATUS_enum.RUNNING &&
currentStatus !== WORKFLOW_STATUS_enum.PAUSED
) {
throw new Error('Cannot stop workflow unless it is running or paused');
}
this.store.setState({ teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPING });
return this.store.getState().stopWorkflow();
}

/**
Expand Down
40 changes: 38 additions & 2 deletions src/stores/taskStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from '../utils/enums';
import { getTaskTitleForLogs } from '../utils/tasks';
import { logger } from '../utils/logger';
import { PrettyError } from '../utils/errors';
import { PrettyError, StopAbortError } from '../utils/errors';
import { calculateTaskCost } from '../utils/llmCostCalculator';

export const useTaskStore = (set, get) => ({
Expand Down Expand Up @@ -288,6 +288,43 @@ export const useTaskStore = (set, get) => ({
get().handleWorkflowBlocked({ task, error });
},
handleTaskAborted: ({ task, error }) => {
if (error instanceof StopAbortError) {
//create task log
const stats = get().getTaskStats(task, get);
const modelCode = task.agent.llmConfig.model; // Assuming this is where the model code is stored
// Calculate costs directly using stats
const costDetails = calculateTaskCost(modelCode, stats.llmUsageStats);

const taskLog = get().prepareNewLog({
agent: task.agent,
task,
logDescription: `Task aborted: ${getTaskTitleForLogs(task)}, Reason: ${
error.message
}`,
metadata: {
...stats,
costDetails,
error,
},
logType: 'TaskStatusUpdate',
});
// create pretty error
const prettyError = new PrettyError({
name: 'TASK STOPPED',
message: 'Task manually stopped by user.',
recommendedAction:
'Enable logLevel: "debug" during team initialization to obtain more detailed logs and facilitate troubleshooting.',
rootError: error,
context: { task, error },
});
logger.warn(prettyError.prettyMessage);
logger.debug(prettyError.context);

set((state) => ({
workflowLogs: [...state.workflowLogs, taskLog],
}));
return;
}
const stats = get().getTaskStats(task, get);
task.status = TASK_STATUS_enum.BLOCKED;
const modelCode = task.agent.llmConfig.model; // Assuming this is where the model code is stored
Expand Down Expand Up @@ -338,6 +375,5 @@ export const useTaskStore = (set, get) => ({
),
workflowLogs: [...state.workflowLogs, taskLog],
}));
get().handleWorkflowAborted({ task, error });
},
});
3 changes: 2 additions & 1 deletion src/stores/teamStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { create } from 'zustand';
import { devtools, subscribeWithSelector } from 'zustand/middleware';
import { useAgentStore } from './agentStore';
import { useTaskStore } from './taskStore';
import { useWorkflowLoopStore } from './workflowLoopStore';
import {
TASK_STATUS_enum,
AGENT_STATUS_enum,
Expand Down Expand Up @@ -52,7 +53,7 @@ const createTeamStore = (initialState = {}) => {
(set, get) => ({
...useAgentStore(set, get),
...useTaskStore(set, get),

...useWorkflowLoopStore(set, get),
teamWorkflowStatus:
initialState.teamWorkflowStatus || WORKFLOW_STATUS_enum.INITIAL,
workflowResult: initialState.workflowResult || null,
Expand Down
Loading

0 comments on commit 5b15c1f

Please sign in to comment.