Skip to content

Commit

Permalink
feat(agent): add workOnTaskResume method and integrate into workflow …
Browse files Browse the repository at this point in the history
…management

- Introduced workOnTaskResume method in Agent and BaseAgent classes to handle task resumption.
- Implemented workOnTaskResume in ReactChampionAgent to manage task execution with last feedback context.
- Updated teamStore to support task resumption in the workflow controller, enhancing task handling during workflow interruptions.
- Improved overall agent state management and task queue handling for better responsiveness and control.
  • Loading branch information
anthonydevs17 committed Jan 23, 2025
1 parent b6fc578 commit db13940
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/agents/baseAgent.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ class BaseAgent {
workOnTask(_task) {
throw new Error('workOnTask must be implemented by subclasses.');
}

workOnTaskResume(_task) {
throw new Error('workOnTaskResume must be implemented by subclasses.');
}
}

export { BaseAgent };
10 changes: 9 additions & 1 deletion src/agents/reactChampionAgent.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,15 @@ class ReactChampionAgent extends BaseAgent {
this.interactionsHistory = new ChatMessageHistory();
this.lastFeedbackMessage = null;
}

async workOnTaskResume(task) {
const lastFeedbackMessage = this.lastFeedbackMessage;
return await this.agenticLoop(
this,
task,
this.#executableAgent,
lastFeedbackMessage
);
}
async workOnTask(task, inputs, context) {
const config = this.prepareAgentForTask(task, inputs, context);
this.#executableAgent = config.executableAgent;
Expand Down
3 changes: 3 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class Agent {
workOnTask(task, inputs, context) {
return this.agentInstance.workOnTask(task, inputs, context);
}
workOnTaskResume(task) {
return this.agentInstance.workOnTaskResume(task);
}

workOnFeedback(task, inputs, context) {
return this.agentInstance.workOnFeedback(task, inputs, context);
Expand Down
4 changes: 3 additions & 1 deletion src/stores/teamStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ const createTeamStore = (initialState = {}) => {
}
}
},

workOnTaskResume: async (agent, task) => {
await agent.workOnTaskResume(task);
},
deriveContextFromLogs: (logs, currentTaskId) => {
const taskResults = new Map();
const tasks = get().tasks; // Get the tasks array from the store
Expand Down
28 changes: 21 additions & 7 deletions src/stores/workflowController.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/

// import PQueue from 'p-queue';
import { TASK_STATUS_enum /*WORKFLOW_STATUS_enum*/ } from '../utils/enums';
import { TASK_STATUS_enum, WORKFLOW_STATUS_enum } from '../utils/enums';
// import { logger } from '../utils/logger';
export const setupWorkflowController = (useTeamStore) => {
// const taskQueue = new PQueue({ concurrency: 1 });
Expand All @@ -24,14 +24,28 @@ export const setupWorkflowController = (useTeamStore) => {
useTeamStore.subscribe(
(state) => state.tasks.filter((t) => t.status === TASK_STATUS_enum.DOING),
(doingTasks, previousDoingTasks) => {
const isResumed =
useTeamStore.getState().teamWorkflowStatus ===
WORKFLOW_STATUS_enum.RESUMED;
doingTasks.forEach((task) => {
if (!previousDoingTasks.find((t) => t.id === task.id)) {
taskQueue
.add(() => useTeamStore.getState().workOnTask(task.agent, task))
.catch((error) => {
useTeamStore.getState().handleTaskError({ task, error });
useTeamStore.getState().handleWorkflowError(task, error);
});
if (isResumed) {
taskQueue
.add(() =>
useTeamStore.getState().workOnTaskResume(task.agent, task)
)
.catch((error) => {
useTeamStore.getState().handleTaskError({ task, error });
useTeamStore.getState().handleWorkflowError(task, error);
});
} else {
taskQueue
.add(() => useTeamStore.getState().workOnTask(task.agent, task))
.catch((error) => {
useTeamStore.getState().handleTaskError({ task, error });
useTeamStore.getState().handleWorkflowError(task, error);
});
}
if (taskQueue.isPaused) taskQueue.start();
}
});
Expand Down

0 comments on commit db13940

Please sign in to comment.