Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in streaming associated with additive chunks #158

Merged
merged 4 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions langserve/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get_async_callback_manager_for_config,
get_callback_manager_for_config,
)
from langchain.schema.runnable.utils import Input, Output
from langchain.schema.runnable.utils import AddableDict, Input, Output

from langserve.callbacks import CallbackEventDict, ahandle_callbacks, handle_callbacks
from langserve.serialization import (
Expand Down Expand Up @@ -453,6 +453,7 @@ def stream(
callback_manager = get_callback_manager_for_config(config)

final_output: Optional[Output] = None
final_output_supported = True

run_manager = callback_manager.on_chain_start(
dumpd(self),
Expand Down Expand Up @@ -481,12 +482,19 @@ def stream(
for sse in event_source.iter_sse():
if sse.event == "data":
chunk = self._lc_serializer.loads(sse.data)
if isinstance(chunk, dict):
chunk = AddableDict(chunk)
yield chunk

if final_output:
final_output += chunk
else:
final_output = chunk
if final_output_supported:
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk
except TypeError:
final_output = None
final_output_supported = False
elif sse.event == "error":
# This can only be a server side error
_raise_exception_from_data(
Expand Down Expand Up @@ -516,6 +524,7 @@ async def astream(
callback_manager = get_async_callback_manager_for_config(config)

final_output: Optional[Output] = None
final_output_supported = True

run_manager = await callback_manager.on_chain_start(
dumpd(self),
Expand All @@ -541,12 +550,19 @@ async def astream(
async for sse in event_source.aiter_sse():
if sse.event == "data":
chunk = self._lc_serializer.loads(sse.data)
if isinstance(chunk, dict):
chunk = AddableDict(chunk)
yield chunk

if final_output:
final_output += chunk
else:
final_output = chunk
if final_output_supported:
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk
except TypeError:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this? Can we add a comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comments, let me know if unclear i can add another PR afterwards with clarifications

final_output = None
final_output_supported = False

elif sse.event == "error":
# This can only be a server side error
Expand Down
68 changes: 56 additions & 12 deletions tests/unit_tests/test_server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import uuid
from asyncio import AbstractEventLoop
from contextlib import asynccontextmanager, contextmanager
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, Dict, Iterable, Iterator, List, Optional, Union

import httpx
import pytest
Expand Down Expand Up @@ -1263,8 +1263,14 @@ async def passthrough_dict(d: Any) -> Any:
}


class ErroringRunnable(Runnable):
"""A custom runnable for testing errors are raised server side."""
class StreamingRunnable(Runnable):
"""A custom runnable used for testing purposes"""

iterable: Iterable[Any]

def __init__(self, iterable: Iterable[Any]) -> None:
"""Initialize the runnable."""
self.iterable = iterable

def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
"""Invoke the runnable."""
Expand All @@ -1276,27 +1282,64 @@ def stream(
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> Iterator[Output]:
yield 1
yield 2
raise ValueError("An exception occurred")
raise NotImplementedError()

async def astream(
self,
input: Iterator[Input],
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> Iterator[Output]:
yield 1
yield 2
raise ValueError("An exception occurred")
for element in self.iterable:
if isinstance(element, BaseException):
raise element
yield element


# Have not figured out how to test sync stream yet
# def test_streaming_dict_sync() -> None:
# """Test streaming different types of items."""
# app = FastAPI()
#
# stream_dict = StreamingRunnable(iterable=[{"a": "1"}, {"a": "2"}])
#
# add_routes(app, stream_dict)
#
# # Invoke request
# with get_sync_remote_runnable(app) as runnable:
# chunks = []
# for chunk in runnable.stream("input ignored"):
# chunks.append(chunk)
#
# assert chunks == [{"a": "1"}, {"a": "2"}]


@pytest.mark.asyncio
async def test_streaming_dict_async() -> None:
"""Test streaming different types of items."""
app = FastAPI()

stream_dict = StreamingRunnable(iterable=[{"a": "1"}, {"a": "2"}])

add_routes(app, stream_dict)

# Invoke request
async with get_async_remote_runnable(app, raise_app_exceptions=False) as runnable:
chunks = []
async for chunk in runnable.astream("input ignored"):
chunks.append(chunk)

assert chunks == [{"a": "1"}, {"a": "2"}]


@pytest.mark.asyncio
async def test_server_side_error() -> None:
"""Test server side error handling."""

app = FastAPI()
add_routes(app, ErroringRunnable())

erroring_stream = StreamingRunnable(iterable=[1, 2, ValueError("An error")])
add_routes(app, erroring_stream)

# Invoke request
async with get_async_remote_runnable(app, raise_app_exceptions=False) as runnable:
Expand Down Expand Up @@ -1346,11 +1389,12 @@ async def test_server_side_error() -> None:
# assert e.response.text == "Internal Server Error"


def test_server_side_error_sync() -> None:
def test_server_side_error_sync(event_loop: AbstractEventLoop) -> None:
"""Test server side error handling."""

app = FastAPI()
add_routes(app, ErroringRunnable())
erroring_stream = StreamingRunnable(iterable=[1, 2, ValueError("An error")])
add_routes(app, erroring_stream)

# Invoke request
with get_sync_remote_runnable(app, raise_server_exceptions=False) as runnable:
Expand Down