This adds a streaming example to the API. This has a README with some bare-bones pieces, but is meant to accompany a blog post. We use SSEs + async + FastAPI to build an efficient application.
1 parent d5c7318, commit cd094d6. Showing 18 changed files with 1,459 additions and 95 deletions.

# Streaming in FastAPI

This example demonstrates how to stream data from Burr's streaming mode through FastAPI.

This is gone over in more detail in our blog post (coming soon). This README will go over the main code + roles and how to run the example.

This uses Server-Sent Events (SSE) to stream data from FastAPI to the frontend. It also uses async generators to ensure optimal performance.

## Example

The application we created will be a simple chatbot proxy. It has a few different modes -- it can either decide a prompt is "unsafe" (in this case meaning that it has the word "unsafe" in it, but this would typically go to a specific model),
or do one of the following:

1. Generate code
2. Answer a question
3. Generate a poem
4. Prompt for more

It will use an LLM to decide which to do. It streams back text using async streaming in Burr. Read more about how that is implemented [here](https://burr.dagworks.io/concepts/streaming-actions/).
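
To give a feel for how that routing can be expressed, here is a rough sketch of wiring conditional transitions in Burr. It is illustrative only: the action names, the `chat_actions` module, and the state fields used in the conditions are assumptions, not the example's actual code:

```python
# Illustrative sketch -- `chat_actions` is a hypothetical module standing in for
# wherever the streaming actions are defined; the `safe`/`mode` state fields are
# assumptions about what the checking/deciding actions write.
from burr.core import ApplicationBuilder, when
from chat_actions import (  # hypothetical
    check_safety, decide_mode, unsafe_response,
    generate_code, answer_question, generate_poem, prompt_for_more,
)

app = (
    ApplicationBuilder()
    .with_actions(
        check_safety=check_safety,            # flags prompts containing "unsafe"
        decide_mode=decide_mode,              # LLM picks one of the four modes
        unsafe_response=unsafe_response,
        generate_code=generate_code,
        answer_question=answer_question,
        generate_poem=generate_poem,
        prompt_for_more=prompt_for_more,
    )
    .with_transitions(
        ("check_safety", "unsafe_response", when(safe=False)),
        ("check_safety", "decide_mode", when(safe=True)),
        ("decide_mode", "generate_code", when(mode="generate_code")),
        ("decide_mode", "answer_question", when(mode="answer_question")),
        ("decide_mode", "generate_poem", when(mode="generate_poem")),
        ("decide_mode", "prompt_for_more", when(mode="prompt_for_more")),
    )
    .with_entrypoint("check_safety")
    .build()
)
```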

Note that, even though not every response is streaming (e.g. the unsafe response, which is hardcoded), they are modeled as streaming to make interaction with the app simpler.

The app looks like this:

*(screenshot of the streaming chatbot UI)*

## Running the example

You will need an API key from OpenAI to run this.
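
The OpenAI client typically picks this up from the standard `OPENAI_API_KEY` environment variable, e.g.:

```bash
export OPENAI_API_KEY="sk-..."  # placeholder -- use your own key
```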

You'll first have to have `burr[start]` installed. You can then view this demo in your app by running Burr:

```bash
burr
```

This will open a browser on [http://localhost:7241](http://localhost:7241).

Navigate to the [streaming example](http://localhost:7241/demos/streaming-chatbot).

## Streaming in Burr

Read more [here](https://burr.dagworks.io/concepts/streaming-actions/).
To use streaming in Burr, you write your actions as a generator. If you're using the function-based API (as we do in this example),
the function should yield a tuple, consisting of:
1. The result (intermediate or final)
2. The updated state (`None` if intermediate, present if final)

(2) will always be the last yield, and indicates that the streaming action is complete. Take, for example, the
"unsafe" response, meaning that the LLM has determined that it cannot respond. This is a simple example -- just to illustrate streaming:

The example below sleeps to make a point (and make the demo more fun/give the appearance of the app "thinking") -- in reality you really would not want to do this.

```python
@streaming_action(reads=["prompt", "chat_history"], writes=["response"])
async def unsafe_response(state: State) -> Tuple[dict, State]:
    result = {
        "response": {
            "content": "I am afraid I can't respond to that...",
            "type": "text",
            "role": "assistant",
        }
    }
    for word in result["response"]["content"].split():
        await asyncio.sleep(0.1)
        yield {"delta": word + " "}, None
    yield result, state.update(**result).append(chat_history=result["response"])
```

This is an async generator that yields just the delta until it gets to the end. This can easily proxy from another service (OpenAI, for example),
or do some other async operation.
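
For instance, a streaming action that proxies a streamed OpenAI chat completion might look roughly like the following sketch. This is not the example's actual implementation -- the model name is a placeholder, the imports/decorator mirror the snippet above, and it assumes the official `openai` async client:

```python
# Sketch only -- same decorator/typing as the snippet above; assumes the
# official `openai` async client. The model name is a placeholder.
from openai import AsyncOpenAI

@streaming_action(reads=["prompt", "chat_history"], writes=["response"])
async def streamed_openai_response(state: State) -> Tuple[dict, State]:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model
        messages=[{"role": "user", "content": state["prompt"]}],
        stream=True,
    )
    chunks = []
    async for chunk in stream:
        delta = chunk.choices[0].delta.content or ""
        chunks.append(delta)
        yield {"delta": delta}, None  # intermediate yield: no state update yet
    result = {"response": {"content": "".join(chunks), "type": "text", "role": "assistant"}}
    # final yield: the full result plus the updated state
    yield result, state.update(**result).append(chat_history=result["response"])
```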

When you call the action, you will get back an `AsyncStreamingResponseContainer` object. This is *also* an async generator!

```python
action, streaming_container = await app.astream_result(
    halt_after=TERMINAL_ACTIONS, inputs={"prompt": "Please generate a limerick about Alexander Hamilton and Aaron Burr"}
)

async for item in streaming_container:
    print(item['delta'], end="")
```

This will stream the results out.

## Connecting to FastAPI

To connect to FastAPI, we need to do the following:

1. Instantiate a Burr Application in FastAPI
2. Create a route that will stream the data to the frontend, which is *also* an async generator
3. Bridge the two together

In [server.py](server.py), we have a helpful `_get_application` function that will get or create an application for us.
We can then call a `chat_response` function that looks like this:

```python
@router.post("/response/{project_id}/{app_id}", response_class=StreamingResponse)
async def chat_response(project_id: str, app_id: str, prompt: PromptInput) -> StreamingResponse:
    burr_app = _get_application(project_id, app_id)
    chat_history = burr_app.state.get("chat_history", [])
    action, streaming_container = await burr_app.astream_result(
        halt_after=chat_application.TERMINAL_ACTIONS, inputs=dict(prompt=prompt.prompt)
    )

    async def sse_generator():
        yield f"data: {json.dumps({'type': 'chat_history', 'value': chat_history})}\n\n"

        async for item in streaming_container:
            yield f"data: {json.dumps({'type': 'delta', 'value': item['delta']})} \n\n"

    return StreamingResponse(sse_generator())
```

Note this returns a [StreamingResponse](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse)
and does some fancy stuff with the SSE API. Particularly:
1. It returns the initial state, so the UI can update to the latest (not strictly necessary, but nice to have for rendering)
2. It streams the deltas as they come in
3. It returns the data in the format `data: ...\n\n`, as this is standard for SSE
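
If you want to sanity-check the endpoint outside the browser, here's a minimal sketch of consuming the stream from Python with `httpx`. The host/port, route prefix, and IDs are assumptions (mirroring the fetch call in the React code below) -- adjust them to wherever the server is mounted:

```python
import asyncio
import json

import httpx


async def consume(project_id: str, app_id: str, prompt: str):
    # NOTE: host/port and the /api/v0/streaming_chatbot prefix are assumptions,
    # mirroring the fetch call used by the React frontend below.
    url = f"http://localhost:7241/api/v0/streaming_chatbot/response/{project_id}/{app_id}"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json={"prompt": prompt}) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip the blank separator lines between SSE events
                event = json.loads(line[len("data: "):])
                if event["type"] == "delta":
                    print(event["value"], end="", flush=True)


asyncio.run(consume("demo_streaming", "my_app_id", "Write a poem about streaming"))
```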

And it's as simple as that! You can now stream data from Burr to FastAPI.

## Streaming in Typescript/React

This part can get a little messy with state management/chat history, but here are the basics of it. There are multiple approaches
to managing SSE in React, but we will be using the very bare-bones `fetch` and `getReader()` APIs.

The following code is the `submitPrompt` function that will send the prompt and modify the state. This gets called when the
user submits a prompt (e.g. on the `onClick` of a button).

It relies on the state variables:

- `currentPrompt`/`setCurrentPrompt` - the current prompt
- `displayedChatHistory`/`setDisplayedChatHistory` - the chat history

This also assumes the server expects a POST request with the project/app IDs in the URL and the prompt in the JSON body (matching the FastAPI route above).
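
For context, here is a rough sketch of the types and state the snippets below rely on. These shapes are assumptions inferred from usage -- in the real example, `ChatItem` and the event types likely come from a generated API client (which is why the code below can write `ChatItem.role.USER` / `ChatItem.type.TEXT`):

```typescript
// Assumed shapes, inferred from how the snippets below use them; the real
// example's ChatItem model and event types may differ.
import { useState } from 'react';

type ChatItem = { role: string; content: string; type: string };
type Event = { type: 'chat_history' | 'delta' };
type ChatHistoryEvent = Event & { value: ChatItem[] };
type ChatMessageEvent = Event & { value: string };

const ChatApp = (props: { projectId: string; appId: string }) => {
  // React state referenced by submitPrompt below.
  const [currentPrompt, setCurrentPrompt] = useState('');                           // prompt being typed
  const [currentResponse, setCurrentResponse] = useState('');                       // response streamed so far
  const [displayedChatHistory, setDisplayedChatHistory] = useState<ChatItem[]>([]); // prior messages
  const [isChatWaiting, setIsChatWaiting] = useState(false);                        // true while a response streams
  // ...submitPrompt (below) and rendering go here...
  return null; // rendering omitted from this sketch
};
```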

### Fetch the result (POST)

First we'll fetch the result with a POST request to match the endpoint above. We will also get a reader object
to help us iterate through the streamed results:

```typescript
const response = await fetch(
  `/api/v0/streaming_chatbot/response/${props.projectId}/${props.appId}`,
  {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt: currentPrompt })
  }
);
const reader = response.body?.getReader();
```

Then we'll run through the reader object and parse the data, modifying the state as we go:

```typescript
// `chatResponse` accumulates the streamed text -- assume `let chatResponse = '';` above this block.
if (reader) {
  const decoder = new TextDecoder('utf-8');
  // eslint-disable-next-line no-constant-condition
  while (true) {
    const result = await reader.read();
    if (result.done) {
      break;
    }
    const message = decoder.decode(result.value, { stream: true });
    message
      .split('data: ')
      .slice(1)
      .forEach((item) => {
        const event: Event = JSON.parse(item);
        if (event.type === 'chat_history') {
          const chatMessageEvent = event as ChatHistoryEvent;
          setDisplayedChatHistory(chatMessageEvent.value);
        }
        if (event.type === 'delta') {
          const chatMessageEvent = event as ChatMessageEvent;
          chatResponse += chatMessageEvent.value;
          setCurrentResponse(chatResponse);
        }
      });
  }
  setDisplayedChatHistory((chatHistory) => [
    ...chatHistory,
    {
      role: ChatItem.role.USER,
      content: currentPrompt,
      type: ChatItem.type.TEXT
    },
    {
      role: ChatItem.role.ASSISTANT,
      content: chatResponse,
      type: ChatItem.type.TEXT
    }
  ]);
  setCurrentPrompt('');
  setCurrentResponse('');
  setIsChatWaiting(false);
}
```

In the above we:

1. Check if the reader is present (it is likely worth adding more error handling here)
2. Break if the reader is done
3. Decode the message
4. Parse the message
   a. If it is a chat history event, update the chat history
   b. If it is a delta event, update the chat response
5. Update the chat history with the new prompt and response
6. Reset the variables so we don't render them twice

While the logic of updating state is bespoke to how we do it here, looping through the reader and parsing the data
is a common, highly generalizable operation.
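
To make that generalization concrete, here is a sketch of pulling the read-and-parse loop into a small reusable async generator that yields typed events (assuming the same `data: <json>` framing used above; the `Event` type matches the sketch earlier in this section):

```typescript
// A reusable helper: turns an SSE-style fetch Response into a stream of
// parsed events, so components only deal with typed values.
async function* sseEvents<T>(response: Response): AsyncGenerator<T> {
  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder('utf-8');
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split('\n\n');
    buffer = events.pop() ?? ''; // keep any partial event for the next chunk
    for (const raw of events) {
      const line = raw.trim();
      if (line.startsWith('data: ')) {
        yield JSON.parse(line.slice('data: '.length)) as T;
      }
    }
  }
}

// Usage: for await (const event of sseEvents<Event>(response)) { ... }
```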

Note there are multiple ways of doing this -- this was just the simplest.