Get basic streaming to work for run (#683)
Building upon what Ryan investigated in #651. All the frontend changes are from him; I just rebased onto his PR.

This is a bit tricky because we have to support:

1. Streaming models --> use a queue iterator for passing the output text
2. Non-streaming models --> still yield, just don't use a queue iterator; wait for the run command to finish

General flow:

1. Parse prompts from the client
2. Define a stream callback with a queue iterator
3. Start a thread to run the aiconfig without blocking the main thread from accessing the queue iterator
4. Create a copy of the original AIConfig so we can write partially streamed outputs, yield, and display it without risk of race conditions
5. Wait for the queue iterator to start containing data, or wait until a max timeout (because the model may not support streaming)
6. Iterate through the queue iterator, saving output to the display config and yielding the display config
7. Once output is complete, wait for the original `config.run()` thread and display the output from that

Open questions/TODOs:

1. [Solved - use `while output_text_queue.isEmpty() and t.is_alive()`] ~~How can we check whether a model supports streaming or not? Right now we just default to a max timeout of 5s, but long-term it would be better for people to explicitly mark this as a boolean flag in their model parser class~~
2. I need to update the output format for streaming. I thought it was fine, but apparently not; I will verify again. A bit annoying, but also not a crazy blocker for now
3. The client also needs to support streaming, but that's fine; Ryan can get unblocked with this diff now
4. Pretty complex, but streaming will break for `run_with_dependencies`. I've got a proposal to fix forward in https://github.com/lastmile-ai/gradio-workbook/pull/64 and would really like people to take a look and give feedback

## Test plan

```bash
alias aiconfig="python -m 'aiconfig.scripts.aiconfig_cli'"
aiconfig edit --aiconfig-path="/Users/rossdancraig/Projects/aiconfig/cookbooks/Getting-Started/travel.aiconfig.json" --server-port=8080 --server-mode=debug_servers

# Now run this from another terminal
curl http://localhost:8080/api/run -d '{"prompt_name":"get_activities"}' -X POST -H 'Content-Type: application/json'
```

I also added this line to print output:

```
print(accumulated_output_text)
```

Streaming:
https://github.com/lastmile-ai/aiconfig/assets/151060367/d8930ea6-3143-49a3-89c6-4a2668c2e9e1

Non-streaming (same as before):
https://github.com/lastmile-ai/aiconfig/assets/151060367/5aae7c7f-c273-4be7-bcb9-e96199a04076
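The queue-and-thread flow described in the summary can be sketched roughly as follows. This is a minimal, self-contained illustration, not the actual server code: `run_and_stream` and `streaming_model` are hypothetical stand-ins for the real `config.run()` plumbing, and the wait loop is the `isEmpty() and t.is_alive()` check mentioned above.

```python
# Minimal sketch of the streaming flow: a worker thread runs the model and
# pushes text chunks onto a queue; the main thread yields accumulated text
# as chunks arrive. Names here are illustrative, not the real implementation.
import threading
import time
from queue import Queue

STOP = object()  # sentinel: the run thread is done


def run_and_stream(model_fn):
    output_text_queue = Queue()

    def callback(delta):
        # Streaming models invoke this callback once per chunk.
        output_text_queue.put(delta)

    def run():
        model_fn(callback)            # blocking run; may or may not stream
        output_text_queue.put(STOP)   # always signal completion

    # Run the model without blocking the main thread.
    t = threading.Thread(target=run)
    t.start()

    # Wait for streaming to start, or for the run to finish
    # (a non-streaming model never calls the callback).
    while output_text_queue.empty() and t.is_alive():
        time.sleep(0.01)

    accumulated = ""
    while True:
        chunk = output_text_queue.get()
        if chunk is STOP:
            break
        accumulated += chunk
        yield accumulated             # caller can display partial output

    t.join()


def streaming_model(cb):
    # Fake model that "streams" three tokens through the callback.
    for tok in ["stream", "ing ", "works"]:
        cb(tok)


partials = list(run_and_stream(streaming_model))
print(partials[-1])  # streaming works
```

A non-streaming model would simply never call `cb`, so the wait loop exits when the thread dies and the only item on the queue is the sentinel.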
Showing 5 changed files with 245 additions and 20 deletions.
python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts (37 additions & 0 deletions)

```typescript
import oboe, { Options } from "oboe";

// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
// Except it allows to use .node('*', fn) & only resolves on done
// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
// (multiple chunks can be sent in single response & we only want valid json ones)
export async function streamingApi<T>(
  headers: Options,
  on: string = "*",
  fn: (data: any) => void,
  on2?: string,
  fn2?: (data: any) => void,
  on3?: string,
  fn3?: (data: any) => void
): Promise<T> {
  return new Promise((resolve, reject) => {
    if (fn2 && on2 && fn3 && on3) {
      oboe(headers)
        .node(on, fn)
        .node(on2, fn2)
        .node(on3, fn3)
        .done((data) => resolve(data))
        .fail((err) => reject(err.jsonBody));
    } else if (fn2 && on2) {
      oboe(headers)
        .node(on, fn)
        .node(on2, fn2)
        .done((data) => resolve(data))
        .fail((err) => reject(err.jsonBody));
    } else {
      oboe(headers)
        .node(on, fn)
        .done((data) => resolve(data))
        .fail((err) => reject(err.jsonBody));
    }
  });
}
```
44 additions & 0 deletions

```python
"""Queue iterator for streaming. Can only process strings for now
but in future will try to make it generic"""
# import asyncio
from queue import Queue

# from typing import Generic, TypeVar

# # TODO: Add generic typing for queue items
# # (couldn't get sentinel value to work with generics)
# T = TypeVar('T')
STOP_STREAMING_SIGNAL = object()  # sentinel value to indicate end of stream


class QueueIterator:
    """In order to support text streaming, we need to store
    the output in a queue and iterate over those values. A lot of this was
    inspired by HuggingFace's TextIteratorStreamer object.
    I know I can just use a queue directly in the callsite with
    `iter(queue.get, None)`, but having a class makes it easier to manage
    and abstracts it a bit more.
    """

    def __init__(self):
        self.q = Queue()
        self.stop_signal = STOP_STREAMING_SIGNAL
        self.timeout = None

    def __iter__(self):
        return self

    def __next__(self):
        value = self.q.get(block=True, timeout=self.timeout)
        if value == self.stop_signal:
            raise StopIteration()
        return value

    def put(self, text: str, stream_end: bool = False):
        self.q.put(text, timeout=self.timeout)
        if stream_end:
            self.q.put(self.stop_signal, timeout=self.timeout)

    def isEmpty(self) -> bool:
        return self.q.empty()
```
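To show how this class is meant to be consumed, here is a small sketch of a producer thread feeding a `QueueIterator` while the main thread drains it. The class is re-stated inline in condensed form so the snippet runs standalone; `fake_model` is a hypothetical stand-in for a model's streaming callback.

```python
# A producer thread pushes tokens into the QueueIterator while the main
# thread blocks on iteration until the stop sentinel arrives.
import threading
from queue import Queue

STOP_STREAMING_SIGNAL = object()  # sentinel marking end of stream


class QueueIterator:
    """Condensed copy of the class above, for a runnable example."""

    def __init__(self):
        self.q = Queue()

    def __iter__(self):
        return self

    def __next__(self):
        value = self.q.get(block=True)
        if value is STOP_STREAMING_SIGNAL:
            raise StopIteration()
        return value

    def put(self, text, stream_end=False):
        self.q.put(text)
        if stream_end:
            self.q.put(STOP_STREAMING_SIGNAL)

    def isEmpty(self):
        return self.q.empty()


def fake_model(stream):
    # Stand-in for a model's stream callback emitting chunks.
    for token in ["Hello", ", ", "world"]:
        stream.put(token)
    stream.put("", stream_end=True)  # empty final chunk, then the sentinel


stream = QueueIterator()
t = threading.Thread(target=fake_model, args=(stream,))
t.start()
accumulated = "".join(stream)  # blocks until the sentinel is dequeued
t.join()
print(accumulated)  # Hello, world
```

Because `__next__` blocks on `Queue.get`, the consumer naturally waits for the producer without any polling; the sentinel is what turns the unbounded queue into a finite iterator.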