# Get basic streaming to work for run
Building upon what Ryan investigated in #651. All the frontend changes are from him; I just rebased onto his PR.


This is a bit tricky because we have to support:

1. streaming models --> use a queue iterator for passing the output text
2. non-streaming models --> still yield, just don't use a queue iterator and wait for run command to finish


General flow (a short toy sketch follows this list):
1. Parse prompts from client
2. Define stream callback with queue iterator
3. Start thread to run aiconfig without blocking main thread from accessing queue iterator
4. Create a copy of the original AIConfig so we can write partially streamed outputs, yield and display it without risk of race conditions
5. Wait for the queue iterator to start containing data, or until a max timeout (because the model may not support streaming)
6. Iterate through the queue iterator, saving output to the display config, and yield the display config
7. Once output is complete, wait for the original `config.run()` thread and display the output from that
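
Below is a self-contained toy sketch of that flow; it is not the real handler (that lives in `server.py` further down), the deep-copy step is omitted, and the fake model worker plus the `partial_output`/`final_output` keys are illustrative assumptions only.

```python
# Toy sketch of the streaming flow above, NOT the real server handler.
# A worker thread stands in for aiconfig.run() streaming into the queue,
# while generate() yields whatever has arrived so far plus a final result.
import json
import threading
import time

# Import path taken from the new file added in this diff.
from aiconfig.editor.server.queue_iterator import QueueIterator, STOP_STREAMING_SIGNAL


def generate(run_prompt):
    output_text_queue = QueueIterator()

    def run_in_thread():
        # Steps 2-3: the stream callback pushes chunks into the queue from a
        # separate thread so the generator below is never blocked.
        run_prompt(lambda chunk: output_text_queue.put(chunk))
        output_text_queue.put(STOP_STREAMING_SIGNAL)

    t = threading.Thread(target=run_in_thread)
    t.start()

    # Step 5: wait for the first chunk, or give up after a max timeout
    # (the model may not stream at all).
    waited = 0.0
    while output_text_queue.isEmpty() and waited < 5.0:
        time.sleep(0.1)
        waited += 0.1

    # Step 6: drain the queue as chunks arrive, yielding partial results.
    accumulated = ""
    for chunk in output_text_queue:
        accumulated += chunk
        yield json.dumps({"partial_output": accumulated})

    # Step 7: non-streaming models only produce output once run() finishes.
    t.join()
    yield json.dumps({"final_output": accumulated})


def fake_run(stream_callback):
    # Stand-in for a streaming model; emits two chunks with a short delay.
    for word in ["streaming ", "works"]:
        time.sleep(0.2)
        stream_callback(word)


for piece in generate(fake_run):
    print(piece)
```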

Open questions/TODOs
1. How can we check whether a model supports streaming or not? Right now we just default to a max timeout of 5s, but long-term it would be better for people to explicitly mark this as a boolean flag in their model parser class (see the sketch after this list)
2. I need to update the output format for streaming. I thought it was fine, but I guess not; I'll verify again. Not ideal, but also not a crazy blocker for now
3. The client also needs to support streaming, but that's fine; Ryan can get unblocked with this diff now
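
For open question 1, a purely hypothetical sketch of what that flag could look like (nothing below exists in aiconfig today; the attribute name and helper are made up for illustration):

```python
# Hypothetical sketch for open question 1; none of this exists in aiconfig today.
class MyModelParser:
    # Assumed opt-in flag: parsers that stream output set this to True so the
    # server could skip the "wait up to 5s for the first chunk" heuristic.
    supports_streaming: bool = True


def model_streams(model_parser) -> bool:
    # Parsers that never declare the flag are treated as non-streaming.
    return getattr(model_parser, "supports_streaming", False)
```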

## Test plan
```bash
alias aiconfig="python -m 'aiconfig.scripts.aiconfig_cli'"
aiconfig edit --aiconfig-path="/Users/rossdancraig/Projects/aiconfig/cookbooks/Getting-Started/travel.aiconfig.json" --server-port=8080 --server-mode=debug_servers

# Now run this from another terminal
curl http://localhost:8080/api/run -d '{"prompt_name":"get_activities"}' -X POST -H 'Content-Type: application/json'
```
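
The new handler also reads optional `params` and `stream` fields from the request body. A minimal Python sketch of a streaming client (assumes the `requests` package is installed and the editor server is running on port 8080):

```python
# Minimal streaming client sketch; prints the raw "[" ... {"aiconfig": ...} ... "]"
# stream produced by generate() as it arrives. Assumes `pip install requests`.
import requests

resp = requests.post(
    "http://localhost:8080/api/run",
    json={"prompt_name": "get_activities", "params": {}, "stream": True},
    stream=True,
)
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)
```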

I also added this line to print output:
```
print(accumulated_output_text)
```

| Streaming | Non-streaming (you can refresh page to short-circuit and get AIConfig) | Non-streaming (normal waiting) |
|---|---|---|
|  https://github.com/lastmile-ai/aiconfig/assets/151060367/446fe008-9f34-4515-bb80-7b317ab15bd6 | https://github.com/lastmile-ai/aiconfig/assets/151060367/d94b636a-2703-49da-a729-82c7a5f1079a | https://github.com/lastmile-ai/aiconfig/assets/151060367/5195e23a-cade-44ac-852e-bdd0bfe374a4 |
Rossdan Craig [email protected] committed Dec 30, 2023
1 parent 8529c2f commit f6db1e6
Showing 6 changed files with 232 additions and 16 deletions.
2 changes: 2 additions & 0 deletions python/src/aiconfig/editor/client/package.json
@@ -37,6 +37,7 @@
"aiconfig": "../../../../../typescript",
"lodash": "^4.17.21",
"node-fetch": "^3.3.2",
"oboe": "^2.1.5",
"react": "^18",
"react-dom": "^18",
"react-markdown": "^8.0.6",
@@ -48,6 +49,7 @@
"devDependencies": {
"@types/lodash": "^4.14.202",
"@types/node": "^20",
"@types/oboe": "^2.1.4",
"@types/react": "^18",
"@types/react-dom": "^18",
"@typescript-eslint/eslint-plugin": "^6.16.0",
21 changes: 20 additions & 1 deletion python/src/aiconfig/editor/client/src/Editor.tsx
@@ -6,6 +6,7 @@ import { AIConfig, ModelMetadata, Prompt } from "aiconfig";
import { useCallback, useEffect, useMemo, useState } from "react";
import { ufetch } from "ufetch";
import { ROUTE_TABLE } from "./utils/api";
import { streamingApi } from "./utils/oboeHelpers";

export default function Editor() {
const [aiconfig, setAiConfig] = useState<AIConfig | undefined>();
@@ -57,9 +58,27 @@ export default function Editor() {
}, []);

const runPrompt = useCallback(async (promptName: string) => {
return await ufetch.post(ROUTE_TABLE.RUN_PROMPT, {
const res = await ufetch.post("http://localhost:8080/api/run", {
prompt_name: promptName,
});
await streamingApi(
{
url: "http://localhost:8080/api/run",
method: "POST",
body: {
prompt_name: promptName,
},
},
"output_chunk",
(data) => {
console.log("output_chunk data: ", data);
},
"aiconfig",
(data) => {
console.log("aiconfig data: ", data);
}
);
return res;
}, []);

const updatePrompt = useCallback(
37 changes: 37 additions & 0 deletions python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
@@ -0,0 +1,37 @@
import oboe, { Options } from "oboe";

// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
// Except it allows to use .node('*', fn) & only resolves on done
// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
// (multiple chunks can be sent in single response & we only want valid json ones)
export async function streamingApi<T>(
headers: Options,
on: string = "*",
fn: (data: any) => void,
on2?: string,
fn2?: (data: any) => void,
on3?: string,
fn3?: (data: any) => void
): Promise<T> {
return new Promise((resolve, reject) => {
if (fn2 && on2 && fn3 && on3) {
oboe(headers)
.node(on, fn)
.node(on2, fn2)
.node(on3, fn3)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
} else if (fn2 && on2) {
oboe(headers)
.node(on, fn)
.node(on2, fn2)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
} else {
oboe(headers)
.node(on, fn)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
}
});
}
19 changes: 19 additions & 0 deletions python/src/aiconfig/editor/client/yarn.lock
@@ -2525,6 +2525,13 @@
dependencies:
undici-types "~5.26.4"

"@types/oboe@^2.1.4":
version "2.1.4"
resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634"
integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww==
dependencies:
"@types/node" "*"

"@types/parse-json@^4.0.0":
version "4.0.2"
resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239"
@@ -6069,6 +6076,11 @@ http-errors@~1.6.2:
setprototypeof "1.1.0"
statuses ">= 1.4.0 < 2"

http-https@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg==

http-parser-js@>=0.5.1:
version "0.5.8"
resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3"
@@ -8510,6 +8522,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7:
define-properties "^1.2.0"
es-abstract "^1.22.1"

oboe@^2.1.5:
version "2.1.5"
resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA==
dependencies:
http-https "^1.0.0"

obuf@^1.0.0, obuf@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
42 changes: 42 additions & 0 deletions python/src/aiconfig/editor/server/queue_iterator.py
@@ -0,0 +1,42 @@
"""Queue iterator for streaming. Can only process strings for now
but in future will try to make it generic"""
# import asyncio
from queue import Queue
# from typing import Generic, TypeVar

# # TODO: Add generic typing for queue items
# # (couldn't get sentinel value to work with generics)
# T = TypeVar('T')
STOP_STREAMING_SIGNAL = object() #sentinel value to indicate end of stream

class QueueIterator():
"""In order to support text streaming, we need to store
the output in a queue and iterate over those values. A lot of this was
inspired by HuggingFace's TextIteratorStreamer object:
I know I can just use a queue directly in the callsite with
`iter(queue.get, None)`, but having a class makes it easier to manage
and abstracts it a bit more.
"""
def __init__(self):
self.q = Queue()
self.stop_signal = STOP_STREAMING_SIGNAL
self.timeout = None

def __iter__(self):
return self

def __next__(self):
value = self.q.get(block=True, timeout=self.timeout)
if value == self.stop_signal:
raise StopIteration()
else:
return value

def put(self, text: str, stream_end: bool = False):
self.q.put(text, timeout=self.timeout)
if stream_end:
self.q.put(self.stop_signal, timeout=self.timeout)

def isEmpty(self) -> bool:
return self.q.empty()
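
A quick usage sketch for this class (not part of the commit; import path assumed from the new file location above):

```python
# Usage sketch only, not part of this commit. stream_end=True also enqueues
# the stop sentinel so iteration terminates instead of blocking forever.
from aiconfig.editor.server.queue_iterator import QueueIterator

queue = QueueIterator()
queue.put("Hello, ")
queue.put("world", stream_end=True)

print("".join(queue))  # -> "Hello, world"
```
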
127 changes: 112 additions & 15 deletions python/src/aiconfig/editor/server/server.py
@@ -1,8 +1,13 @@
import asyncio
import copy
import json
import logging
from typing import Any, Type

import lastmile_utils.lib.core.api as core_utils
import result
import threading
import time
from aiconfig.Config import AIConfigRuntime
from aiconfig.editor.server.server_utils import (
EditServerConfig,
@@ -23,13 +28,17 @@
safe_load_from_disk,
safe_run_aiconfig_static_method,
)
from aiconfig.editor.server.queue_iterator import (
QueueIterator,
STOP_STREAMING_SIGNAL,
)
from aiconfig.model_parser import InferenceOptions
from aiconfig.registry import ModelParserRegistry
from flask import Flask, request
from aiconfig.schema import ExecuteResult, Prompt, Output
from flask import Flask, Response, request, stream_with_context
from flask_cors import CORS
from result import Err, Ok, Result

from aiconfig.schema import Prompt

logging.getLogger("werkzeug").disabled = True

@@ -162,24 +171,112 @@ def create() -> FlaskResponse:


@app.route("/api/run", methods=["POST"])
async def run() -> FlaskResponse:
def run():
EXCLUDE_OPTIONS = {
"prompt_index": True,
"file_path": True,
"callback_manager": True,
}
state = get_server_state(app)
aiconfig = state.aiconfig
request_json = request.get_json()
prompt_name: str = request_json.get("prompt_name", "get_activities") # Hard-coded
params = request_json.get("params", {}) #Fixed in https://github.com/lastmile-ai/aiconfig/pull/668
stream = request_json.get("stream", True)

# Define stream callback and queue object for streaming results
output_text_queue = QueueIterator()
def update_output_queue(data, _accumulated_data, _index) -> None:
should_end_stream = data == STOP_STREAMING_SIGNAL
output_text_queue.put(data, should_end_stream)
inference_options = InferenceOptions(
stream=stream,
stream_callback=update_output_queue,
)

def generate():
# Use multi-threading so that we don't block run command from
# displaying the streamed output (if streaming is supported)
def run_async_config_in_thread():
asyncio.run(aiconfig.run(
prompt_name=prompt_name,
params=params,
run_with_dependencies=False,
options=inference_options,
)
)
output_text_queue.put(STOP_STREAMING_SIGNAL)
t = threading.Thread(target=run_async_config_in_thread)
t.start()

# Create a deep copy of the state aiconfig so we can yield an AIConfig
# with streaming partial outputs in the meantime. This probably isn't
# necessary, but just getting unblocked for now
displaying_config = copy.deepcopy(aiconfig)

# Need to wait until streamer has at least 1 item to display
SLEEP_DELAY_SECONDS = 0.1
MAX_TIMEOUT_SECONDS = 5.0
wait_time_in_seconds = 0.0
while output_text_queue.isEmpty():
time.sleep(0.1)
wait_time_in_seconds += SLEEP_DELAY_SECONDS
print(f"Output queue is currently empty. Waiting for {wait_time_in_seconds:.1f}s...")

# TODO: We should have a better way to check if the model supports
# streaming or not and bypass this if they do. I'm thinking we
# could add an abstract field that all models need to set T/F
# but we'll see. For now this works
# And yea I know time.sleep() isn't super accurate, but it's fine,
# we can fix later
if wait_time_in_seconds >= MAX_TIMEOUT_SECONDS:
print(f"Output queue is still empty after {wait_time_in_seconds:.1f}s. Breaking...")
break

yield "["
aiconfig_json: str | None = None
if not output_text_queue.isEmpty():
accumulated_output_text = ""
for text in output_text_queue:
if isinstance(text, str):
accumulated_output_text += text
elif isinstance(text, dict) and "content" in text:
# TODO: Fix streaming output format so that it returns text
accumulated_output_text += text["content"]

accumulated_output : Output = ExecuteResult(
**{
"output_type": "execute_result",
"data": accumulated_output_text,
# Assume streaming only supports single output
# I think this actually may be wrong for PaLM or OpenAI
# TODO: Need to sync with Ankush but can fix forward
"execution_count": 0,
"metadata": {},
}
)

displaying_config.add_output(prompt_name, accumulated_output, overwrite=True)
aiconfig_json = displaying_config.model_dump(exclude=EXCLUDE_OPTIONS)
yield json.dumps({"aiconfig": aiconfig_json})

# Some models don't support streaming, so we need to wait for run
# process to complete and yield the final output
t.join()
if aiconfig_json is None:
aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
yield json.dumps({"aiconfig": aiconfig_json})
yield "]"

try:
prompt_name = request_json["prompt_name"]
params = request_json.get("params", {})
stream = request_json.get("stream", False)
options = InferenceOptions(stream=stream)
run_output = await aiconfig.run(prompt_name, params, options) # type: ignore
LOGGER.debug(f"run_output: {run_output}")
return HttpResponseWithAIConfig(
#
message="Ran prompt",
code=200,
aiconfig=aiconfig,
).to_flask_format()
LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
# Streaming based on
# https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
return Response(
stream_with_context(generate()),
status=200,
content_type="application/json",
)
except Exception as e:
return HttpResponseWithAIConfig(
#
