From 31ab023faa21d6d51f52b9853dd2928705dd1456 Mon Sep 17 00:00:00 2001 From: "Rossdan Craig rossdan@lastmileai.dev" <> Date: Fri, 5 Jan 2024 18:20:35 -0500 Subject: [PATCH] Get basic streaming to work for run Building upon what Ryan investigated in https://github.com/lastmile-ai/aiconfig/pull/651, all the frontend changes are from him I just rebased onto his PR This is a bit tricky becasue we have to support: 1. streaming models --> use a queue iterator for passing the output text 2. non-streaming models --> still yield, just don't use a queue iterator and wait for run command to finish General flow: 1. Parse prompts from client 2. Define stream callback with queue iterator 3. Start thread to run aiconfig without blocking main thread from accessing queue iterator 4. Create a copy of the original AIConfig so we can write partially streamed outputs, yield and display it without risk of race conditions 5. Wait for queue iterator to start containing data, or wait until max timeout (becuase model may not support streaming) 5. Iterate through queue iterator, saving output to display config, yield display config 6. Once output is complete, wait for the original `config.run()` thread and display the output from that Open questions/TODOs 1. [solved - use `while output_text_queue.isEmpty() and t.is_alive()`] ~~How can we check whether model supports streaming or not? Right now we just default to having a max timeout of 5s, but long-term would be better for people to explicitly mark this as a boolean flag in their model parser class~~ 2. I need update the output format for streaming. I thought it was fine but guess not, will verify again. A bit annoying but also not a crazy blocker for now 3. Client needs to also support streaming, but that's fine Ryan can get unblocked with this diff now 4. Pretty complex, but streaming will break for `run_with_dependencies`. I've got a proposal to fix forward in https://github.com/lastmile-ai/gradio-workbook/pull/64 and really want people to take a look and give feedback ## Test plan ```bash alias aiconfig="python -m 'aiconfig.scripts.aiconfig_cli'" aiconfig edit --aiconfig-path="/Users/rossdancraig/Projects/aiconfig/cookbooks/Getting-Started/travel.aiconfig.json" --server-port=8080 --server-mode=debug_servers # Now run this from another terminal curl http://localhost:8080/api/run -d '{"prompt_name":"get_activities"}' -X POST -H 'Content-Type: application/json' ``` I also added this line to print output: ``` print(accumulated_output_text) ``` Streaming https://github.com/lastmile-ai/aiconfig/assets/151060367/055084cc-fb95-43d1-8567-88a7b2e91142 Non-streaming https://github.com/lastmile-ai/aiconfig/assets/151060367/446fe008-9f34-4515-bb80-7b317ab15bd6 --- .../src/aiconfig/editor/client/package.json | 2 + .../editor/client/src/utils/oboeHelpers.ts | 37 ++++ python/src/aiconfig/editor/client/yarn.lock | 19 ++ .../aiconfig/editor/server/queue_iterator.py | 44 +++++ python/src/aiconfig/editor/server/server.py | 163 +++++++++++++++--- 5 files changed, 245 insertions(+), 20 deletions(-) create mode 100644 python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts create mode 100644 python/src/aiconfig/editor/server/queue_iterator.py diff --git a/python/src/aiconfig/editor/client/package.json b/python/src/aiconfig/editor/client/package.json index 093faab55..5b360f21f 100644 --- a/python/src/aiconfig/editor/client/package.json +++ b/python/src/aiconfig/editor/client/package.json @@ -38,6 +38,7 @@ "aiconfig": "../../../../../typescript", "lodash": "^4.17.21", "node-fetch": "^3.3.2", + "oboe": "^2.1.5", "react": "^18", "react-dom": "^18", "react-markdown": "^8.0.6", @@ -49,6 +50,7 @@ "devDependencies": { "@types/lodash": "^4.14.202", "@types/node": "^20", + "@types/oboe": "^2.1.4", "@types/react": "^18", "@types/react-dom": "^18", "@typescript-eslint/eslint-plugin": "^6.16.0", diff --git a/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts new file mode 100644 index 000000000..6806b2cbb --- /dev/null +++ b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts @@ -0,0 +1,37 @@ +import oboe, { Options } from "oboe"; + +// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript +// Except it allows to use .node('*', fn) & only resolves on done +// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams +// (multiple chunks can be sent in single response & we only want valid json ones) +export async function streamingApi( + headers: Options, + on: string = "*", + fn: (data: any) => void, + on2?: string, + fn2?: (data: any) => void, + on3?: string, + fn3?: (data: any) => void +): Promise { + return new Promise((resolve, reject) => { + if (fn2 && on2 && fn3 && on3) { + oboe(headers) + .node(on, fn) + .node(on2, fn2) + .node(on3, fn3) + .done((data) => resolve(data)) + .fail((err) => reject(err.jsonBody)); + } else if (fn2 && on2) { + oboe(headers) + .node(on, fn) + .node(on2, fn2) + .done((data) => resolve(data)) + .fail((err) => reject(err.jsonBody)); + } else { + oboe(headers) + .node(on, fn) + .done((data) => resolve(data)) + .fail((err) => reject(err.jsonBody)); + } + }); +} diff --git a/python/src/aiconfig/editor/client/yarn.lock b/python/src/aiconfig/editor/client/yarn.lock index ea32e950c..f766e5e80 100644 --- a/python/src/aiconfig/editor/client/yarn.lock +++ b/python/src/aiconfig/editor/client/yarn.lock @@ -2539,6 +2539,13 @@ dependencies: undici-types "~5.26.4" +"@types/oboe@^2.1.4": + version "2.1.4" + resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634" + integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww== + dependencies: + "@types/node" "*" + "@types/parse-json@^4.0.0": version "4.0.2" resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239" @@ -6083,6 +6090,11 @@ http-errors@~1.6.2: setprototypeof "1.1.0" statuses ">= 1.4.0 < 2" +http-https@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b" + integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg== + http-parser-js@>=0.5.1: version "0.5.8" resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3" @@ -8524,6 +8536,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7: define-properties "^1.2.0" es-abstract "^1.22.1" +oboe@^2.1.5: + version "2.1.5" + resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd" + integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA== + dependencies: + http-https "^1.0.0" + obuf@^1.0.0, obuf@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e" diff --git a/python/src/aiconfig/editor/server/queue_iterator.py b/python/src/aiconfig/editor/server/queue_iterator.py new file mode 100644 index 000000000..b4dc8d4fe --- /dev/null +++ b/python/src/aiconfig/editor/server/queue_iterator.py @@ -0,0 +1,44 @@ +"""Queue iterator for streaming. Can only process strings for now +but in future will try to make it generic""" +# import asyncio +from queue import Queue + +# from typing import Generic, TypeVar + +# # TODO: Add generic typing for queue items +# # (couldn't get sentinel value to work with generics) +# T = TypeVar('T') +STOP_STREAMING_SIGNAL = object() # sentinel value to indicate end of stream + + +class QueueIterator: + """In order to support text streaming, we need to store + the output in a queue and iterate over those values. A lot of this was + inspired by HuggingFace's TextIteratorStreamer object: + + I know I can just use a queue directly in the callsite with + `iter(queue.get, None)`, but having a class makes it easier to manage + and abstracts it a bit more. + """ + + def __init__(self): + self.q = Queue() + self.stop_signal = STOP_STREAMING_SIGNAL + self.timeout = None + + def __iter__(self): + return self + + def __next__(self): + value = self.q.get(block=True, timeout=self.timeout) + if value == self.stop_signal: + raise StopIteration() + return value + + def put(self, text: str, stream_end: bool = False): + self.q.put(text, timeout=self.timeout) + if stream_end: + self.q.put(self.stop_signal, timeout=self.timeout) + + def isEmpty(self) -> bool: + return self.q.empty() diff --git a/python/src/aiconfig/editor/server/server.py b/python/src/aiconfig/editor/server/server.py index be3600ae8..f9cd467f9 100644 --- a/python/src/aiconfig/editor/server/server.py +++ b/python/src/aiconfig/editor/server/server.py @@ -1,10 +1,16 @@ +import asyncio +import copy +import json import logging +import threading +import time import webbrowser from typing import Any, Dict, Type, Union import lastmile_utils.lib.core.api as core_utils import result from aiconfig.Config import AIConfigRuntime +from aiconfig.editor.server.queue_iterator import STOP_STREAMING_SIGNAL, QueueIterator from aiconfig.editor.server.server_utils import ( EditServerConfig, FlaskResponse, @@ -26,11 +32,11 @@ ) from aiconfig.model_parser import InferenceOptions from aiconfig.registry import ModelParserRegistry -from flask import Flask, request +from flask import Flask, Response, request, stream_with_context from flask_cors import CORS from result import Err, Ok, Result -from aiconfig.schema import Prompt +from aiconfig.schema import ExecuteResult, Output, Prompt logging.getLogger("werkzeug").disabled = True @@ -173,35 +179,152 @@ def create() -> FlaskResponse: @app.route("/api/run", methods=["POST"]) -async def run() -> FlaskResponse: +def run() -> FlaskResponse: + EXCLUDE_OPTIONS = { + "prompt_index": True, + "file_path": True, + "callback_manager": True, + } state = get_server_state(app) aiconfig = state.aiconfig request_json = request.get_json() + prompt_name: Union[str, None] = request_json.get("prompt_name") + if prompt_name is None: + return HttpResponseWithAIConfig( + message="No prompt name provided, cannot execute `run` command", + code=400, + aiconfig=None, + ).to_flask_format() + + # TODO (rossdanlm): Refactor aiconfig.run() to not take in `params` + # as a function arg since we can now just call + # aiconfig.get_parameters(prompt_name) directly inside of run. See: + # https://github.com/lastmile-ai/aiconfig/issues/671 + params = request_json.get("params", aiconfig.get_parameters(prompt_name)) # type: ignore + stream = request_json.get("stream", False) # TODO: set this automatically to True after client supports stream output + + # Define stream callback and queue object for streaming results + output_text_queue = QueueIterator() + + def update_output_queue(data, _accumulated_data, _index) -> None: + should_end_stream = data == STOP_STREAMING_SIGNAL + output_text_queue.put(data, should_end_stream) + + inference_options = InferenceOptions( + stream=stream, + stream_callback=update_output_queue, + ) + + def generate(): + # Use multi-threading so that we don't block run command from + # displaying the streamed output (if streaming is supported) + def run_async_config_in_thread(): + asyncio.run( + aiconfig.run( + prompt_name=prompt_name, + params=params, + run_with_dependencies=False, + options=inference_options, + ) + ) + output_text_queue.put(STOP_STREAMING_SIGNAL) + + t = threading.Thread(target=run_async_config_in_thread) + t.start() + + # Create a deep copy of the state aiconfig so we can yield an AIConfig + # with streaming partial outputs in the meantime. This probably isn't + # necessary, but just getting unblocked for now + displaying_config = copy.deepcopy(aiconfig) + + # If model supports streaming, need to wait until streamer has at + # least 1 item to display. If model does not support streaming, + # need to wait until the aiconfig.run() thread is complete + SLEEP_DELAY_SECONDS = 0.1 + wait_time_in_seconds = 0.0 + while output_text_queue.isEmpty() and t.is_alive(): + # Yea I know time.sleep() isn't super accurate, but it's fine, + # we can fix later + time.sleep(0.1) + wait_time_in_seconds += SLEEP_DELAY_SECONDS + print(f"Output queue is currently empty. Waiting for {wait_time_in_seconds:.1f}s...") + + # Yield in flask is weird and you either need to send responses as a + # string, or artificially wrap them around "[" and "]" + # yield "[" + if not output_text_queue.isEmpty(): + accumulated_output_text = "" + for text in output_text_queue: + if isinstance(text, str): + accumulated_output_text += text + elif isinstance(text, dict) and "content" in text: + # TODO: Fix streaming output format so that it returns text + accumulated_output_text += text["content"] + elif isinstance(text, dict) and "generated_text" in text: + # TODO: Fix streaming output format so that it returns text + accumulated_output_text += text["generated_text"] + + accumulated_output: Output = ExecuteResult( + **{ + "output_type": "execute_result", + "data": accumulated_output_text, + # Assume streaming only supports single output + # I think this actually may be wrong for PaLM or OpenAI + # TODO: Need to sync with Ankush but can fix forward + "execution_count": 0, + "metadata": {}, + } + ) + + displaying_config.add_output(prompt_name, accumulated_output, overwrite=True) + aiconfig_json = displaying_config.model_dump(exclude=EXCLUDE_OPTIONS) + yield "[" + yield json.dumps({"aiconfig": aiconfig_json}) + yield "]" + + # Ensure that the run process is complete to yield final output + t.join() + aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS) + yield "[" + yield json.dumps({"aiconfig": aiconfig_json}) + yield "]" + try: - prompt_name: Union[str, None] = request_json.get("prompt_name") - if prompt_name is None: - return HttpResponseWithAIConfig( - message="No prompt name provided, cannot execute `run` command", - code=400, - aiconfig=None, - ).to_flask_format() - - # TODO (rossdanlm): Refactor aiconfig.run() to not take in `params` - # as a function arg since we can now just call - # aiconfig.get_parameters(prompt_name) directly inside of run. See: - # https://github.com/lastmile-ai/aiconfig/issues/671 - params = request_json.get("params", aiconfig.get_parameters(prompt_name)) # type: ignore - stream = request_json.get("stream", False) - options = InferenceOptions(stream=stream) - run_output = await aiconfig.run(prompt_name, params, options) # type: ignore - LOGGER.debug(f"run_output: {run_output}") + if stream: + LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}") + # Streaming based on + # https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response + return Response( + stream_with_context(generate()), + status=200, + content_type="application/json", + ) + + # Run without streaming + inference_options = InferenceOptions(stream=stream) + def run_async_config_in_thread(): + asyncio.run( + aiconfig.run( + prompt_name=prompt_name, + params=params, + run_with_dependencies=False, + options=inference_options, + ) + ) + output_text_queue.put(STOP_STREAMING_SIGNAL) + + t = threading.Thread(target=run_async_config_in_thread) + t.start() + LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}") + t.join() return HttpResponseWithAIConfig( # message="Ran prompt", code=200, aiconfig=aiconfig, ).to_flask_format() + except Exception as e: return HttpResponseWithAIConfig( #