diff --git a/python/src/aiconfig/editor/client/package.json b/python/src/aiconfig/editor/client/package.json
index 79c13e97e..1411de9ce 100644
--- a/python/src/aiconfig/editor/client/package.json
+++ b/python/src/aiconfig/editor/client/package.json
@@ -37,6 +37,7 @@
     "aiconfig": "../../../../../typescript",
     "lodash": "^4.17.21",
     "node-fetch": "^3.3.2",
+    "oboe": "^2.1.5",
     "react": "^18",
     "react-dom": "^18",
     "react-markdown": "^8.0.6",
@@ -48,6 +49,7 @@
   "devDependencies": {
     "@types/lodash": "^4.14.202",
     "@types/node": "^20",
+    "@types/oboe": "^2.1.4",
     "@types/react": "^18",
     "@types/react-dom": "^18",
     "@typescript-eslint/eslint-plugin": "^6.16.0",
diff --git a/python/src/aiconfig/editor/client/src/Editor.tsx b/python/src/aiconfig/editor/client/src/Editor.tsx
index ce9bd05ef..55b5d1ebb 100644
--- a/python/src/aiconfig/editor/client/src/Editor.tsx
+++ b/python/src/aiconfig/editor/client/src/Editor.tsx
@@ -6,6 +6,7 @@ import { AIConfig, ModelMetadata, Prompt } from "aiconfig";
 import { useCallback, useEffect, useMemo, useState } from "react";
 import { ufetch } from "ufetch";
 import { ROUTE_TABLE } from "./utils/api";
+import { streamingApi } from "./utils/oboeHelpers";

 export default function Editor() {
   const [aiconfig, setAiConfig] = useState();
@@ -57,9 +58,27 @@ export default function Editor() {
   }, []);

   const runPrompt = useCallback(async (promptName: string) => {
-    return await ufetch.post(ROUTE_TABLE.RUN_PROMPT, {
+    const res = await ufetch.post("http://localhost:8080/api/run", {
       prompt_name: promptName,
     });
+    await streamingApi(
+      {
+        url: "http://localhost:8080/api/run",
+        method: "POST",
+        body: {
+          prompt_name: promptName,
+        },
+      },
+      "output_chunk",
+      (data) => {
+        console.log("output_chunk data: ", data);
+      },
+      "aiconfig",
+      (data) => {
+        console.log("aiconfig data: ", data);
+      }
+    );
+    return res;
   }, []);

   const updatePrompt = useCallback(
diff --git a/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
new file mode 100644
index 000000000..6806b2cbb
--- /dev/null
+++ b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
@@ -0,0 +1,37 @@
+import oboe, { Options } from "oboe";
+
+// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
+// Except it allows using .node('*', fn) & only resolves on done
+// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
+// (multiple chunks can be sent in a single response & we only want valid JSON ones)
+export async function streamingApi<T>(
+  headers: Options,
+  on: string = "*",
+  fn: (data: any) => void,
+  on2?: string,
+  fn2?: (data: any) => void,
+  on3?: string,
+  fn3?: (data: any) => void
+): Promise<T> {
+  return new Promise((resolve, reject) => {
+    if (fn2 && on2 && fn3 && on3) {
+      oboe(headers)
+        .node(on, fn)
+        .node(on2, fn2)
+        .node(on3, fn3)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    } else if (fn2 && on2) {
+      oboe(headers)
+        .node(on, fn)
+        .node(on2, fn2)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    } else {
+      oboe(headers)
+        .node(on, fn)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    }
+  });
+}
diff --git a/python/src/aiconfig/editor/client/yarn.lock b/python/src/aiconfig/editor/client/yarn.lock
index 6035a2bb8..69844c18f 100644
--- a/python/src/aiconfig/editor/client/yarn.lock
+++ b/python/src/aiconfig/editor/client/yarn.lock
@@ -2525,6 +2525,13 @@
   dependencies:
     undici-types "~5.26.4"

+"@types/oboe@^2.1.4":
+  version "2.1.4"
+  resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634"
+  integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww==
+  dependencies:
+    "@types/node" "*"
+
 "@types/parse-json@^4.0.0":
   version "4.0.2"
   resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239"
@@ -6069,6 +6076,11 @@ http-errors@~1.6.2:
     setprototypeof "1.1.0"
     statuses ">= 1.4.0 < 2"

+http-https@^1.0.0:
+  version "1.0.0"
+  resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
+  integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg==
+
 http-parser-js@>=0.5.1:
   version "0.5.8"
   resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3"
@@ -8510,6 +8522,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7:
     define-properties "^1.2.0"
     es-abstract "^1.22.1"

+oboe@^2.1.5:
+  version "2.1.5"
+  resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
+  integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA==
+  dependencies:
+    http-https "^1.0.0"
+
 obuf@^1.0.0, obuf@^1.1.2:
   version "1.1.2"
   resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
diff --git a/python/src/aiconfig/editor/server/queue_iterator.py b/python/src/aiconfig/editor/server/queue_iterator.py
new file mode 100644
index 000000000..22389e7a5
--- /dev/null
+++ b/python/src/aiconfig/editor/server/queue_iterator.py
@@ -0,0 +1,41 @@
+"""Queue iterator for streaming. Can only process strings for now
+but in future will try to make it generic"""
+# import asyncio
+from queue import Queue
+
+# from typing import Generic, TypeVar
+
+# # TODO: Add generic typing for queue items
+# # (couldn't get sentinel value to work with generics)
+# T = TypeVar('T')
+STOP_STREAMING_SIGNAL = object()  # sentinel value to indicate end of stream
+
+
+class QueueIterator():
+    """In order to support text streaming, we need to store
+    the output in a queue and iterate over those values. A lot of this was
+    inspired by HuggingFace's TextIteratorStreamer object:
+
+    I know I can just use a queue directly in the callsite with
+    `iter(queue.get, None)`, but having a class makes it easier to manage
+    and abstracts it a bit more.
+    """
+
+    def __init__(self):
+        self.q = Queue()
+        self.stop_signal = STOP_STREAMING_SIGNAL
+        self.timeout = None
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        value = self.q.get(block=True, timeout=self.timeout)
+        if value == self.stop_signal:
+            raise StopIteration()
+        return value
+
+    def put(self, text: str, stream_end: bool = False):
+        self.q.put(text, timeout=self.timeout)
+        if stream_end:
+            self.q.put(self.stop_signal, timeout=self.timeout)
+
+    def isEmpty(self) -> bool:
+        return self.q.empty()
diff --git a/python/src/aiconfig/editor/server/server.py b/python/src/aiconfig/editor/server/server.py
index 12b9bc35e..d33fe68c5 100644
--- a/python/src/aiconfig/editor/server/server.py
+++ b/python/src/aiconfig/editor/server/server.py
@@ -1,8 +1,13 @@
+import asyncio
+import copy
+import json
 import logging
 from typing import Any, Type

 import lastmile_utils.lib.core.api as core_utils
 import result
+import threading
+import time
 from aiconfig.Config import AIConfigRuntime
 from aiconfig.editor.server.server_utils import (
     EditServerConfig,
@@ -23,13 +28,17 @@
     safe_load_from_disk,
     safe_run_aiconfig_static_method,
 )
+from aiconfig.editor.server.queue_iterator import (
+    QueueIterator,
+    STOP_STREAMING_SIGNAL,
+)
 from aiconfig.model_parser import InferenceOptions
 from aiconfig.registry import ModelParserRegistry
-from flask import Flask, request
+from aiconfig.schema import ExecuteResult, Prompt, Output
+from flask import Flask, Response, request, stream_with_context
 from flask_cors import CORS
 from result import Err, Ok, Result
-from aiconfig.schema import Prompt

 logging.getLogger("werkzeug").disabled = True

@@ -162,24 +171,103 @@ def create() -> FlaskResponse:
 @app.route("/api/run", methods=["POST"])
-async def run() -> FlaskResponse:
+def run():
+    EXCLUDE_OPTIONS = {
+        "prompt_index": True,
+        "file_path": True,
+        "callback_manager": True,
+    }
     state = get_server_state(app)
     aiconfig = state.aiconfig
     request_json = request.get_json()
+    prompt_name: str = request_json.get("prompt_name", "get_activities")  # Hard-coded
+    params = request_json.get("params", {})  # Fixed in https://github.com/lastmile-ai/aiconfig/pull/668
+    stream = request_json.get("stream", True)
+
+    # Define stream callback and queue object for streaming results
+    output_text_queue = QueueIterator()
+
+    def update_output_queue(data, _accumulated_data, _index) -> None:
+        should_end_stream = data == STOP_STREAMING_SIGNAL
+        output_text_queue.put(data, should_end_stream)
+
+    inference_options = InferenceOptions(
+        stream=stream,
+        stream_callback=update_output_queue,
+    )
+
+    def generate():
+        # Use multi-threading so that we don't block run command from
+        # displaying the streamed output (if streaming is supported)
+        def run_async_config_in_thread():
+            asyncio.run(
+                aiconfig.run(
+                    prompt_name=prompt_name,
+                    params=params,
+                    run_with_dependencies=False,
+                    options=inference_options,
+                )
+            )
+            output_text_queue.put(STOP_STREAMING_SIGNAL)
+
+        t = threading.Thread(target=run_async_config_in_thread)
+        t.start()
+
+        # Create a deep copy of the state aiconfig so we can yield an AIConfig
+        # with streaming partial outputs in the meantime. This probably isn't
+        # necessary, but just getting unblocked for now
+        displaying_config = copy.deepcopy(aiconfig)
+
+        # If model supports streaming, need to wait until streamer has at
+        # least 1 item to display. If model does not support streaming,
+        # need to wait until the aiconfig.run() thread is complete
+        SLEEP_DELAY_SECONDS = 0.1
+        wait_time_in_seconds = 0.0
+        while output_text_queue.isEmpty() and t.is_alive():
+            # Yea I know time.sleep() isn't super accurate, but it's fine,
+            # we can fix later
+            time.sleep(SLEEP_DELAY_SECONDS)
+            wait_time_in_seconds += SLEEP_DELAY_SECONDS
+            print(f"Output queue is currently empty. Waiting for {wait_time_in_seconds:.1f}s...")
+
+        yield "["
+        if not output_text_queue.isEmpty():
+            accumulated_output_text = ""
+            for text in output_text_queue:
+                if isinstance(text, str):
+                    accumulated_output_text += text
+                elif isinstance(text, dict) and "content" in text:
+                    # TODO: Fix streaming output format so that it returns text
+                    accumulated_output_text += text["content"]
+
+            accumulated_output: Output = ExecuteResult(
+                **{
+                    "output_type": "execute_result",
+                    "data": accumulated_output_text,
+                    # Assume streaming only supports single output
+                    # I think this actually may be wrong for PaLM or OpenAI
+                    # TODO: Need to sync with Ankush but can fix forward
+                    "execution_count": 0,
+                    "metadata": {},
+                }
+            )
+
+            displaying_config.add_output(prompt_name, accumulated_output, overwrite=True)
+            aiconfig_json = displaying_config.model_dump(exclude=EXCLUDE_OPTIONS)
+            yield json.dumps({"aiconfig": aiconfig_json})
+
+        # Ensure that the run process is complete to yield final output
+        t.join()
+        aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+        yield json.dumps({"aiconfig": aiconfig_json})
+        yield "]"

     try:
-        prompt_name = request_json["prompt_name"]
-        params = request_json.get("params", {})
-        stream = request_json.get("stream", False)
-        options = InferenceOptions(stream=stream)
-        run_output = await aiconfig.run(prompt_name, params, options)  # type: ignore
-        LOGGER.debug(f"run_output: {run_output}")
-        return HttpResponseWithAIConfig(
-            #
-            message="Ran prompt",
-            code=200,
-            aiconfig=aiconfig,
-        ).to_flask_format()
+        LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
+        # Streaming based on
+        # https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
+        return Response(
+            stream_with_context(generate()),
+            status=200,
+            content_type="application/json",
+        )
     except Exception as e:
         return HttpResponseWithAIConfig(
             #