Skip to content

Commit

Permalink
feat: implement server with background jobs and resume when all answe…
Browse files Browse the repository at this point in the history
…rs are provided (#49)

This implements #2 and #9 (please confirm)

Updated server to use latest API
Workflow now runs on the server in the background until it requests a
tool or finishes
Removed saving/loading from file as we want to keep state in memory and
run its updates in background. Previous example required calling
"iterate" endpoint each time to advance it. We can always add it back,
but I think this one is much better
Will automatically resume when all answers are provided 
Logger UI

![CleanShot 2024-12-09 at 04 26
13@2x](https://github.com/user-attachments/assets/8eaea18a-79c9-46a5-bc33-531be71c18b5)

Once this gets accepted, I will go ahead and create reusable functions
to reduce this boilerplate and hide abstraction.

---------

Co-authored-by: Piotr Karwatka <[email protected]>
  • Loading branch information
grabbou and pkarw authored Dec 9, 2024
1 parent 4ef51bf commit eb67943
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 148 deletions.
57 changes: 28 additions & 29 deletions example/src/medical_survey/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,47 +25,46 @@ const askPatient = tool({
})

const nurse = agent({
role: 'Nurse,doctor assistant',
role: 'Nurse',
description: `
You are skille nurse / doctor assistant.
You role is to cooperate with reporter to create a pre-visit note for a patient that is about to come for a visit.
Ask user questions about the patient's health and symptoms.
Ask one question at time up to 5 questions.
`,
You are skille nurse / doctor assistant.
You role is to cooperate with reporter to create a pre-visit note for a patient that is about to come for a visit.
Ask user questions about the patient's health and symptoms.
Ask one question at time up to 5 questions.
`,
tools: {
ask_question: askPatient,
askPatient,
},
})

const reporter = agent({
role: 'Reporter',
description: `
You are skilled at preparing great looking markdown reports.
Prepare a report for a patient that is about to come for a visit.
Add info about the patient's health and symptoms.
`,
tools: {},
You are skilled at preparing great looking reports.
You can prepare a report for a patient that is about to come for a visit.
Add info about the patient's health and symptoms.
`,
})

export const preVisitNoteWorkflow = workflow({
members: [nurse, reporter],
description: `
Create a pre-visit note for a patient that is about to come for a visit.
The note should include the patient's health and symptoms.
Include:
- symptoms,
- health issues,
- medications,
- allergies,
- surgeries
Never ask fo:
- personal data,
- sensitive data,
- any data that can be used to identify the patient.
`,
Create a pre-visit note for a patient that is about to come for a visit.
The note should include the patient's health and symptoms.
Include:
- symptoms,
- health issues,
- medications,
- allergies,
- surgeries
Never ask fo:
- personal data,
- sensitive data,
- any data that can be used to identify the patient.
`,
output: `
A markdown report for the patient's pre-visit note.
`,
A markdown report for the patient's pre-visit note.
`,
})
45 changes: 0 additions & 45 deletions example/src/medical_survey/workflow_server.ts

This file was deleted.

201 changes: 157 additions & 44 deletions example/src/medical_survey_server.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,181 @@
/**
* Example borrowed from CrewAI.
* This example demonstrates using framework in server-side environments.
*/

import { isToolCallRequest } from '@dead-simple-ai-agent/framework/supervisor/runTools'
import { iterate } from '@dead-simple-ai-agent/framework/teamwork'
import { workflowState } from '@dead-simple-ai-agent/framework/workflow'
import fastify, { FastifyReply, FastifyRequest } from 'fastify'
import { promises as fs } from 'fs'
import { tmpdir } from 'os'
import { join } from 'path'
import { WorkflowState, workflowState } from '@dead-simple-ai-agent/framework/workflow'
import chalk from 'chalk'
import s from 'dedent'
import fastify, { FastifyRequest } from 'fastify'

import { preVisitNoteWorkflow } from './medical_survey/workflow.js'

const server = fastify({ logger: false })

import { preVisitNoteWorkflow } from './medical_survey/workflow_server.js'
const visits: Record<string, WorkflowState> = {}

const dbPath = (id: string) => join(tmpdir(), id + '_workflow_db.json')
/**
* This will create a new workflow and return the initial state
*/
server.post('/visits', async () => {
const state = workflowState(preVisitNoteWorkflow)

let state = workflowState(preVisitNoteWorkflow)
// Add the state to the visits map
visits[state.id] = state

// Start the visit in the background
runVisit(state.id)

return {
id: state.id,
status: state.status,
}
})

/**
* Call this endpoint to get status of the workflow, or the final result.
*/
server.get('/visits/:id', async (req: FastifyRequest<{ Params: { id: string } }>) => {
const state = visits[req.params.id]
if (!state) {
throw new Error('Workflow not found')
}

server.post('/start', async () => {
const nextState = await iterate(preVisitNoteWorkflow, state)
if (state.status === 'finished') {
return {
status: state.status,
result: state.messages.at(-1)!.content,
}
}

await fs.writeFile(dbPath(nextState.id), JSON.stringify(nextState, null, 2), 'utf-8')
if (state.status === 'assigned') {
if (state.agentStatus === 'tool') {
return state.agentRequest.findLast(isToolCallRequest)!.tool_calls
}
return {
status: state.status,
agentStatus: state.agentStatus,
}
}

return {
status: 'running',
state: nextState,
status: state.status,
}
})

/**
* Adds a message to the workflow.
*/
server.post(
'/iterate/:id',
async (req: FastifyRequest<{ Params: { id: string }; Body: { message: string } }>) => {
const { id } = req.params
const { message } = req.body

const path = dbPath(id)

if (await fs.exists(path)) {
try {
state = JSON.parse(await fs.readFile(path, 'utf-8'))
console.log('🛟 Loaded workflow from', path)
} catch (error) {
console.log(`🚨Error while loading workflow from ${path}. Starting new workflow.`)
}
'/visits/:id/messages',
async (req: FastifyRequest<{ Params: { id: string }; Body: ToolCallMessage }>) => {
const state = visits[req.params.id]
if (!state) {
throw new Error('Workflow not found')
}

if (state.status !== 'assigned' || state.agentStatus !== 'tool') {
throw new Error('Workflow is not waiting for a message right now')
}

if (message) {
// message provided within the call - for example a return call from API/Slack/Whatever
state.messages.push({ role: 'user', content: message })
const toolRequestMessage = state.agentRequest.findLast(isToolCallRequest)
if (!toolRequestMessage) {
throw new Error('No tool request message found')
}

const nextState = await iterate(preVisitNoteWorkflow, state)
await fs.writeFile(path, JSON.stringify(nextState, null, 2), 'utf-8')
const toolCall = toolRequestMessage.tool_calls.find(
(toolCall) => toolCall.id === req.body.tool_call_id
)
if (!toolCall) {
throw new Error('Tool call not found')
}

const agentRequest = state.agentRequest.concat({
role: 'tool',
tool_call_id: toolCall.id,
content: req.body.content,
})

const allToolRequests = toolRequestMessage.tool_calls.map((toolCall) => toolCall.id)
const hasAllToolCalls = allToolRequests.every((toolCallId) =>
agentRequest.some(
(request) => 'tool_call_id' in request && request.tool_call_id === toolCallId
)
)

return nextState
// Add tool response to the workflow
// Change agent status to `step` if all tool calls have been added, so
// runVisit will continue
if (hasAllToolCalls) {
visits[req.params.id] = {
...state,
agentStatus: 'step',
agentRequest,
}
runVisit(req.params.id)
} else {
visits[req.params.id] = {
...state,
agentRequest,
}
}

return {
hasAllToolCalls,
}
}
)

const port = parseInt(process.env['PORT'] || '3000', 10)
server.listen({
port,
})
console.log(`🚀 Server running at http://localhost:${port}`)
console.log(`Run 'curl -X POST http://localhost:${port}/start' to start the workflow`)
console.log(
`Run 'curl -X POST http://localhost:${port}/iterate/ID -d '{"message":"Hello"}' to iterate the workflow with the message provided optionally as an answer added to the state`
)
/**
* Start the server
*/
const port = parseInt(process.env['PORT'] || '3000')
server.listen({ port })

console.log(s`
🚀 Server running at http://localhost:${port}
Things to do:
${chalk.bold('🩺 Create a new visit:')}
${chalk.gray(`curl -X POST http://localhost:${port}/visits`)}
${chalk.bold('💻 Check the status of the visit:')}
${chalk.gray(`curl -X GET http://localhost:${port}/visits/:id`)}
${chalk.bold('🔧 If the workflow is waiting for a tool call, you will get a response like this:')}
${chalk.gray(`[{"id":"<tool_call_id>","type":"function"}]`)}
${chalk.bold('📝 Add a message to the visit:')}
${chalk.gray(`curl -X POST http://localhost:${port}/visits/:id/messages H "Content-Type: application/json" -d '{"tool_call_id":"...","content":"..."}'`)}
Note:
- You can only add messages when the workflow is waiting for a tool call
`)

type ToolCallMessage = {
tool_call_id: string
content: string
}

/**
* Helper function, inspired by `teamwork`.
* It will continue running the visit in the background and will stop when the workflow is finished.
*/
async function runVisit(id: string) {
const state = visits[id]
if (!state) {
throw new Error('Workflow not found')
}

if (
state.status === 'finished' ||
(state.status === 'assigned' && state.agentStatus === 'tool')
) {
return
}

visits[id] = await iterate(preVisitNoteWorkflow, state)

return runVisit(id)
}
28 changes: 0 additions & 28 deletions example/src/medical_survey_stateless.ts

This file was deleted.

3 changes: 1 addition & 2 deletions example/src/surprise_trip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ const researchTripWorkflow = workflow({
Comprehensive day-by-day itinerary for the trip to Wrocław, Poland.
Ensure the itinerary integrates flights, hotel information, and all planned activities and dining experiences.
`,
// Uncomment to see the workflow state in the console
// snapshot: logger,
snapshot: logger,
})

const result = await teamwork(researchTripWorkflow)
Expand Down

0 comments on commit eb67943

Please sign in to comment.