fix: implement exponential backoff to resolve EventStreamError #94

Merged 1 commit on Feb 17, 2025
27 changes: 23 additions & 4 deletions redbox-core/redbox/graph/nodes/processes.py
@@ -2,11 +2,15 @@
 import logging
 import re
 import textwrap
+import time
+from random import uniform
 from collections.abc import Callable
 from functools import reduce
 from typing import Any, Iterable
 from uuid import uuid4
 
+from botocore.exceptions import EventStreamError
+
 from langchain.schema import StrOutputParser
 from langchain_core.callbacks.manager import dispatch_custom_event
 from langchain_core.documents import Document
@@ -83,11 +87,13 @@ def build_merge_pattern(
 
     When combined with a group send, will combine all Documents and use the metadata of the first.
 
-    When used without a send, the first Document receieved defines the metadata.
+    When used without a send, the first Document received defines the metadata.
 
     If tools are supplied, can also set state["tool_calls"].
     """
     tokeniser = get_tokeniser()
+    MAX_RETRIES = 5
+    BACKOFF_FACTOR = 1.5
 
     @RunnableLambda
     def _merge(state: RedboxState) -> dict[str, Any]:
@@ -106,9 +112,22 @@ def _merge(state: RedboxState) -> dict[str, Any]:
             ),
         )
 
-        merge_response = build_llm_chain(
-            prompt_set=prompt_set, llm=llm, final_response_chain=final_response_chain
-        ).invoke(merge_state)
+        retries = 0
+        while retries < MAX_RETRIES:
+            try:
+                merge_response = build_llm_chain(
+                    prompt_set=prompt_set, llm=llm, final_response_chain=final_response_chain
+                ).invoke(merge_state)
+
+                # if the chain invocation succeeds, exit the retry loop
+                break
+
+            except EventStreamError as e:
+                retries += 1
+                if retries >= MAX_RETRIES:
+                    raise e
+                wait_time = BACKOFF_FACTOR**retries + uniform(0, 1)
+                time.sleep(wait_time)
 
         merged_document.page_content = merge_response["messages"][-1].content
         request_metadata = merge_response["metadata"]
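
For context, the retry logic in this diff is self-contained enough to lift into a reusable helper. The sketch below is illustrative rather than part of this PR: `with_backoff` and its keyword arguments are hypothetical names invented for the example, while `EventStreamError` is the same botocore exception the diff catches.

```python
# Minimal sketch of the retry-with-exponential-backoff pattern used in
# this PR, extracted into a generic helper. `with_backoff` and its
# parameters are hypothetical names, not part of the redbox codebase.
import time
from random import uniform

from botocore.exceptions import EventStreamError


def with_backoff(fn, *args, max_retries=5, backoff_factor=1.5, **kwargs):
    """Call fn(*args, **kwargs), retrying on EventStreamError.

    Sleeps backoff_factor**retries + uniform(0, 1) seconds between
    attempts and re-raises once max_retries attempts have failed.
    """
    retries = 0
    while True:
        try:
            return fn(*args, **kwargs)
        except EventStreamError:
            retries += 1
            if retries >= max_retries:
                raise
            # exponential delay plus up to one second of random jitter,
            # matching the wait_time computation in the diff above
            time.sleep(backoff_factor**retries + uniform(0, 1))
```

With the constants chosen in the PR (MAX_RETRIES = 5, BACKOFF_FACTOR = 1.5), the four sleeps before the final attempt are 1.5, 2.25, ~3.4, and ~5.1 seconds, each plus up to a second of jitter, so a fully exhausted retry cycle adds roughly 12–16 seconds before the EventStreamError is re-raised.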