Streaming 'working' With Client-side Handling #684

Closed · wants to merge 2 commits
3 changes: 2 additions & 1 deletion cookbooks/Getting-Started/getting_started.ipynb
@@ -391,7 +391,8 @@
"name": "python3"
},
"language_info": {
"name": "python"
"name": "python",
"version": "3.11.6"
}
},
"nbformat": 4,
2 changes: 2 additions & 0 deletions python/src/aiconfig/editor/client/package.json
@@ -37,6 +37,7 @@
"aiconfig": "../../../../../typescript",
"lodash": "^4.17.21",
"node-fetch": "^3.3.2",
"oboe": "^2.1.5",
"react": "^18",
"react-dom": "^18",
"react-markdown": "^8.0.6",
@@ -48,6 +49,7 @@
"devDependencies": {
"@types/lodash": "^4.14.202",
"@types/node": "^20",
"@types/oboe": "^2.1.4",
"@types/react": "^18",
"@types/react-dom": "^18",
"@typescript-eslint/eslint-plugin": "^6.16.0",
21 changes: 20 additions & 1 deletion python/src/aiconfig/editor/client/src/Editor.tsx
@@ -6,6 +6,7 @@ import { AIConfig, ModelMetadata, Prompt } from "aiconfig";
 import { useCallback, useEffect, useMemo, useState } from "react";
 import { ufetch } from "ufetch";
 import { ROUTE_TABLE } from "./utils/api";
+import { streamingApi } from "./utils/oboeHelpers";

 export default function Editor() {
   const [aiconfig, setAiConfig] = useState<AIConfig | undefined>();
@@ -57,9 +58,27 @@ export default function Editor() {
   }, []);

   const runPrompt = useCallback(async (promptName: string) => {
-    return await ufetch.post(ROUTE_TABLE.RUN_PROMPT, {
+    const res = await ufetch.post("http://localhost:8080/api/test_streaming", {
       prompt_name: promptName,
     });
+    await streamingApi(
+      {
+        url: "http://localhost:8080/api/test_streaming",
+        method: "POST",
+        body: {
+          prompt_name: promptName,
+        },
+      },
+      "output_chunk",
+      (data) => {
+        console.log("output_chunk data: ", data);
+      },
+      "aiconfig",
+      (data) => {
+        console.log("aiconfig data: ", data);
+      }
+    );
+    return res;
   }, []);

   const updatePrompt = useCallback(
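For context on what the two streamingApi callbacks above receive: the shapes below are a sketch inferred from the server-side generate() later in this diff. The type names are illustrative and not part of the PR.

// Hypothetical shapes for the streamed events, inferred from server.py below.
// Each element of the streamed JSON array is one of these two objects;
// ExecuteResult.model_dump() may emit additional default fields.
type OutputChunkEvent = {
  output_chunk: {
    output_type: "execute_result";
    execution_count: number;
    data: string; // e.g. "Rossdan1", "Rossdan2", ... from the test route
    metadata: Record<string, unknown>;
  };
};

type AIConfigEvent = {
  // Full AIConfig dump, minus the keys in the server's EXCLUDE_OPTIONS.
  aiconfig: Record<string, unknown>;
};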
37 changes: 37 additions & 0 deletions python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
@@ -0,0 +1,37 @@
import oboe, { Options } from "oboe";

// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
// Except it allows using .node('*', fn) & only resolves on done
// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
// (multiple chunks can be sent in a single response & we only want valid json ones)
export async function streamingApi<T>(
headers: Options,
on: string = "*",
fn: (data: any) => void,
on2?: string,
fn2?: (data: any) => void,
on3?: string,
fn3?: (data: any) => void
): Promise<T> {
return new Promise((resolve, reject) => {
if (fn2 && on2 && fn3 && on3) {
oboe(headers)
.node(on, fn)
.node(on2, fn2)
.node(on3, fn3)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
} else if (fn2 && on2) {
oboe(headers)
.node(on, fn)
.node(on2, fn2)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
} else {
oboe(headers)
.node(on, fn)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
}
});
}
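A minimal usage sketch for this helper, mirroring the runPrompt wiring in Editor.tsx above (same URL and node patterns; nothing here beyond what the PR already calls):

import { streamingApi } from "./utils/oboeHelpers";

// The per-node callbacks fire as each matching JSON fragment finishes parsing;
// the returned promise resolves with the fully assembled JSON array on done.
async function runWithStreaming(promptName: string) {
  return streamingApi(
    {
      url: "http://localhost:8080/api/test_streaming",
      method: "POST",
      body: { prompt_name: promptName }, // oboe JSON-stringifies object bodies
    },
    "output_chunk",
    (chunk) => console.log("chunk:", chunk),
    "aiconfig",
    (config) => console.log("final aiconfig:", config)
  );
}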
19 changes: 19 additions & 0 deletions python/src/aiconfig/editor/client/yarn.lock
@@ -2525,6 +2525,13 @@
   dependencies:
     undici-types "~5.26.4"

+"@types/oboe@^2.1.4":
+  version "2.1.4"
+  resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634"
+  integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww==
+  dependencies:
+    "@types/node" "*"
+
 "@types/parse-json@^4.0.0":
   version "4.0.2"
   resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239"
@@ -6069,6 +6076,11 @@ http-errors@~1.6.2:
     setprototypeof "1.1.0"
     statuses ">= 1.4.0 < 2"

+http-https@^1.0.0:
+  version "1.0.0"
+  resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
+  integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg==
+
 http-parser-js@>=0.5.1:
   version "0.5.8"
   resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3"
@@ -8510,6 +8522,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7:
     define-properties "^1.2.0"
     es-abstract "^1.22.1"

+oboe@^2.1.5:
+  version "2.1.5"
+  resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
+  integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA==
+  dependencies:
+    http-https "^1.0.0"
+
 obuf@^1.0.0, obuf@^1.1.2:
   version "1.1.2"
   resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
82 changes: 80 additions & 2 deletions python/src/aiconfig/editor/server/server.py
@@ -1,8 +1,12 @@
+import asyncio
+import json
 import logging
 from typing import Any, Type
+import threading

 import lastmile_utils.lib.core.api as core_utils
 import result
+import time
 from aiconfig.Config import AIConfigRuntime
 from aiconfig.editor.server.server_utils import (
     EditServerConfig,
@@ -25,11 +29,11 @@
 )
 from aiconfig.model_parser import InferenceOptions
 from aiconfig.registry import ModelParserRegistry
-from flask import Flask, request
+from aiconfig.schema import ExecuteResult, Prompt
+from flask import Flask, Response, request, stream_with_context
 from flask_cors import CORS
 from result import Err, Ok, Result

-from aiconfig.schema import Prompt

 logging.getLogger("werkzeug").disabled = True
@@ -161,6 +165,80 @@ def create() -> FlaskResponse:
         return HttpResponseWithAIConfig(message=f"Failed to create AIConfig: {e}", code=400, aiconfig=None).to_flask_format()


+@app.route("/api/test_streaming", methods=["POST"])
+def test_streaming():
+    EXCLUDE_OPTIONS = {
+        "prompt_index": True,
+        "file_path": True,
+        "callback_manager": True,
+    }
+    state = get_server_state(app)
+    aiconfig = state.aiconfig
+    request_json = request.get_json()
+    num_stream_steps: int = request_json.get("num_stream_steps", 10)
+    prompt_name: str = request_json.get("prompt_name", "get_activities")
+    params = request_json.get("params", {})
+    print(f"{num_stream_steps=}")
+    print(f"{type(num_stream_steps)=}")
+
+    def generate(num_stream_steps: int):
+        aiconfig_json: dict[str, Any] | None = None
+        prompt: Prompt = state.aiconfig.get_prompt(prompt_name)
+
+        def run_async_config_in_thread():
+            asyncio.run(
+                aiconfig.run(
+                    prompt_name=prompt_name,
+                    params=params,
+                    run_with_dependencies=False,
+                    # options=inference_options,
+                )
+            )
+            # output_text_queue.put(STOP_STREAMING_SIGNAL)
+
+        t = threading.Thread(target=run_async_config_in_thread)
+        t.start()
+
+        yield "["
+        for i in range(num_stream_steps):
+            output = ExecuteResult(
+                output_type="execute_result",
+                execution_count=0,
+                data="Rossdan" + str(i + 1),
+                metadata={},
+            )
+            prompt.outputs = [output]
+            print(f"Done step {i+1}/{num_stream_steps}...")
+
+            aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+
+            # print(f"{str(aiconfig_json)=}\n")
+            yield json.dumps({"output_chunk": output.model_dump()}) + ",\n"
+
+        # t.join()
+        # if aiconfig_json is None:
+        #     aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+        t.join()
+        aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
+        yield json.dumps({"aiconfig": aiconfig_json})
+        yield "]"
+
+    try:
+        LOGGER.info(f"Testing streaming: {request_json}")
+        # Stream based on https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
+        return Response(
+            stream_with_context(generate(num_stream_steps)),
+            status=200,
+            content_type="application/json",
+        )
+        # return generate(num_stream_steps) #, {"Content-Type": "application/json"}
+    except Exception as e:
+        err: Err[str] = core_utils.ErrWithTraceback(e)
+        LOGGER.error(f"Failed to test streaming: {err}")
+        return HttpResponseWithAIConfig(
+            message=f"Failed to test streaming: {err}",
+            code=400,
+            aiconfig=None,
+        ).to_flask_format()
+
+
 @app.route("/api/run", methods=["POST"])
 async def run() -> FlaskResponse:
     state = get_server_state(app)
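For reference, the response body this route streams for num_stream_steps=2 looks roughly like the sketch below (the aiconfig payload is elided, and ExecuteResult.model_dump() may emit additional default fields). Because the commas and closing bracket make the stream one valid JSON array, oboe can fire output_chunk per element and still resolve done with the whole array:

// Illustrative wire format only (shown as a TypeScript string for readability).
const exampleResponseBody = `[
{"output_chunk": {"output_type": "execute_result", "execution_count": 0, "data": "Rossdan1", "metadata": {}}},
{"output_chunk": {"output_type": "execute_result", "execution_count": 0, "data": "Rossdan2", "metadata": {}}},
{"aiconfig": {"...": "full AIConfig dump, minus the EXCLUDE_OPTIONS keys"}}
]`;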