diff --git a/cookbooks/Getting-Started/getting_started.ipynb b/cookbooks/Getting-Started/getting_started.ipynb
index dcb134092..b6e7afded 100644
--- a/cookbooks/Getting-Started/getting_started.ipynb
+++ b/cookbooks/Getting-Started/getting_started.ipynb
@@ -391,7 +391,8 @@
    "name": "python3"
   },
   "language_info": {
-   "name": "python"
+   "name": "python",
+   "version": "3.11.6"
   }
  },
  "nbformat": 4,
diff --git a/python/src/aiconfig/editor/client/package.json b/python/src/aiconfig/editor/client/package.json
index 79c13e97e..1411de9ce 100644
--- a/python/src/aiconfig/editor/client/package.json
+++ b/python/src/aiconfig/editor/client/package.json
@@ -37,6 +37,7 @@
     "aiconfig": "../../../../../typescript",
     "lodash": "^4.17.21",
     "node-fetch": "^3.3.2",
+    "oboe": "^2.1.5",
     "react": "^18",
     "react-dom": "^18",
     "react-markdown": "^8.0.6",
@@ -48,6 +49,7 @@
   "devDependencies": {
     "@types/lodash": "^4.14.202",
     "@types/node": "^20",
+    "@types/oboe": "^2.1.4",
     "@types/react": "^18",
     "@types/react-dom": "^18",
     "@typescript-eslint/eslint-plugin": "^6.16.0",
diff --git a/python/src/aiconfig/editor/client/src/Editor.tsx b/python/src/aiconfig/editor/client/src/Editor.tsx
index ce9bd05ef..98e470912 100644
--- a/python/src/aiconfig/editor/client/src/Editor.tsx
+++ b/python/src/aiconfig/editor/client/src/Editor.tsx
@@ -6,6 +6,7 @@ import { AIConfig, ModelMetadata, Prompt } from "aiconfig";
 import { useCallback, useEffect, useMemo, useState } from "react";
 import { ufetch } from "ufetch";
 import { ROUTE_TABLE } from "./utils/api";
+import { streamingApi } from "./utils/oboeHelpers";
 
 export default function Editor() {
   const [aiconfig, setAiConfig] = useState<AIConfig | undefined>();
@@ -57,9 +58,27 @@
   }, []);
 
   const runPrompt = useCallback(async (promptName: string) => {
-    return await ufetch.post(ROUTE_TABLE.RUN_PROMPT, {
+    const res = await ufetch.post("http://localhost:8080/api/test_streaming", {
       prompt_name: promptName,
     });
+    await streamingApi(
+      {
+        url: "http://localhost:8080/api/test_streaming",
+        method: "POST",
+        body: {
+          prompt_name: promptName,
+        },
+      },
+      "output_chunk",
+      (data) => {
+        console.log("output_chunk data: ", data);
+      },
+      "aiconfig",
+      (data) => {
+        console.log("aiconfig data: ", data);
+      }
+    );
+    return res;
   }, []);
 
   const updatePrompt = useCallback(
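Review note on the `runPrompt` change above: as written, it fires two requests at a hard-coded `http://localhost:8080/api/test_streaming` URL, one via `ufetch.post` (whose result is returned) and one via `streamingApi` (whose per-chunk data is only logged); presumably the URL moves into `ROUTE_TABLE` before this lands. The `"output_chunk"` and `"aiconfig"` selectors match the object keys the server emits (see `server.py` below). A minimal sketch, assuming the same endpoint and wire format, of collecting the streamed data instead of logging it; `runPromptStreaming` is a hypothetical name, not part of the diff:

```ts
import { streamingApi } from "./utils/oboeHelpers";

// Hypothetical variant of runPrompt: accumulate streamed chunks rather than
// console.log them. Endpoint URL and event keys are taken from this diff.
async function runPromptStreaming(promptName: string) {
  const chunks: unknown[] = []; // partial ExecuteResult dumps, in arrival order
  let finalConfig: unknown = null; // full AIConfig dump, sent as the last element

  await streamingApi(
    {
      url: "http://localhost:8080/api/test_streaming",
      method: "POST",
      body: { prompt_name: promptName },
    },
    "output_chunk",
    (data) => {
      chunks.push(data);
    },
    "aiconfig",
    (data) => {
      finalConfig = data;
    }
  );

  return { chunks, finalConfig };
}
```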
diff --git a/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
new file mode 100644
index 000000000..6806b2cbb
--- /dev/null
+++ b/python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
@@ -0,0 +1,37 @@
+import oboe, { Options } from "oboe";
+
+// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
+// Except it allows using .node('*', fn) & only resolves on done
+// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
+// (multiple chunks can be sent in a single response & we only want valid json ones)
+export async function streamingApi<T>(
+  headers: Options,
+  on: string = "*",
+  fn: (data: any) => void,
+  on2?: string,
+  fn2?: (data: any) => void,
+  on3?: string,
+  fn3?: (data: any) => void
+): Promise<T> {
+  return new Promise((resolve, reject) => {
+    if (fn2 && on2 && fn3 && on3) {
+      oboe(headers)
+        .node(on, fn)
+        .node(on2, fn2)
+        .node(on3, fn3)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    } else if (fn2 && on2) {
+      oboe(headers)
+        .node(on, fn)
+        .node(on2, fn2)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    } else {
+      oboe(headers)
+        .node(on, fn)
+        .done((data) => resolve(data))
+        .fail((err) => reject(err.jsonBody));
+    }
+  });
+}
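The three branches in `streamingApi` above differ only in how many `.node()` listeners they attach. A possible generalization, not part of this diff, that registers any number of selector/callback pairs on a single oboe instance; `streamingApiN` and `NodeHandler` are hypothetical names:

```ts
import oboe, { Options } from "oboe";

// Hypothetical generalization of streamingApi: take any number of
// (selector, callback) pairs instead of hard-coding three branches.
type NodeHandler = [selector: string, fn: (data: any) => void];

export async function streamingApiN<T>(
  headers: Options,
  ...handlers: NodeHandler[]
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const stream = oboe(headers);
    for (const [selector, fn] of handlers) {
      stream.node(selector, fn); // fires once per matched JSON node
    }
    stream.done((data) => resolve(data)).fail((err) => reject(err.jsonBody));
  });
}
```

A call like `streamingApiN(options, ["output_chunk", onChunk], ["aiconfig", onConfig])` would then cover the two-handler case without the duplicated chains.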
diff --git a/python/src/aiconfig/editor/client/yarn.lock b/python/src/aiconfig/editor/client/yarn.lock
index 6035a2bb8..69844c18f 100644
--- a/python/src/aiconfig/editor/client/yarn.lock
+++ b/python/src/aiconfig/editor/client/yarn.lock
@@ -2525,6 +2525,13 @@
   dependencies:
     undici-types "~5.26.4"
 
+"@types/oboe@^2.1.4":
+  version "2.1.4"
+  resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634"
+  integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww==
+  dependencies:
+    "@types/node" "*"
+
 "@types/parse-json@^4.0.0":
   version "4.0.2"
   resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239"
@@ -6069,6 +6076,11 @@ http-errors@~1.6.2:
     setprototypeof "1.1.0"
     statuses ">= 1.4.0 < 2"
 
+http-https@^1.0.0:
+  version "1.0.0"
+  resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
+  integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg==
+
 http-parser-js@>=0.5.1:
   version "0.5.8"
   resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3"
@@ -8510,6 +8522,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7:
     define-properties "^1.2.0"
     es-abstract "^1.22.1"
 
+oboe@^2.1.5:
+  version "2.1.5"
+  resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
+  integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA==
+  dependencies:
+    http-https "^1.0.0"
+
 obuf@^1.0.0, obuf@^1.1.2:
   version "1.1.2"
   resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
diff --git a/python/src/aiconfig/editor/server/server.py b/python/src/aiconfig/editor/server/server.py
index 12b9bc35e..6cb411f36 100644
--- a/python/src/aiconfig/editor/server/server.py
+++ b/python/src/aiconfig/editor/server/server.py
@@ -1,8 +1,12 @@
+import asyncio
+import json
 import logging
 from typing import Any, Type
+import threading
 
 import lastmile_utils.lib.core.api as core_utils
 import result
+import time
 from aiconfig.Config import AIConfigRuntime
 from aiconfig.editor.server.server_utils import (
     EditServerConfig,
@@ -25,11 +29,11 @@
 )
 from aiconfig.model_parser import InferenceOptions
 from aiconfig.registry import ModelParserRegistry
-from flask import Flask, request
+from aiconfig.schema import ExecuteResult, Prompt
+from flask import Flask, Response, request, stream_with_context
 from flask_cors import CORS
 from result import Err, Ok, Result
 
-from aiconfig.schema import Prompt
 
 
 logging.getLogger("werkzeug").disabled = True
@@ -161,6 +165,80 @@ def create() -> FlaskResponse:
         return HttpResponseWithAIConfig(message=f"Failed to create AIConfig: {e}", code=400, aiconfig=None).to_flask_format()
 
 
+@app.route("/api/test_streaming", methods=["POST"])
+def test_streaming():
+    EXCLUDE_OPTIONS = {
+        "prompt_index": True,
+        "file_path": True,
+        "callback_manager": True,
+    }
+    state = get_server_state(app)
+    aiconfig = state.aiconfig
+    request_json = request.get_json()
+    num_stream_steps: int = request_json.get("num_stream_steps", 10)
+    prompt_name: str = request_json.get("prompt_name", "get_activities")
+    params = request_json.get("params", {})
+    print(f"{num_stream_steps=}")
+    print(f"{type(num_stream_steps)=}")
+
+    def generate(num_stream_steps: int):
+        aiconfig_json: dict[str, Any] | None = None
+        prompt: Prompt = state.aiconfig.get_prompt(prompt_name)
+
+        def run_async_config_in_thread():
+            asyncio.run(aiconfig.run(
+                prompt_name=prompt_name,
+                params=params,
+                run_with_dependencies=False,
+                # options=inference_options,
+            )
+            )
+            # output_text_queue.put(STOP_STREAMING_SIGNAL)
+
+        t = threading.Thread(target=run_async_config_in_thread)
+        t.start()
+
+        yield "["
+        for i in range(num_stream_steps):
+            output = ExecuteResult(
+                output_type="execute_result",
+                execution_count=0,
+                data="Rossdan" + str(i + 1),
+                metadata={},
+            )
+            prompt.outputs = [output]
+            print(f"Done step {i+1}/{num_stream_steps}...")
+            aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+            # print(f"{str(aiconfig_json)=}\n")
+            yield json.dumps({"output_chunk": output.model_dump()}) + ",\n"
+
+        # t.join()
+        # if aiconfig_json is None:
+        #     aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+        t.join()
+        aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+        yield json.dumps({"aiconfig": aiconfig_json})
+        yield "]"
+
+    try:
+        LOGGER.info(f"Testing streaming: {request_json}")
+        # Stream based on https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
+        return Response(
+            stream_with_context(generate(num_stream_steps)),
+            status=200,
+            content_type="application/json",
+        )
+        # return generate(num_stream_steps)  # , {"Content-Type": "application/json"}
+    except Exception as e:
+        err: Err[str] = core_utils.ErrWithTraceback(e)
+        LOGGER.error(f"Failed to test streaming: {err}")
+        return HttpResponseWithAIConfig(
+            message=f"Failed to test streaming: {err}",
+            code=400,
+            aiconfig=None,
+        ).to_flask_format()
+
+
 @app.route("/api/run", methods=["POST"])
 async def run() -> FlaskResponse:
     state = get_server_state(app)
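For reference, the `generate` generator above emits one JSON array across the whole response: `[`, then one `{"output_chunk": ...}` object per step, each terminated by `",\n"`, then a single `{"aiconfig": ...}` object, then `]`. Because the concatenated yields form valid JSON, oboe on the client can fire `.node("output_chunk", ...)` as each element arrives and `.done(...)` once the closing bracket parses. A sketch of that wire shape as TypeScript types; the type names are illustrative, not part of the diff:

```ts
// Illustrative types for the /api/test_streaming response body.
// Field names follow the server generator above; the chunk payload is an
// ExecuteResult dump and the last element carries the full AIConfig dump.
type OutputChunkEvent = {
  output_chunk: {
    output_type: "execute_result";
    execution_count: number;
    data: unknown;
    metadata: Record<string, unknown>;
  };
};

type AIConfigEvent = {
  aiconfig: Record<string, unknown>;
};

// The stream parses to: zero or more chunk events, then the final config event.
type TestStreamingResponse = [...OutputChunkEvent[], AIConfigEvent];
```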