
Get basic streaming to work for run #683

Merged
merged 1 commit into from
Jan 6, 2024

Conversation

Contributor

@rossdanlm rossdanlm commented Dec 30, 2023

Get basic streaming to work for run

Building upon what Ryan investigated in #651; all the frontend changes are from him, I just rebased onto his PR.

This is a bit tricky because we have to support:

  1. streaming models --> use a queue iterator for passing the output text
  2. non-streaming models --> still yield, just don't use a queue iterator and wait for run command to finish

General flow (a runnable sketch follows the list):

  1. Parse prompts from client
  2. Define stream callback with queue iterator
  3. Start thread to run aiconfig without blocking main thread from accessing queue iterator
  4. Create a copy of the original AIConfig so we can write partially streamed outputs, yield and display it without risk of race conditions
  5. Wait for the queue iterator to start containing data, or wait until a max timeout (because the model may not support streaming)
  6. Iterate through queue iterator, saving output to display config, yield display config
  7. Once output is complete, wait for the original config.run() thread and display the output from that
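A minimal, self-contained sketch of steps 2-7. The run_model helper, names, and chunk payloads below are invented for illustration; the real server code runs aiconfig.run() and yields a display config rather than plain strings:

```
import queue
import threading
import time

STOP_STREAMING_SIGNAL = object()  # sentinel marking the end of the stream

def run_model(stream_callback):
    """Stand-in for aiconfig.run(): streams a few chunks, then returns the full text."""
    chunks = ["Hello", ", ", "world", "!"]
    for index, chunk in enumerate(chunks):
        stream_callback(chunk, "".join(chunks[: index + 1]), index)
        time.sleep(0.2)
    return "Hello, world!"

def generate():
    output_text_queue = queue.Queue()  # step 2: queue shared with the stream callback
    final_result = {}

    def update_output_queue(data, _accumulated_data, _index) -> None:
        output_text_queue.put(data)

    def run_in_thread():  # step 3: keep the main thread free to drain the queue
        final_result["text"] = run_model(update_output_queue)
        output_text_queue.put(STOP_STREAMING_SIGNAL)

    t = threading.Thread(target=run_in_thread)
    t.start()

    # step 5: wait for the first chunk, or for a non-streaming run to finish
    while output_text_queue.empty() and t.is_alive():
        time.sleep(0.1)

    # step 6: yield partial output as it arrives (the server yields the display config here)
    accumulated = ""
    while True:
        item = output_text_queue.get()
        if item is STOP_STREAMING_SIGNAL:
            break
        accumulated += item
        yield accumulated

    t.join()  # step 7: wait for the real run and yield its authoritative output
    yield final_result["text"]

for partial in generate():
    print(partial)
```

In the actual endpoint, step 4 deep-copies the AIConfig into a displaying_config before step 6 so the streamed partials never race with the config that config.run() is still mutating.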

Open questions/TODOs

  1. [solved - use `while output_text_queue.isEmpty() and t.is_alive()`] How can we check whether a model supports streaming or not? Right now we just default to a max timeout of 5s, but long-term it would be better for people to explicitly mark this as a boolean flag in their model parser class
  2. I need to update the output format for streaming. I thought it was fine but I guess not; will verify again. A bit annoying, but not a crazy blocker for now
  3. Client needs to also support streaming, but that's fine; Ryan can get unblocked with this diff now
  4. Pretty complex, but streaming will break for `run_with_dependencies`. I've got a proposal to fix forward in https://github.com/lastmile-ai/gradio-workbook/pull/64 and would really like people to take a look and give feedback

Test plan

```bash
alias aiconfig="python -m 'aiconfig.scripts.aiconfig_cli'"
aiconfig edit --aiconfig-path="/Users/rossdancraig/Projects/aiconfig/cookbooks/Getting-Started/travel.aiconfig.json" --server-port=8080 --server-mode=debug_servers

# Now run this from another terminal
curl http://localhost:8080/api/run -d '{"prompt_name":"get_activities"}' -X POST -H 'Content-Type: application/json'
```

I also added this line to print output:

```
print(accumulated_output_text)
```

Streaming

Screen.Recording.2024-01-05.at.18.55.00.mov (https://github.com/lastmile-ai/aiconfig/assets/151060367/d8930ea6-3143-49a3-89c6-4a2668c2e9e1)

Non-streaming (same as before)

Screen.Recording.2024-01-05.at.18.44.40.mov (https://github.com/lastmile-ai/aiconfig/assets/151060367/5aae7c7f-c273-4be7-bcb9-e96199a04076)

@rossdanlm rossdanlm changed the title from "[editor][server endpoints][3/n]: Proof-of-Concept for Streaming" to "Get basic streaming to work for run" on Dec 30, 2023
@rossdanlm rossdanlm force-pushed the pr683 branch 3 times, most recently from a8eab0e to f6db1e6 on December 30, 2023 05:01
@rossdanlm rossdanlm marked this pull request as ready for review December 30, 2023 05:02
@rossdanlm rossdanlm force-pushed the pr683 branch 2 times, most recently from f45c35e to 97c5a2a on December 30, 2023 05:22
}
)

displaying_config.add_output(prompt_name, accumulated_output, overwrite=True)
Contributor Author

I realized that this will fail for run_with_dependencies since we don't have a way of tracking the currently executing prompt; we just overwrite the last prompt name instead. Since we are deep copying the aiconfig into displaying_config, the final result won't be as bad, but it'll look weird during streaming and we should fix it. I have a pseudo-diff in https://github.com/lastmile-ai/gradio-workbook/pull/64 with two separate proposals (backend vs. frontend) and would like feedback on it from others.

@rossdanlm rossdanlm force-pushed the pr683 branch 3 times, most recently from ea56f91 to a5e377c on December 30, 2023 06:03
@jonathanlastmileai jonathanlastmileai left a comment

I came to the same conclusion about needing an inter-thread queue while playing with this on server v2. However, in that case, I was restricting myself to not defining a custom callback because my understanding was we would have to define a specific one for every model provider.

If we can pass a callback, isn't this a lot easier with a web socket? You can make the callback write the chunks directly into the socket synchronously; it doesn't matter if you block the main thread, and then when finished return the result as usual.

My verdict: if it works, ship it; if it's still not quite there for whatever reason, let's consider v2.
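For reference, the WebSocket variant being suggested might look roughly like this (a sketch assuming the flask-sock extension; the /api/run_ws route, payload shape, and run_prompt helper are invented for illustration and are not part of this PR):

```
import json

from flask import Flask
from flask_sock import Sock  # assumes flask-sock is installed

app = Flask(__name__)
sock = Sock(app)

def run_prompt(prompt_name, stream_callback):
    """Stand-in for the real aiconfig run; invokes stream_callback per chunk."""
    chunks = ["streamed ", "chunks ", "here"]
    for chunk in chunks:
        stream_callback(chunk)
    return "".join(chunks)

@sock.route("/api/run_ws")  # hypothetical endpoint
def run_ws(ws):
    request = json.loads(ws.receive())

    # The callback writes each chunk straight into the socket; blocking here is fine.
    def stream_callback(chunk):
        ws.send(json.dumps({"output_chunk": chunk}))

    final_text = run_prompt(request["prompt_name"], stream_callback)
    ws.send(json.dumps({"done": True, "output": final_text}))
```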

# # TODO: Add generic typing for queue items
# # (couldn't get sentinel value to work with generics)
# T = TypeVar('T')
STOP_STREAMING_SIGNAL = object() #sentinel value to indicate end of stream
Contributor

nit: object() is a weird sentinel value. How about we make a Sentinel enum?

Contributor Author

Created issue to follow up in #794
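For reference, the enum-based sentinel being suggested might look something like this sketch (not what this PR ships):

```
from enum import Enum
from typing import Union

class Sentinel(Enum):
    STOP_STREAMING = "STOP_STREAMING"

# Queue items are either text chunks or the stop signal
QueueItem = Union[str, Sentinel]

def is_stop_signal(item: QueueItem) -> bool:
    return item is Sentinel.STOP_STREAMING
```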


# Define stream callback and queue object for streaming results
output_text_queue = QueueIterator()
def update_output_queue(data, _accumulated_data, _index) -> None:
Contributor

Is this going to work for all the model parsers? I thought the idea of the callback was that specific models need specific logic.

@rossdancraig rossdancraig Jan 2, 2024

If your model parser supports streaming, you need to implement something that returns the streaming results into a stream_callback object; otherwise nobody will be able to read the results as they're happening. If you don't do this in your model parser, that's on you, not us. But yeah, you're right, we should make this clear in the how-to-build-a-model-parser docs.

If there are ever cases where you can stream non-text data formats, we'll make a note and try to follow up later
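As a rough illustration of that contract, a streaming-capable parser's inference loop could forward provider chunks like this (the provider stream is faked; the callback signature matches update_output_queue above):

```
from typing import Callable, Iterable

def drain_provider_stream(
    provider_stream: Iterable[str],
    stream_callback: Callable[[str, str, int], None],
) -> str:
    """Forward each streamed chunk to stream_callback, returning the full text."""
    accumulated = ""
    for index, chunk in enumerate(provider_stream):
        accumulated += chunk
        stream_callback(chunk, accumulated, index)
    return accumulated

# Fake provider stream standing in for an OpenAI-style chunk iterator
final_text = drain_provider_stream(
    iter(["Once ", "upon ", "a time"]),
    lambda data, accumulated, index: print(index, repr(accumulated)),
)
```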

def generate():
# Use multi-threading so that we don't block run command from
# displaying the streamed output (if streaming is supported)
def run_async_config_in_thread():
Contributor

nit: I think this runs a new event loop, which doesn't necessarily run in a new thread


From what I've heard, Python does not actually do "multi-threading"; it's just syntactic sugar over a way of saying 'switch between these event loop processes'.

@Ankush-lastmile Ankush-lastmile Jan 6, 2024

Flask doesn't use an async event loop, so this should be fine.

edit: realized that isn't important
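For context on the thread vs. event loop point, running a coroutine off the main thread generally looks like the sketch below: asyncio.run() spins up a fresh event loop inside the worker thread, leaving the main thread free to drain the queue (a generic illustration, not the exact server code):

```
import asyncio
import threading

async def run_config():
    # stand-in for the awaitable aiconfig.run(...) call
    await asyncio.sleep(0.5)
    return "done"

def run_async_config_in_thread(result_holder):
    # asyncio.run() creates a brand-new event loop inside this worker thread
    result_holder["value"] = asyncio.run(run_config())

result = {}
t = threading.Thread(target=run_async_config_in_thread, args=(result,))
t.start()
t.join()
print(result["value"])  # -> done
```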

# need to wait until the aiconfig.run() thread is complete
SLEEP_DELAY_SECONDS = 0.1
wait_time_in_seconds = 0.0
while output_text_queue.isEmpty() and t.is_alive():
Contributor

what's this loop for?

Contributor Author

  • For streaming: we need to wait for the queue to be populated; otherwise the for text in output_text_queue loop will have nothing to process since it's empty. Therefore we need to wait here.
  • For non-streaming: we have to wait for the thread t to complete anyway, so we wait on that here too.

@rossdanlm rossdanlm marked this pull request as ready for review January 5, 2024 07:43
rossdanlm added a commit that referenced this pull request Jan 5, 2024
Run python autoformatter on entire Python libraries


I just got tired of doing this manually every time. We should just
eventually make a script to do this, can do later. Filed a task:
#771

This includes the cookbooks

cc jonathan: I couldn't just do a clean

```
fd --glob '*.py'  cookbooks/* | xargs python -m 'scripts.lint' --mode=fix --files
```

because there's an issue with llama extension:
```
rossdancraig@Rossdans-MBP aiconfig % fd --glob '*.py'  cookbooks/llama | xargs python -m 'scripts.lint' --mode=fix --files
Running autoflake
[Errno 2] No such file or directory: '/Users/jonathan/Projects/aiconfig/extensions/llama/python/llama.py'
Running isort
Broken 1 paths
Running black
Usage: black [OPTIONS] SRC ...
Try 'black -h' for help.

Error: Invalid value for 'SRC ...': Path '/Users/jonathan/Projects/aiconfig/extensions/llama/python/llama.py' does not exist.
[CRITICAL] 2024-01-05 01:58:04,022 lint.py:161: err: Some jobs failed:
OK:Oks:
Ok
stdout=''
stderr=''
Err:Errs:
Failure: exit code = 1
stdout=''
stderr=''
Failure: exit code = 2
stdout=''
stderr=''

```

---
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with
[ReviewStack](https://reviewstack.dev/lastmile-ai/aiconfig/pull/770).
* #683
* __->__ #770
@rossdanlm rossdanlm force-pushed the pr683 branch 2 times, most recently from 31ab023 to 1e6ef2c on January 5, 2024 23:56
self.timeout = None

def __iter__(self):
return self
Member

why do we return self on iter?

Contributor Author

Whenever we want to build something that is iterable (i.e. for some_val in MyIterableClass), we need to define an __iter__ method which returns self to indicate that. If it's an async iterable, this becomes async def __aiter__(self).
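A tiny generic example of that protocol (just the pattern being described, unrelated to the PR's QueueIterator):

```
class Countdown:
    """Iterable yielding n, n-1, ..., 1 via the __iter__/__next__ protocol."""

    def __init__(self, n: int):
        self.n = n

    def __iter__(self):
        # Returning self marks this object as its own iterator
        return self

    def __next__(self):
        if self.n <= 0:
            raise StopIteration
        self.n -= 1
        return self.n + 1

for value in Countdown(3):
    print(value)  # 3, 2, 1
```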

@@ -0,0 +1,37 @@
import oboe, { Options } from "oboe";
Member

this isn't adding functionality to this PR, right?

Contributor Author

It was needed, I think, on the client for Ryan to process. I kept it here from his diff in #651.

cc @rholinshead will let you keep or delete moving forward

@Ankush-lastmile Ankush-lastmile left a comment

Approving to unblock. cc @rossdancraig: tested that this doesn't break current functionality since this is now a 1-week-old PR.

@rossdanlm rossdanlm merged commit e3590eb into main Jan 6, 2024
2 checks passed
@rossdanlm rossdanlm deleted the pr683 branch January 6, 2024 00:14