diff --git a/python/src/aiconfig/editor/client/package.json b/python/src/aiconfig/editor/client/package.json
index 093faab55..5b360f21f 100644
--- a/python/src/aiconfig/editor/client/package.json
+++ b/python/src/aiconfig/editor/client/package.json
@@ -38,6 +38,7 @@
     "aiconfig": "../../../../../typescript",
     "lodash": "^4.17.21",
     "node-fetch": "^3.3.2",
+    "oboe": "^2.1.5",
     "react": "^18",
     "react-dom": "^18",
     "react-markdown": "^8.0.6",
@@ -49,6 +50,7 @@
   "devDependencies": {
     "@types/lodash": "^4.14.202",
     "@types/node": "^20",
+    "@types/oboe": "^2.1.4",
     "@types/react": "^18",
     "@types/react-dom": "^18",
     "@typescript-eslint/eslint-plugin": "^6.16.0",
diff --git a/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
new file mode 100644
index 000000000..6806b2cbb
--- /dev/null
+++ b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
@@ -0,0 +1,37 @@
+import oboe, { Options } from "oboe";
+
+// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
+// Except it allows using .node('*', fn) & only resolves on done.
+// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
+// (multiple chunks can be sent in a single response & we only want valid JSON ones)
+export async function streamingApi<T>(
+  headers: Options,
+  on: string = "*",
+  fn: (data: any) => void,
+  on2?: string,
+  fn2?: (data: any) => void,
+  on3?: string,
+  fn3?: (data: any) => void
+): Promise<T> {
+  return new Promise((resolve, reject) => {
+    if (fn2 && on2 && fn3 && on3) {
+      oboe(headers)
+        .node(on, fn)
+        .node(on2, fn2)
+        .node(on3, fn3)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    } else if (fn2 && on2) {
+      oboe(headers)
+        .node(on, fn)
+        .node(on2, fn2)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    } else {
+      oboe(headers)
+        .node(on, fn)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    }
+  });
+}
"2.1.5" + resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd" + integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA== + dependencies: + http-https "^1.0.0" + obuf@^1.0.0, obuf@^1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e" diff --git a/python/src/aiconfig/editor/server/queue_iterator.py b/python/src/aiconfig/editor/server/queue_iterator.py new file mode 100644 index 000000000..b4dc8d4fe --- /dev/null +++ b/python/src/aiconfig/editor/server/queue_iterator.py @@ -0,0 +1,44 @@ +"""Queue iterator for streaming. Can only process strings for now +but in future will try to make it generic""" +# import asyncio +from queue import Queue + +# from typing import Generic, TypeVar + +# # TODO: Add generic typing for queue items +# # (couldn't get sentinel value to work with generics) +# T = TypeVar('T') +STOP_STREAMING_SIGNAL = object() # sentinel value to indicate end of stream + + +class QueueIterator: + """In order to support text streaming, we need to store + the output in a queue and iterate over those values. A lot of this was + inspired by HuggingFace's TextIteratorStreamer object: + + I know I can just use a queue directly in the callsite with + `iter(queue.get, None)`, but having a class makes it easier to manage + and abstracts it a bit more. + """ + + def __init__(self): + self.q = Queue() + self.stop_signal = STOP_STREAMING_SIGNAL + self.timeout = None + + def __iter__(self): + return self + + def __next__(self): + value = self.q.get(block=True, timeout=self.timeout) + if value == self.stop_signal: + raise StopIteration() + return value + + def put(self, text: str, stream_end: bool = False): + self.q.put(text, timeout=self.timeout) + if stream_end: + self.q.put(self.stop_signal, timeout=self.timeout) + + def isEmpty(self) -> bool: + return self.q.empty() diff --git a/python/src/aiconfig/editor/server/server.py b/python/src/aiconfig/editor/server/server.py index be3600ae8..f9cd467f9 100644 --- a/python/src/aiconfig/editor/server/server.py +++ b/python/src/aiconfig/editor/server/server.py @@ -1,10 +1,16 @@ +import asyncio +import copy +import json import logging +import threading +import time import webbrowser from typing import Any, Dict, Type, Union import lastmile_utils.lib.core.api as core_utils import result from aiconfig.Config import AIConfigRuntime +from aiconfig.editor.server.queue_iterator import STOP_STREAMING_SIGNAL, QueueIterator from aiconfig.editor.server.server_utils import ( EditServerConfig, FlaskResponse, @@ -26,11 +32,11 @@ ) from aiconfig.model_parser import InferenceOptions from aiconfig.registry import ModelParserRegistry -from flask import Flask, request +from flask import Flask, Response, request, stream_with_context from flask_cors import CORS from result import Err, Ok, Result -from aiconfig.schema import Prompt +from aiconfig.schema import ExecuteResult, Output, Prompt logging.getLogger("werkzeug").disabled = True @@ -173,35 +179,152 @@ def create() -> FlaskResponse: @app.route("/api/run", methods=["POST"]) -async def run() -> FlaskResponse: +def run() -> FlaskResponse: + EXCLUDE_OPTIONS = { + "prompt_index": True, + "file_path": True, + "callback_manager": True, + } state = get_server_state(app) aiconfig = state.aiconfig request_json = request.get_json() + prompt_name: Union[str, None] = request_json.get("prompt_name") + if prompt_name is None: + return HttpResponseWithAIConfig( + 
diff --git a/python/src/aiconfig/editor/server/server.py b/python/src/aiconfig/editor/server/server.py
index be3600ae8..f9cd467f9 100644
--- a/python/src/aiconfig/editor/server/server.py
+++ b/python/src/aiconfig/editor/server/server.py
@@ -1,10 +1,16 @@
+import asyncio
+import copy
+import json
 import logging
+import threading
+import time
 import webbrowser
 from typing import Any, Dict, Type, Union
 
 import lastmile_utils.lib.core.api as core_utils
 import result
 from aiconfig.Config import AIConfigRuntime
+from aiconfig.editor.server.queue_iterator import STOP_STREAMING_SIGNAL, QueueIterator
 from aiconfig.editor.server.server_utils import (
     EditServerConfig,
     FlaskResponse,
@@ -26,11 +32,11 @@
 )
 from aiconfig.model_parser import InferenceOptions
 from aiconfig.registry import ModelParserRegistry
-from flask import Flask, request
+from flask import Flask, Response, request, stream_with_context
 from flask_cors import CORS
 from result import Err, Ok, Result
 
-from aiconfig.schema import Prompt
+from aiconfig.schema import ExecuteResult, Output, Prompt
 
 logging.getLogger("werkzeug").disabled = True
 
@@ -173,35 +179,152 @@ def create() -> FlaskResponse:
 
 
 @app.route("/api/run", methods=["POST"])
-async def run() -> FlaskResponse:
+def run() -> FlaskResponse:
+    EXCLUDE_OPTIONS = {
+        "prompt_index": True,
+        "file_path": True,
+        "callback_manager": True,
+    }
     state = get_server_state(app)
     aiconfig = state.aiconfig
     request_json = request.get_json()
+    prompt_name: Union[str, None] = request_json.get("prompt_name")
+    if prompt_name is None:
+        return HttpResponseWithAIConfig(
+            message="No prompt name provided, cannot execute `run` command",
+            code=400,
+            aiconfig=None,
+        ).to_flask_format()
+
+    # TODO (rossdanlm): Refactor aiconfig.run() to not take in `params`
+    # as a function arg since we can now just call
+    # aiconfig.get_parameters(prompt_name) directly inside of run. See:
+    # https://github.com/lastmile-ai/aiconfig/issues/671
+    params = request_json.get("params", aiconfig.get_parameters(prompt_name))  # type: ignore
+    stream = request_json.get("stream", False)  # TODO: set this automatically to True after client supports stream output
+
+    # Define stream callback and queue object for streaming results
+    output_text_queue = QueueIterator()
+
+    def update_output_queue(data, _accumulated_data, _index) -> None:
+        should_end_stream = data == STOP_STREAMING_SIGNAL
+        output_text_queue.put(data, should_end_stream)
+
+    inference_options = InferenceOptions(
+        stream=stream,
+        stream_callback=update_output_queue,
+    )
+
+    def generate():
+        # Use multi-threading so that we don't block run command from
+        # displaying the streamed output (if streaming is supported)
+        def run_async_config_in_thread():
+            asyncio.run(
+                aiconfig.run(
+                    prompt_name=prompt_name,
+                    params=params,
+                    run_with_dependencies=False,
+                    options=inference_options,
+                )
+            )
+            output_text_queue.put(STOP_STREAMING_SIGNAL)
+
+        t = threading.Thread(target=run_async_config_in_thread)
+        t.start()
+
+        # Create a deep copy of the state aiconfig so we can yield an AIConfig
+        # with streaming partial outputs in the meantime. This probably isn't
+        # necessary, but just getting unblocked for now
+        displaying_config = copy.deepcopy(aiconfig)
+
+        # If model supports streaming, need to wait until streamer has at
+        # least 1 item to display. If model does not support streaming,
+        # need to wait until the aiconfig.run() thread is complete
+        SLEEP_DELAY_SECONDS = 0.1
+        wait_time_in_seconds = 0.0
+        while output_text_queue.isEmpty() and t.is_alive():
+            # time.sleep() is not perfectly accurate, but it is close enough
+            # for a polling delay
+            time.sleep(SLEEP_DELAY_SECONDS)
+            wait_time_in_seconds += SLEEP_DELAY_SECONDS
+            print(f"Output queue is currently empty. Waiting for {wait_time_in_seconds:.1f}s...")
+
+        # Flask needs streamed responses to be yielded as strings, so each
+        # partial result is wrapped in "[" and "]" to form a standalone JSON
+        # chunk that the client can parse as it arrives
+        if not output_text_queue.isEmpty():
+            accumulated_output_text = ""
+            for text in output_text_queue:
+                if isinstance(text, str):
+                    accumulated_output_text += text
+                elif isinstance(text, dict) and "content" in text:
+                    # TODO: Fix streaming output format so that it returns text
+                    accumulated_output_text += text["content"]
+                elif isinstance(text, dict) and "generated_text" in text:
+                    # TODO: Fix streaming output format so that it returns text
+                    accumulated_output_text += text["generated_text"]
+
+            accumulated_output: Output = ExecuteResult(
+                **{
+                    "output_type": "execute_result",
+                    "data": accumulated_output_text,
+                    # Assume streaming only supports a single output.
+                    # TODO: This may be wrong for PaLM or OpenAI; need to
+                    # sync with Ankush, but we can fix forward
+                    "execution_count": 0,
+                    "metadata": {},
+                }
+            )
+
+            displaying_config.add_output(prompt_name, accumulated_output, overwrite=True)
+            aiconfig_json = displaying_config.model_dump(exclude=EXCLUDE_OPTIONS)
+            yield "["
+            yield json.dumps({"aiconfig": aiconfig_json})
+            yield "]"
+
+        # Ensure that the run process is complete to yield final output
+        t.join()
+        aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+        yield "["
+        yield json.dumps({"aiconfig": aiconfig_json})
+        yield "]"
+
     try:
-        prompt_name: Union[str, None] = request_json.get("prompt_name")
-        if prompt_name is None:
-            return HttpResponseWithAIConfig(
-                message="No prompt name provided, cannot execute `run` command",
-                code=400,
-                aiconfig=None,
-            ).to_flask_format()
-
-        # TODO (rossdanlm): Refactor aiconfig.run() to not take in `params`
-        # as a function arg since we can now just call
-        # aiconfig.get_parameters(prompt_name) directly inside of run. See:
-        # https://github.com/lastmile-ai/aiconfig/issues/671
-        params = request_json.get("params", aiconfig.get_parameters(prompt_name))  # type: ignore
-        stream = request_json.get("stream", False)
-        options = InferenceOptions(stream=stream)
-        run_output = await aiconfig.run(prompt_name, params, options)  # type: ignore
-        LOGGER.debug(f"run_output: {run_output}")
+        if stream:
+            LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
+            # Streaming based on
+            # https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
+            return Response(
+                stream_with_context(generate()),
+                status=200,
+                content_type="application/json",
+            )
+
+        # Run without streaming
+        inference_options = InferenceOptions(stream=stream)
+
+        def run_async_config_in_thread():
+            asyncio.run(
+                aiconfig.run(
+                    prompt_name=prompt_name,
+                    params=params,
+                    run_with_dependencies=False,
+                    options=inference_options,
+                )
+            )
+            output_text_queue.put(STOP_STREAMING_SIGNAL)
+
+        t = threading.Thread(target=run_async_config_in_thread)
+        t.start()
+        LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
+        t.join()
         return HttpResponseWithAIConfig(
             #
             message="Ran prompt",
             code=200,
             aiconfig=aiconfig,
         ).to_flask_format()
+
     except Exception as e:
         return HttpResponseWithAIConfig(
             #
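Note: a minimal sketch (not part of the diff) of what the streamed /api/run response looks like to a consumer. The endpoint yields each partial AIConfig as its own "[" + JSON + "]" chunk rather than a single JSON document, which is why the editor client parses the body incrementally with the oboe helper above. The URL, port, prompt name, and use of the requests library here are assumptions for illustration only.

import json

import requests

# Assumed local editor server address and prompt name; adjust for your setup.
response = requests.post(
    "http://localhost:8080/api/run",
    json={"prompt_name": "my_prompt", "stream": True},
    stream=True,
)

# Each chunk the server yields is a small JSON array like '[{"aiconfig": {...}}]'.
# Decode the arrays one at a time as bytes arrive; the last one carries the
# final outputs once aiconfig.run() has completed.
decoder = json.JSONDecoder()
buffer = ""
for chunk in response.iter_content(chunk_size=None):
    buffer += chunk.decode("utf-8")
    while buffer:
        try:
            parsed, end = decoder.raw_decode(buffer)
        except json.JSONDecodeError:
            break  # chunk boundary fell mid-object; keep accumulating
        buffer = buffer[end:]
        print(parsed[0]["aiconfig"])  # partial AIConfig, then the final one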