core: Use Blockbuster to detect blocking calls in asyncio during tests #29043
0983c3d to 7ce4b18
with tempfile.NamedTemporaryFile(delete=True, suffix=".jpg") as temp_file:
It's useless to use a real file here.
"""Test invoking nested runnable lambda."""
blockbuster.deactivate()
The code makes a sync call from async on purpose...
Looks good overall. Should we enable/disable for unit tests?
I like that it forces us to separate async tests from sync tests and avoid being lazy, but at the same time there are some changes that aren't technically necessary and seem to complicate the testing code unnecessarily (e.g., updating a file open to a non-blocking request).
@@ -121,7 +121,7 @@ def _config_with_context(
     return patch_config(config, configurable=context_funcs)


-def aconfig_with_context(
+async def aconfig_with_context(
Technically a breaking change, but probably okay; it looks a lot like a private function to me.
Would you mind adding a comment about why this needs to be async (i.e., is inspect.getsource making OS calls)?
Yes, inspect.getsource makes os.stat calls to check whether a source file has been updated since it was put in the linecache, and FS read calls to get the code if the linecache needs to be updated (done at least on first access).
Note that an LRU cache was added, probably because these OS calls have an impact on perf? See #28131.
Thinking about it, it may be cleaner to have an aconfig_specs in Runnable that defaults to calling config_specs (not in a thread) and that we can override to use a thread for RunnableLambda. WDYT?
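To make the aconfig_specs proposal concrete, here is a minimal sketch. The classes `BaseRunnable` and `LambdaLike` are hypothetical stand-ins, not langchain_core's `Runnable` or `RunnableLambda`, and the `computed_in` attribute exists only so the demo can show which thread did the work.

```python
import asyncio
import threading


class BaseRunnable:
    """Hypothetical base class (an assumption, not langchain_core's Runnable)."""

    @property
    def config_specs(self) -> list[str]:
        return []

    async def aconfig_specs(self) -> list[str]:
        # Default: assume the sync property is cheap and call it inline.
        return self.config_specs


class LambdaLike(BaseRunnable):
    """Hypothetical subclass whose specs may require file system access."""

    @property
    def config_specs(self) -> list[str]:
        # Stand-in for work that may hit the file system (e.g. source lookup).
        self.computed_in = threading.current_thread()
        return ["spec"]

    async def aconfig_specs(self) -> list[str]:
        # Override: move the potentially blocking work to a worker thread.
        return await asyncio.to_thread(lambda: self.config_specs)


async def demo() -> tuple[list[str], list[str], bool]:
    base, lam = BaseRunnable(), LambdaLike()
    base_specs = await base.aconfig_specs()
    lam_specs = await lam.aconfig_specs()
    # True when the override did not run in the event loop's thread.
    off_loop = lam.computed_in is not threading.current_thread()
    return base_specs, lam_specs, off_loop
```

With this shape, only subclasses that are known to block pay the cost of a thread hop.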
Could you separate this specific change into another PR? I'll need to dig to figure out what's going on and don't feel comfortable merging before investigating or having some benchmark metrics
Other changes look good, so we can merge them in.
@@ -90,9 +91,11 @@ async def test_inmemory_dump_load(tmp_path: Path) -> None:
     output = await store.asimilarity_search("foo", k=1)

     test_file = str(tmp_path / "test.json")
-    store.dump(test_file)
+    await asyncio.to_thread(store.dump, test_file)
Feels like a false positive for the test? It doesn't really matter whether the test uses blocking or non-blocking code here.
Well, with the way blockbuster is configured here (activated before the test, deactivated after), the test code itself needs to be non-blocking.
Maybe we could add aload/adump methods to InMemoryVectorStore using aiofile/aiofiles?
- OK to add aload/adump
- Not using aiofile, as that would be a new runtime dependency; I think we can just use asyncio.to_thread in the implementation
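For reference, a sketch of what aload/adump built on asyncio.to_thread could look like. `TinyStore` and `roundtrip_demo` are hypothetical names used only for illustration; this is not InMemoryVectorStore's actual code, and the JSON layout is an assumption.

```python
import asyncio
import json
import os
import tempfile
from pathlib import Path
from typing import Any


class TinyStore:
    """Hypothetical stand-in for a store with sync dump/load methods."""

    def __init__(self) -> None:
        self.data: dict[str, Any] = {}

    def dump(self, path: str) -> None:
        Path(path).write_text(json.dumps(self.data))

    @classmethod
    def load(cls, path: str) -> "TinyStore":
        store = cls()
        store.data = json.loads(Path(path).read_text())
        return store

    async def adump(self, path: str) -> None:
        # Run the blocking file write in a worker thread.
        await asyncio.to_thread(self.dump, path)

    @classmethod
    async def aload(cls, path: str) -> "TinyStore":
        # Run the blocking file read in a worker thread.
        return await asyncio.to_thread(cls.load, path)


async def roundtrip_demo() -> dict[str, Any]:
    store = TinyStore()
    store.data = {"foo": 1}
    # Creating and removing the temporary directory is still sync file system
    # work; it is kept inline here only to keep the demo short.
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "test.json")
        await store.adump(path)
        loaded = await TinyStore.aload(path)
    return loaded.data
```

The async variants reuse the sync implementations, so no new runtime dependency is needed.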
Instead I changed the test to make it sync. See 4d4d9a6
@@ -298,17 +299,17 @@ def parent(a: int) -> int:
     # Now run the chain and check the resulting posts
     cb = [tracer]
     if method == "invoke":
-        res: Any = parent.invoke(1, {"callbacks": cb})  # type: ignore
+        res: Any = await asyncio.to_thread(parent.invoke, 1, {"callbacks": cb})  # type: ignore
Is there any benefit to doing this? It feels like it's complicating the test code.
I refactored this part to cleanly separate sync and async tests: see c2162c2
With this, the asyncio.to_thread calls are not needed.
188abdf to 4598e0d
OK makes sense -- I'll finish reviewing the test changes on Monday and will merge.
If there were a way to configure blockbuster to fail based on the stack trace (e.g., we mostly care about async calls to langchain APIs), it would probably make it more valuable (or at least less effort to adopt widely).
test_foo.py

async def test1():
    # Blocking call, but we don't care 99% of the time
    with open(..) as f:
        ...

    # Blocking call from calling a sync method in an async test. We probably care about it,
    # but there might be some exceptions
    runnable.invoke()

    # Blocking call from an async API.
    # We care about this 100% of the time
    await some_langchain_thing()  # results in a blocking call
3a082ce to e5c91e0
asyncio.Event is not thread-safe so it must be created in the asyncio thread
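One common pattern consistent with this commit message, sketched with the standard library only: create the `asyncio.Event` inside a coroutine (so in the event loop's thread) and let other threads signal it through `loop.call_soon_threadsafe`. The function name `wait_for_worker` is hypothetical.

```python
import asyncio
import threading


async def wait_for_worker() -> bool:
    loop = asyncio.get_running_loop()
    # Created inside the coroutine, i.e. in the event loop's thread.
    done = asyncio.Event()

    def worker() -> None:
        # Event.set() is not thread-safe, so schedule it on the loop
        # instead of calling it directly from this thread.
        loop.call_soon_threadsafe(done.set)

    thread = threading.Thread(target=worker)
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    # Join in a worker thread so the event loop itself never blocks.
    await asyncio.to_thread(thread.join)
    return done.is_set()
```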
48dd730 to 3c8ab33
I can have a look at that, but it means inspecting the full stack to search for langchain_core files. This is a slow operation and I don't know if it will be performant enough.
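As a rough illustration of what "inspecting the full stack" involves, the sketch below walks caller frames and checks each frame's file name. It is not how blockbuster implements its checks; `called_from`, `run_in_fake_file` and the `my_pkg_under_test/` path are hypothetical, and `sys._getframe` is a CPython implementation detail.

```python
import sys


def called_from(path_fragment: str, max_depth: int = 50) -> bool:
    """Return True if any caller frame's file path contains path_fragment."""
    frame = sys._getframe(1)  # start at the caller of this function
    depth = 0
    while frame is not None and depth < max_depth:
        if path_fragment in frame.f_code.co_filename:
            return True
        frame = frame.f_back
        depth += 1
    return False


def run_in_fake_file(filename: str, fragment: str) -> bool:
    """Call called_from() from code compiled under an arbitrary file name."""
    code = compile("result = called_from(fragment)", filename, "exec")
    namespace = {"called_from": called_from, "fragment": fragment}
    exec(code, namespace)
    return namespace["result"]
```

The cost grows with stack depth, since every frame up to `max_depth` may be visited on each guarded call, which is the performance concern raised above.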
(
    bb.functions[func]
    .can_block_in("langchain_core/_api/internal.py", "is_caller_internal")
    .can_block_in("langchain_core/runnables/base.py", "__repr__")
RunnableLambda's __repr__ calls get_lambda_source, which is blocking. It should probably be cached.
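A minimal sketch of the caching idea using `functools.cached_property`. `SourceHolder` and `_lookup_source` are hypothetical stand-ins (the real lookup would read source code from disk); the `lookups` counter is there only to show that the expensive step runs once.

```python
from functools import cached_property


class SourceHolder:
    """Hypothetical class (an assumption, not langchain_core's RunnableLambda)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.lookups = 0

    def _lookup_source(self) -> str:
        # Stand-in for an expensive call that would read source from disk.
        self.lookups += 1
        return f"lambda x: x  # {self.name}"

    @cached_property
    def _source(self) -> str:
        # Computed on first access, then stored on the instance.
        return self._lookup_source()

    def __repr__(self) -> str:
        return f"SourceHolder({self._source})"
```

Repeated `repr()` calls then reuse the stored value instead of repeating the lookup.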
`RunnableLambda`'s `deps` may perform costly OS operations by calling `get_function_nonlocals`, so it's better to cache it. See #29043
`RunnableLambda`'s `__repr__` may perform costly OS operations by calling `get_lambda_source`, so it's better to cache it. See #29043
Co-authored-by: Chester Curme
This PR uses the blockbuster library in langchain-core to detect blocking calls made in the asyncio event loop during unit tests.
Avoiding blocking calls is hard, as these can be deeply buried in the code or made in third-party libraries.
Blockbuster makes them easier to detect by raising an exception when a call is made to a known blocking function (e.g. time.sleep).
Adding blockbuster made it possible to find a blocking call in aconfig_with_context (it ends up calling get_function_nonlocals, which loads function code).
Dependencies:
Twitter handle: cbornet_
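To illustrate the detection idea described above, here is a standard-library-only sketch: wrap a known blocking function so that it raises when called while an event loop is running in the current thread. This is a conceptual illustration, not blockbuster's implementation or API; `BlockingCallError`, `guard` and `run_checks` are hypothetical names.

```python
import asyncio
import time


class BlockingCallError(RuntimeError):
    """Raised when a guarded blocking function runs inside the event loop."""


def _in_running_loop() -> bool:
    # asyncio.get_running_loop() raises RuntimeError when the current
    # thread has no running event loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def guard(func):
    """Wrap func so it raises when called from a thread running a loop."""

    def wrapper(*args, **kwargs):
        if _in_running_loop():
            raise BlockingCallError(f"blocking call to {func.__name__}")
        return func(*args, **kwargs)

    return wrapper


async def bad() -> None:
    time.sleep(0.01)  # blocks the event loop


async def good() -> None:
    # Same call, but executed in a worker thread where no loop is running.
    await asyncio.to_thread(time.sleep, 0.01)


def run_checks() -> tuple[bool, bool]:
    original = time.sleep
    time.sleep = guard(original)
    try:
        try:
            asyncio.run(bad())
            bad_detected = False
        except BlockingCallError:
            bad_detected = True
        asyncio.run(good())
        good_ok = True
    finally:
        time.sleep = original  # always restore the real function
    return bad_detected, good_ok
```

In this sketch the blocking coroutine fails fast, while the version that offloads the call to a thread passes.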