diff --git a/bun.lockb b/bun.lockb index 40524cd..6cfa292 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/example/src/surprise_trip.ts b/example/src/surprise_trip.ts index 10aaff6..2b35100 100644 --- a/example/src/surprise_trip.ts +++ b/example/src/surprise_trip.ts @@ -4,7 +4,7 @@ import { agent } from '@dead-simple-ai-agent/framework/agent' import { teamwork } from '@dead-simple-ai-agent/framework/teamwork' -import { logger } from '@dead-simple-ai-agent/framework/telemetry/console' +import { logger } from '@dead-simple-ai-agent/framework/telemetry' import { workflow } from '@dead-simple-ai-agent/framework/workflow' import { lookupWikipedia } from '../tools.js' @@ -72,7 +72,8 @@ const researchTripWorkflow = workflow({ Comprehensive day-by-day itinerary for the trip to Wrocław, Poland. Ensure the itinerary integrates flights, hotel information, and all planned activities and dining experiences. `, - telemetry: logger, + // Uncomment to see the workflow state in the console + // snapshot: logger, }) const result = await teamwork(researchTripWorkflow) diff --git a/packages/framework/package.json b/packages/framework/package.json index 4f30f63..bf1200e 100644 --- a/packages/framework/package.json +++ b/packages/framework/package.json @@ -9,9 +9,6 @@ }, "./models/*": { "bun": "./src/models/*.ts" - }, - "./telemetry/*": { - "bun": "./src/telemetry/*.ts" } }, "type": "module", diff --git a/packages/framework/src/supervisor/nextTask.ts b/packages/framework/src/supervisor/nextTask.ts index 3359cf6..02ceee7 100644 --- a/packages/framework/src/supervisor/nextTask.ts +++ b/packages/framework/src/supervisor/nextTask.ts @@ -5,12 +5,13 @@ import { z } from 'zod' import { Provider } from '../models/openai.js' import { Message } from '../types.js' -export async function getNextTask(provider: Provider, history: Message[]): Promise { +export async function nextTask(provider: Provider, history: Message[]): Promise { const response = await provider.completions({
messages: [ { role: 'system', // tbd: handle subsequent failures + // tbd: include max iterations in system prompt content: s` You are a planner that breaks down complex workflows into smaller, actionable steps. Your job is to determine the next task that needs to be done based on the original workflow and what has been completed so far. diff --git a/packages/framework/src/supervisor/nextTick.ts b/packages/framework/src/supervisor/nextTick.ts new file mode 100644 index 0000000..69c64e6 --- /dev/null +++ b/packages/framework/src/supervisor/nextTick.ts @@ -0,0 +1,132 @@ +import { Workflow, WorkflowState } from '../workflow.js' +import { finalizeWorkflow } from './finalizeWorkflow.js' +import { nextTask } from './nextTask.js' +import { runAgent } from './runAgent.js' +import { runTools } from './runTools.js' +import { selectAgent } from './selectAgent.js' + +/** + * Performs single iteration over Workflow and produces its next state. + */ +export async function nextTick(workflow: Workflow, state: WorkflowState): Promise { + const { status, messages } = state + + /** + * When number of messages exceeds number of maximum iterations, we must force finish the workflow + * and produce best final answer + */ + if (messages.length > workflow.maxIterations) { + const content = await finalizeWorkflow(workflow.provider, messages) + return { + ...state, + status: 'finished', + messages: state.messages.concat({ + role: 'user', + content, + }), + } + } + + /** + * When workflow is idle, we must get next task to work on, or finish the workflow otherwise. + */ + if (status === 'idle') { + const task = await nextTask(workflow.provider, messages) + if (task) { + return { + ...state, + status: 'pending', + agentRequest: [ + { + role: 'user', + content: task, + }, + ], + } + } else { + return { + ...state, + status: 'finished', + } + } + } + + /** + * When workflow is pending, we must find best agent to work on it.
+ */ + if (status === 'pending') { + const selectedAgent = await selectAgent(workflow.provider, state.agentRequest, workflow.members) + return { + ...state, + status: 'assigned', + agentStatus: 'idle', + agent: selectedAgent.role, + } + } + + /** + * When workflow is running, we must call assigned agent to continue working on it. + */ + if (status === 'assigned') { + const agent = workflow.members.find((member) => member.role === state.agent) + if (!agent) { + return { + id: state.id, + status: 'failed', + messages: state.messages.concat({ + role: 'assistant', + content: 'No agent found.', + }), + } + } + + /** + * When agentStatus is `tool`, an agent is waiting for the tools results. + * We must run all the tools in order to proceed to the next step. + */ + if (state.agentStatus === 'tool') { + const toolsResponse = await runTools(agent, state.agentRequest!) + return { + ...state, + agentStatus: 'step', + agentRequest: state.agentRequest.concat(toolsResponse), + } + } + + /** + * When agent finishes running, it will return status to indicate whether it finished processing. + * + * If it finished processing, we will append its final answer to the context, as well as + * first message from `agentRequest`, which holds the actual task, excluding middle-steps. + * + * If further processing is required, we will carry `agentRequest` over to the next iteration. + */ + const [agentResponse, status] = await runAgent(agent, state.agentRequest) + if (status === 'complete') { + const agentFinalAnswer = agentResponse.at(-1)! + return { + ...state, + status: 'idle', + messages: state.messages.concat(state.agentRequest[0], agentFinalAnswer), + } + } + return { + ...state, + agentStatus: status, + agentRequest: state.agentRequest.concat(agentResponse), + } + } + + /** + * When workflow fails due to unexpected error, we must attempt recovering or finish the workflow + * otherwise. 
+ */ + if (status === 'failed') { + return { + ...state, + status: 'finished', + } + } + + return state +} diff --git a/packages/framework/src/executor.ts b/packages/framework/src/supervisor/runAgent.ts similarity index 53% rename from packages/framework/src/executor.ts rename to packages/framework/src/supervisor/runAgent.ts index ee51e50..44bf153 100644 --- a/packages/framework/src/executor.ts +++ b/packages/framework/src/supervisor/runAgent.ts @@ -2,14 +2,13 @@ import s from 'dedent' import { zodFunction, zodResponseFormat } from 'openai/helpers/zod' import { z } from 'zod' -import { Agent } from './agent.js' -import { Message } from './types.js' +import { Agent } from '../agent.js' +import { Message } from '../types.js' -export async function executeTaskWithAgent( +export async function runAgent( agent: Agent, - messages: Message[], - team: Agent[] -): Promise { + agentRequest: Message[] +): Promise<[Message[], 'step' | 'complete' | 'tool']> { const tools = agent.tools ? Object.entries(agent.tools).map(([name, tool]) => zodFunction({ @@ -21,7 +20,6 @@ export async function executeTaskWithAgent( : [] const response = await agent.provider.completions({ - // tbd: verify the prompt messages: [ { role: 'system', @@ -36,7 +34,7 @@ export async function executeTaskWithAgent( Only ask question to the user if you cannot complete the task without their input. `, }, - ...messages, + ...agentRequest, ], tools: tools.length > 0 ? tools : undefined, response_format: zodResponseFormat( @@ -58,62 +56,23 @@ export async function executeTaskWithAgent( 'task_result' ), }) - if (response.choices[0].message.tool_calls.length > 0) { - const toolResults = await Promise.all( - response.choices[0].message.tool_calls.map(async (toolCall) => { - if (toolCall.type !== 'function') { - throw new Error('Tool call is not a function') - } - - const tool = agent.tools ? 
agent.tools[toolCall.function.name] : null - if (!tool) { - throw new Error(`Unknown tool: ${toolCall.function.name}`) - } - - const content = await tool.execute(toolCall.function.parsed_arguments, { - provider: agent.provider, - messages, - }) - return { - role: 'tool' as const, - tool_call_id: toolCall.id, - content: JSON.stringify(content), - } - }) - ) - return executeTaskWithAgent( - agent, - [...messages, response.choices[0].message, ...toolResults], - team - ) + if (response.choices[0].message.tool_calls.length > 0) { + return [[response.choices[0].message], 'tool'] } - // tbd: verify shape of response const result = response.choices[0].message.parsed if (!result) { throw new Error('No parsed response received') } - if (result.response.kind === 'step') { - console.log('🚀 Step:', result.response.name) - return executeTaskWithAgent( - agent, - [ - ...messages, - { - role: 'assistant', - content: result.response.result, - }, - ], - team - ) - } - - if (result.response.kind === 'complete') { - return result.response.result - } - - // tbd: check if this is reachable - throw new Error('Illegal state') + return [ + [ + { + role: 'assistant', + content: result.response.result, + }, + ], + result.response.kind, + ] } diff --git a/packages/framework/src/supervisor/runTools.ts b/packages/framework/src/supervisor/runTools.ts new file mode 100644 index 0000000..bad2e1d --- /dev/null +++ b/packages/framework/src/supervisor/runTools.ts @@ -0,0 +1,50 @@ +import type { ParsedChatCompletionMessage } from 'openai/resources/beta/chat/completions.mjs' +import { ChatCompletionToolMessageParam } from 'openai/resources/index.mjs' + +import { Agent } from '../agent.js' +import { Message } from '../types.js' + +/** + * Asserts that given message requests tool calls + */ +function isToolCallRequest(message?: Message): message is ParsedChatCompletionMessage { + return message ?
'tool_calls' in message : false +} + +export async function runTools( + agent: Agent, + agentRequest: Message[] +): Promise { + // tbd: find cleaner way to do this + const messages = Array.from(agentRequest) + const toolCallRequest = messages.pop() + + if (!isToolCallRequest(toolCallRequest)) { + throw new Error('Invalid tool request') + } + + const toolResults = await Promise.all( + toolCallRequest.tool_calls.map(async (toolCall) => { + if (toolCall.type !== 'function') { + throw new Error('Tool call is not a function') + } + + const tool = agent.tools ? agent.tools[toolCall.function.name] : null + if (!tool) { + throw new Error(`Unknown tool: ${toolCall.function.name}`) + } + + const content = await tool.execute(toolCall.function.parsed_arguments, { + provider: agent.provider, + messages, + }) + return { + role: 'tool' as const, + tool_call_id: toolCall.id, + content: JSON.stringify(content), + } + }) + ) + + return toolResults +} diff --git a/packages/framework/src/supervisor/selectAgent.ts b/packages/framework/src/supervisor/selectAgent.ts index b7f5cb1..ad466a1 100644 --- a/packages/framework/src/supervisor/selectAgent.ts +++ b/packages/framework/src/supervisor/selectAgent.ts @@ -4,10 +4,11 @@ import { z } from 'zod' import { Agent } from '../agent.js' import { Provider } from '../models/openai.js' +import { Message } from '../types.js' export async function selectAgent( provider: Provider, - task: string, + agentRequest: Message[], agents: Agent[] ): Promise { const response = await provider.completions({ @@ -29,7 +30,7 @@ export async function selectAgent( role: 'user', content: s` Here is the task: - ${task} + ${agentRequest.map((request) => request.content).join(',')} Here are the available agents: diff --git a/packages/framework/src/teamwork.ts b/packages/framework/src/teamwork.ts index 31de1c4..f38346b 100644 --- a/packages/framework/src/teamwork.ts +++ b/packages/framework/src/teamwork.ts @@ -1,113 +1,28 @@ -import { executeTaskWithAgent } from 
'./executor.js' -import { finalizeWorkflow } from './supervisor/finalizeWorkflow.js' -import { getNextTask } from './supervisor/nextTask.js' -import { selectAgent } from './supervisor/selectAgent.js' -import { Message, MessageContent } from './types.js' +import { nextTick } from './supervisor/nextTick.js' +import { MessageContent } from './types.js' import { Workflow, WorkflowState, workflowState } from './workflow.js' -export async function iterate(workflow: Workflow, state: WorkflowState): Promise { - const { provider, members, telemetry } = workflow - const { messages } = state - - telemetry.record({ - type: 'workflow.iteration.start', - data: { - workflow, - state, - }, - }) - - const task = await getNextTask(provider, messages) - if (!task) { - return { - ...state, - messages, - status: 'finished', - } - } - - telemetry.record({ - type: 'workflow.iteration.nextTask', - data: { - workflow, - task, - }, - }) - - if (messages.length > workflow.maxIterations) { - return { - ...state, - messages, - status: 'interrupted', - } - } - - // tbd: get rid of console.logs, use telemetry instead - console.log('🚀 Next task:', task) - - const selectedAgent = await selectAgent(provider, task, members) - console.log('🚀 Selected agent:', selectedAgent.role) - - const agentRequest: Message[] = [ - ...messages, - { - role: 'user', - content: task, - }, - ] - - try { - const result = await executeTaskWithAgent(selectedAgent, agentRequest, members) - return { - ...state, - messages: [ - ...agentRequest, - { - role: 'assistant', - content: result, - }, - ], - status: 'running', - } - } catch (error) { - return { - ...state, - messages: [ - ...agentRequest, - { - role: 'assistant', - content: error instanceof Error ? error.message : 'Unknown error', - }, - ], - status: 'failed', - } - } -} - +/** + * Teamwork runs given workflow and continues iterating over the workflow until it finishes.
+ */ export async function teamwork( workflow: Workflow, state: WorkflowState = workflowState(workflow) ): Promise { const { status, messages } = state - if (status === 'pending' || status === 'running') { - return teamwork(workflow, await iterate(workflow, state)) - } - if (status === 'finished') { return messages.at(-1)!.content } - if (status === 'failed') { - return ('🚨' + messages.at(-1)!.content) as string - } - - if (status === 'interrupted') { - console.log('🚨 Max iterations exceeded ', workflow.maxIterations) - return finalizeWorkflow(workflow.provider, messages) - } + return teamwork(workflow, await iterate(workflow, state)) +} - // tbd: recover from errors - // tbd: request final answer if took too long - throw new Error('Workflow failed. This is not implemented yet.') +/** + * Iterate performs single iteration over workflow and returns its next state + */ +export async function iterate(workflow: Workflow, state: WorkflowState): Promise { + const nextState = await nextTick(workflow, state) + workflow.snapshot(nextState) + return nextState } diff --git a/packages/framework/src/telemetry.ts b/packages/framework/src/telemetry.ts new file mode 100644 index 0000000..7a7d126 --- /dev/null +++ b/packages/framework/src/telemetry.ts @@ -0,0 +1,9 @@ +import { WorkflowState } from './workflow.js' + +export type Telemetry = (state: WorkflowState) => void + +export const noop: Telemetry = () => {} + +export const logger: Telemetry = (state) => { + console.log(state) +} diff --git a/packages/framework/src/telemetry/base.ts b/packages/framework/src/telemetry/base.ts deleted file mode 100644 index f898213..0000000 --- a/packages/framework/src/telemetry/base.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { Workflow, WorkflowState } from '../workflow.js' - -// Base event structure -export type BaseTelemetryEvent = { - workflowId: string - correlationId?: string -} - -/** - * Emitted when a workflow begins execution.
- */ -export type WorkflowStartEvent = { - type: 'workflow.iteration.start' - data: { - workflow: Workflow - state: WorkflowState - } -} - -/** - * Emitted when a workflow gets the next task - */ -export type WorkflowEndEvent = { - type: 'workflow.iteration.nextTask' - data: { - workflow: Workflow - task: string - } -} - -export type TelemetryEvent = WorkflowStartEvent | WorkflowEndEvent - -export type Telemetry = { - record: (event: TelemetryEvent) => void | Promise -} - -export const noopTelemetry: Telemetry = { - record: () => {}, -} diff --git a/packages/framework/src/telemetry/console.ts b/packages/framework/src/telemetry/console.ts deleted file mode 100644 index 889d6f7..0000000 --- a/packages/framework/src/telemetry/console.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { Telemetry } from './base.js' - -export const logger: Telemetry = { - record: (event) => { - console.log(event) - }, -} diff --git a/packages/framework/src/workflow.ts b/packages/framework/src/workflow.ts index 806154f..da16d28 100644 --- a/packages/framework/src/workflow.ts +++ b/packages/framework/src/workflow.ts @@ -4,7 +4,7 @@ import s from 'dedent' import { Agent } from './agent.js' import { openai, Provider } from './models/openai.js' -import { noopTelemetry, Telemetry } from './telemetry/base.js' +import { noop, Telemetry } from './telemetry.js' import { Message } from './types.js' type WorkflowOptions = { @@ -14,7 +14,7 @@ type WorkflowOptions = { provider?: Provider maxIterations?: number - telemetry?: Telemetry + snapshot?: Telemetry } /** @@ -24,26 +24,56 @@ export const workflow = (options: WorkflowOptions): Workflow => { return { maxIterations: 50, provider: openai(), - telemetry: noopTelemetry, + snapshot: noop, ...options, } } export type Workflow = Required -export type WorkflowState = { +/** + * Base workflow + */ +type BaseWorkflowState = { id: string - status: 'running' | 'finished' | 'interrupted' | 'failed' | 'pending' messages: Message[] } +/** + * Different states workflow 
is in, in between execution from agents + */ +export type IdleWorkflowState = BaseWorkflowState & { + status: 'idle' | 'finished' | 'failed' +} + +/** + * Supervisor selected the task, and is now pending assignment of an agent + */ +export type PendingWorkflowState = BaseWorkflowState & { + status: 'pending' + agentRequest: Message[] +} + +/** + * State in which an agent is assigned and work is pending + */ +export type AssignedWorkflowState = BaseWorkflowState & { + status: 'assigned' + + agent: string + agentRequest: Message[] + agentStatus: 'idle' | 'step' | 'tool' +} + +export type WorkflowState = IdleWorkflowState | PendingWorkflowState | AssignedWorkflowState + /** * Helper utility to create a workflow state with defaults. */ -export const workflowState = (workflow: Workflow): WorkflowState => { +export const workflowState = (workflow: Workflow): IdleWorkflowState => { return { id: randomUUID(), - status: 'pending', + status: 'idle', messages: [ { role: 'assistant' as const, diff --git a/website/package.json b/website/package.json index 2d24d77..bf1f041 100644 --- a/website/package.json +++ b/website/package.json @@ -11,6 +11,7 @@ "rspress": "^1.37.3" }, "devDependencies": { + "@rspress/plugin-typedoc": "^1.37.4", "@types/node": "^18.11.17" } } diff --git a/website/rspress.config.ts b/website/rspress.config.ts index 0f37a9b..97505e2 100644 --- a/website/rspress.config.ts +++ b/website/rspress.config.ts @@ -1,5 +1,6 @@ import * as path from 'node:path' +import { pluginTypeDoc } from '@rspress/plugin-typedoc' import { defineConfig } from 'rspress/config' export default defineConfig({ @@ -19,4 +20,16 @@ export default defineConfig({ }, ], }, + plugins: [ + pluginTypeDoc({ + entryPoints: [ + path.join(__dirname, '../packages/framework/src/agent.ts'), + path.join(__dirname, '../packages/framework/src/teamwork.ts'), + path.join(__dirname, '../packages/framework/src/tool.ts'), + path.join(__dirname, '../packages/framework/src/workflow.ts'), + path.join(__dirname,
'../packages/framework/src/models/openai.ts'), + path.join(__dirname, '../packages/framework/src/telemetry.ts'), + ], + }), + ], }) diff --git a/website/tsconfig.json b/website/tsconfig.json index 3c43903..86afe4d 100644 --- a/website/tsconfig.json +++ b/website/tsconfig.json @@ -1,3 +1,6 @@ { - "extends": "../tsconfig.json" + "extends": "../tsconfig.json", + "include": [ + "../packages/framework/src" + ] }