
Get basic streaming to work for run #683

Merged: 1 commit, merged on Jan 6, 2024
2 changes: 2 additions & 0 deletions python/src/aiconfig/editor/client/package.json
@@ -38,6 +38,7 @@
"aiconfig": "../../../../../typescript",
"lodash": "^4.17.21",
"node-fetch": "^3.3.2",
"oboe": "^2.1.5",
"react": "^18",
"react-dom": "^18",
"react-markdown": "^8.0.6",
@@ -49,6 +50,7 @@
"devDependencies": {
"@types/lodash": "^4.14.202",
"@types/node": "^20",
"@types/oboe": "^2.1.4",
"@types/react": "^18",
"@types/react-dom": "^18",
"@typescript-eslint/eslint-plugin": "^6.16.0",
37 changes: 37 additions & 0 deletions python/src/aiconfig/editor/client/src/utils/oboeHelpers.ts
@@ -0,0 +1,37 @@
import oboe, { Options } from "oboe";
Review comment (Member): This isn't adding functionality to this PR, right?

Reply (Contributor Author): It was needed, I think, on the client for Ryan to process. I kept it here from his diff in #651.

cc @rholinshead, will let you keep or delete moving forward.

// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
// Except it allows using .node('*', fn) & only resolves on done.
// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs. raw streams
// (multiple chunks can be sent in a single response & we only want valid JSON ones).
export async function streamingApi<T>(
headers: Options,
on: string = "*",
fn: (data: any) => void,
on2?: string,
fn2?: (data: any) => void,
on3?: string,
fn3?: (data: any) => void
): Promise<T> {
return new Promise((resolve, reject) => {
if (fn2 && on2 && fn3 && on3) {
oboe(headers)
.node(on, fn)
.node(on2, fn2)
.node(on3, fn3)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
} else if (fn2 && on2) {
oboe(headers)
.node(on, fn)
.node(on2, fn2)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
} else {
oboe(headers)
.node(on, fn)
.done((data) => resolve(data))
.fail((err) => reject(err.jsonBody));
}
});
}
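For context, here is a minimal usage sketch (not part of this diff) of how a client might call streamingApi to consume a streamed /api/run response; the request body shape and the "aiconfig" node pattern are assumptions for illustration only:

import { streamingApi } from "./oboeHelpers";

// Hypothetical shape of the streamed payload; the real server response may differ.
type RunStreamResponse = { aiconfig: Record<string, unknown> };

async function runPromptWithStreaming(promptName: string) {
  const finalResponse = await streamingApi<RunStreamResponse>(
    // Oboe accepts fetch-like request options: url, method, headers, body.
    {
      url: "/api/run",
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt_name: promptName, stream: true }),
    },
    // Fire on every streamed "aiconfig" node so the UI can render partial output.
    "aiconfig",
    (partialConfig) => {
      console.log("partial aiconfig:", partialConfig);
    }
  );
  // Resolves once oboe's done() fires with the final parsed JSON document.
  return finalResponse;
}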
19 changes: 19 additions & 0 deletions python/src/aiconfig/editor/client/yarn.lock
@@ -2539,6 +2539,13 @@
dependencies:
undici-types "~5.26.4"

"@types/oboe@^2.1.4":
version "2.1.4"
resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634"
integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww==
dependencies:
"@types/node" "*"

"@types/parse-json@^4.0.0":
version "4.0.2"
resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239"
@@ -6083,6 +6090,11 @@ http-errors@~1.6.2:
setprototypeof "1.1.0"
statuses ">= 1.4.0 < 2"

http-https@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg==

http-parser-js@>=0.5.1:
version "0.5.8"
resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3"
@@ -8524,6 +8536,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7:
define-properties "^1.2.0"
es-abstract "^1.22.1"

oboe@^2.1.5:
version "2.1.5"
resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA==
dependencies:
http-https "^1.0.0"

obuf@^1.0.0, obuf@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
44 changes: 44 additions & 0 deletions python/src/aiconfig/editor/server/queue_iterator.py
@@ -0,0 +1,44 @@
"""Queue iterator for streaming. Can only process strings for now
but in future will try to make it generic"""
# import asyncio
from queue import Queue

# from typing import Generic, TypeVar

# # TODO: Add generic typing for queue items
# # (couldn't get sentinel value to work with generics)
# T = TypeVar('T')
STOP_STREAMING_SIGNAL = object() # sentinel value to indicate end of stream


class QueueIterator:
"""In order to support text streaming, we need to store
the output in a queue and iterate over those values. A lot of this was
inspired by HuggingFace's TextIteratorStreamer object:

I know I can just use a queue directly in the callsite with
`iter(queue.get, None)`, but having a class makes it easier to manage
and abstracts it a bit more.
"""

def __init__(self):
self.q = Queue()
self.stop_signal = STOP_STREAMING_SIGNAL
self.timeout = None

def __iter__(self):
return self
Review comment (Member): Why do we return self in __iter__?

Reply (Contributor Author): Whenever we want to build something that is iterable (i.e. usable in for some_val in MyIterableClass), we need to define an __iter__ method that returns self to indicate that. If it's an async iterable, this becomes async def __aiter__(self).

def __next__(self):
value = self.q.get(block=True, timeout=self.timeout)
if value == self.stop_signal:
raise StopIteration()
return value

def put(self, text: str, stream_end: bool = False):
self.q.put(text, timeout=self.timeout)
if stream_end:
self.q.put(self.stop_signal, timeout=self.timeout)

def isEmpty(self) -> bool:
return self.q.empty()
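To make the __iter__/__next__ discussion above concrete, here is a small usage sketch (not part of this diff): a producer thread pushes chunks into the iterator while a consumer loops over it until the stop signal arrives.

import threading

# Import path matches how server.py imports this module below.
from aiconfig.editor.server.queue_iterator import QueueIterator

streamer = QueueIterator()

def produce():
    # Simulate a model emitting text chunks; stream_end=True also enqueues the sentinel.
    for chunk in ["Hello", ", ", "world"]:
        streamer.put(chunk)
    streamer.put("", stream_end=True)

t = threading.Thread(target=produce)
t.start()

# Because __iter__ returns self and __next__ raises StopIteration on the
# sentinel, a plain for-loop consumes chunks until the stream ends.
for text in streamer:
    print(text, end="")

t.join()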
163 changes: 143 additions & 20 deletions python/src/aiconfig/editor/server/server.py
@@ -1,10 +1,16 @@
import asyncio
import copy
import json
import logging
import threading
import time
import webbrowser
from typing import Any, Dict, Type, Union

import lastmile_utils.lib.core.api as core_utils
import result
from aiconfig.Config import AIConfigRuntime
from aiconfig.editor.server.queue_iterator import STOP_STREAMING_SIGNAL, QueueIterator
from aiconfig.editor.server.server_utils import (
EditServerConfig,
FlaskResponse,
@@ -26,11 +32,11 @@
)
from aiconfig.model_parser import InferenceOptions
from aiconfig.registry import ModelParserRegistry
from flask import Flask, request
from flask import Flask, Response, request, stream_with_context
from flask_cors import CORS
from result import Err, Ok, Result

from aiconfig.schema import Prompt
from aiconfig.schema import ExecuteResult, Output, Prompt

logging.getLogger("werkzeug").disabled = True

@@ -173,35 +179,152 @@ def create() -> FlaskResponse:


@app.route("/api/run", methods=["POST"])
async def run() -> FlaskResponse:
def run() -> FlaskResponse:
EXCLUDE_OPTIONS = {
"prompt_index": True,
"file_path": True,
"callback_manager": True,
}
state = get_server_state(app)
aiconfig = state.aiconfig
request_json = request.get_json()

prompt_name: Union[str, None] = request_json.get("prompt_name")
if prompt_name is None:
return HttpResponseWithAIConfig(
message="No prompt name provided, cannot execute `run` command",
code=400,
aiconfig=None,
).to_flask_format()

# TODO (rossdanlm): Refactor aiconfig.run() to not take in `params`
# as a function arg since we can now just call
# aiconfig.get_parameters(prompt_name) directly inside of run. See:
# https://github.com/lastmile-ai/aiconfig/issues/671
params = request_json.get("params", aiconfig.get_parameters(prompt_name)) # type: ignore
stream = request_json.get("stream", False) # TODO: set this automatically to True after client supports stream output

# Define stream callback and queue object for streaming results
output_text_queue = QueueIterator()

def update_output_queue(data, _accumulated_data, _index) -> None:
Review comment (Contributor): Is this going to work for all the model parsers? I thought the idea of the callback was that specific models need specific logic.

Reply (@rossdancraig, Jan 2, 2024): If your model parser supports streaming, you need to implement something that passes the streaming results into a stream_callback object; otherwise nobody will be able to read the results as they're happening. If you don't do this in your model parser, that's on you, not us. But yeah, you're right, we should make this clear in the docs on how to build a model parser.

If there are ever cases where you can stream non-text data formats, we'll make a note and try to follow up later.

should_end_stream = data == STOP_STREAMING_SIGNAL
output_text_queue.put(data, should_end_stream)
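As context for the callback discussion above, here is a rough, hypothetical sketch (not part of this diff) of how a streaming-capable model parser is expected to forward chunks, assuming the (data, accumulated_data, index) callback signature that update_output_queue uses:

# Hypothetical model-parser streaming loop; names are illustrative only.
def stream_model_response(chunks, options):
    accumulated = ""
    for index, chunk in enumerate(chunks):
        accumulated += chunk
        if options.stream and options.stream_callback:
            # Same argument shape as update_output_queue(data, _accumulated_data, _index).
            options.stream_callback(chunk, accumulated, index)
    return accumulated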

inference_options = InferenceOptions(
stream=stream,
stream_callback=update_output_queue,
)

def generate():
# Use multi-threading so that we don't block run command from
# displaying the streamed output (if streaming is supported)
def run_async_config_in_thread():
Review comment (Contributor): nit: I think this runs a new event loop, which doesn't necessarily run in a new thread.

Reply: From what I've heard, Python does not actually do "multi-threading"; it's just syntax sugar over a way of saying "switch between these event loop processes".

Reply (@Ankush-lastmile, Member, Jan 6, 2024): Flask doesn't use an async event loop, so this should be fine.

Edit: realized that isn't important.

asyncio.run(
aiconfig.run(
prompt_name=prompt_name,
params=params,
run_with_dependencies=False,
options=inference_options,
)
)
output_text_queue.put(STOP_STREAMING_SIGNAL)

t = threading.Thread(target=run_async_config_in_thread)
t.start()
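On the event-loop question discussed above: threading.Thread starts a separate OS thread, and asyncio.run() then creates a fresh event loop on that thread, which is what lets this Flask generator keep draining the queue while the prompt runs. A tiny standalone sketch (not part of this diff) demonstrating that:

import asyncio
import threading

async def fake_run():
    # Stand-in for aiconfig.run(); reports which thread the event loop runs on.
    return threading.current_thread().name

def run_async_in_thread():
    # asyncio.run() creates a new event loop; since this function is the
    # Thread target, that loop lives on the worker thread, not the main one.
    print("ran on:", asyncio.run(fake_run()))

worker = threading.Thread(target=run_async_in_thread, name="aiconfig-run-thread")
worker.start()
print("main thread", threading.current_thread().name, "is free to keep streaming output")
worker.join()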

# Create a deep copy of the state aiconfig so we can yield an AIConfig
# with streaming partial outputs in the meantime. This probably isn't
# necessary, but just getting unblocked for now
displaying_config = copy.deepcopy(aiconfig)

# If model supports streaming, need to wait until streamer has at
# least 1 item to display. If model does not support streaming,
# need to wait until the aiconfig.run() thread is complete
SLEEP_DELAY_SECONDS = 0.1
wait_time_in_seconds = 0.0
while output_text_queue.isEmpty() and t.is_alive():
Review comment (Contributor): What's this loop for?

Reply (Contributor Author):
  • For streaming: we need to wait for the queue to be populated, otherwise the for text in output_text_queue loop will not be able to process anything since it's empty.
  • For non-streaming: we have to wait for the thread t to complete anyway, so we wait on that here too.

# Yea I know time.sleep() isn't super accurate, but it's fine,
# we can fix later
time.sleep(0.1)
wait_time_in_seconds += SLEEP_DELAY_SECONDS
print(f"Output queue is currently empty. Waiting for {wait_time_in_seconds:.1f}s...")

# Yield in flask is weird and you either need to send responses as a
# string, or artificially wrap them around "[" and "]"
# yield "["
if not output_text_queue.isEmpty():
accumulated_output_text = ""
for text in output_text_queue:
if isinstance(text, str):
accumulated_output_text += text
elif isinstance(text, dict) and "content" in text:
# TODO: Fix streaming output format so that it returns text
accumulated_output_text += text["content"]
elif isinstance(text, dict) and "generated_text" in text:
# TODO: Fix streaming output format so that it returns text
accumulated_output_text += text["generated_text"]

accumulated_output: Output = ExecuteResult(
**{
"output_type": "execute_result",
"data": accumulated_output_text,
# Assume streaming only supports single output
# I think this actually may be wrong for PaLM or OpenAI
# TODO: Need to sync with Ankush but can fix forward
"execution_count": 0,
"metadata": {},
}
)

displaying_config.add_output(prompt_name, accumulated_output, overwrite=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that this will fail for run_with_dependencies since we don't have a way of tracking the currently executing prompt, and just overwriting the last prompt name instead. Since we are deep copying the aiconfig vs. displaying_config, the final result won't be as bad, but it'll look weird during streaming and we should fix. I have a pseudo-diff in https://github.com/lastmile-ai/gradio-workbook/pull/64 with two separate proposals (backend vs. frontend) and would like feedback on it from others

aiconfig_json = displaying_config.model_dump(exclude=EXCLUDE_OPTIONS)
yield "["
yield json.dumps({"aiconfig": aiconfig_json})
yield "]"

# Ensure that the run process is complete to yield final output
t.join()
aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
yield "["
yield json.dumps({"aiconfig": aiconfig_json})
yield "]"

try:
prompt_name: Union[str, None] = request_json.get("prompt_name")
if prompt_name is None:
return HttpResponseWithAIConfig(
message="No prompt name provided, cannot execute `run` command",
code=400,
aiconfig=None,
).to_flask_format()

# TODO (rossdanlm): Refactor aiconfig.run() to not take in `params`
# as a function arg since we can now just call
# aiconfig.get_parameters(prompt_name) directly inside of run. See:
# https://github.com/lastmile-ai/aiconfig/issues/671
params = request_json.get("params", aiconfig.get_parameters(prompt_name)) # type: ignore
stream = request_json.get("stream", False)
options = InferenceOptions(stream=stream)
run_output = await aiconfig.run(prompt_name, params, options) # type: ignore
LOGGER.debug(f"run_output: {run_output}")
if stream:
LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
# Streaming based on
# https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
return Response(
stream_with_context(generate()),
status=200,
content_type="application/json",
)

# Run without streaming
inference_options = InferenceOptions(stream=stream)
def run_async_config_in_thread():
asyncio.run(
aiconfig.run(
prompt_name=prompt_name,
params=params,
run_with_dependencies=False,
options=inference_options,
)
)
output_text_queue.put(STOP_STREAMING_SIGNAL)

t = threading.Thread(target=run_async_config_in_thread)
t.start()
LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
t.join()
return HttpResponseWithAIConfig(
#
message="Ran prompt",
code=200,
aiconfig=aiconfig,
).to_flask_format()

except Exception as e:
return HttpResponseWithAIConfig(
#