-
-
Notifications
You must be signed in to change notification settings - Fork 348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for async generator finalization #1564
Merged
Merged
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
26f1aea
Async generator hooks, simpler approach
oremanj ac3e46d
blacken
oremanj 2947de7
Respond to review comments + add more tests
oremanj d198ed2
blacken
oremanj b720303
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj 45c2eee
flake8
oremanj 2d81bb0
Fix mismerge with master
oremanj 5a37f78
Work correctly in -Werror mode too
oremanj 3acf34b
Handle asyncgens correctly when Trio is the guest of an asyncio host
oremanj 80e8ab3
Fix 3.6
oremanj cfdb850
Make tests pass on pypy 7.2 which doesn't run firstiter hooks
oremanj 153ce13
Hopefully resolve coverage issues
oremanj ebaf69c
blacken
oremanj 5b2c544
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj 936ccdb
Add docs and newsfragment
oremanj 7d0fcd9
Fix formatting
oremanj 86a8b7d
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj 4776728
Fix flake8
oremanj 5574b9e
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj 9868f25
Respond to review comments; split asyncgens logic into a separate file
oremanj 2af6974
Fix mypy
oremanj 33d168e
Merge remote-tracking branch 'origin/master' into asyncgenhooks-basic
oremanj 0441952
Review responses
oremanj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
Trio now supports automatic :ref:`async generator finalization | ||
<async-generators>`, so more async generators will work even if you | ||
don't wrap them in ``async with async_generator.aclosing():`` | ||
blocks. Please see the documentation for important caveats; in | ||
particular, yielding within a nursery or cancel scope remains | ||
unsupported. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
import attr | ||
import logging | ||
import sys | ||
import warnings | ||
import weakref | ||
|
||
from .._util import name_asyncgen | ||
from . import _run | ||
from .. import _core | ||
|
||
# Used to log exceptions in async generator finalizers | ||
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors") | ||
|
||
|
||
@attr.s(eq=False, slots=True) | ||
class AsyncGenerators: | ||
# Async generators are added to this set when first iterated. Any | ||
# left after the main task exits will be closed before trio.run() | ||
# returns. During most of the run, this is a WeakSet so GC works. | ||
# During shutdown, when we're finalizing all the remaining | ||
# asyncgens after the system nursery has been closed, it's a | ||
# regular set so we don't have to deal with GC firing at | ||
# unexpected times. | ||
alive = attr.ib(factory=weakref.WeakSet) | ||
|
||
# This collects async generators that get garbage collected during | ||
# the one-tick window between the system nursery closing and the | ||
# init task starting end-of-run asyncgen finalization. | ||
trailing_needs_finalize = attr.ib(factory=set) | ||
|
||
prev_hooks = attr.ib(init=False) | ||
|
||
def install_hooks(self, runner): | ||
def firstiter(agen): | ||
if hasattr(_run.GLOBAL_RUN_CONTEXT, "task"): | ||
self.alive.add(agen) | ||
else: | ||
# An async generator first iterated outside of a Trio | ||
# task doesn't belong to Trio. Probably we're in guest | ||
# mode and the async generator belongs to our host. | ||
# The locals dictionary is the only good place to | ||
# remember this fact, at least until | ||
# https://bugs.python.org/issue40916 is implemented. | ||
agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True | ||
if self.prev_hooks.firstiter is not None: | ||
self.prev_hooks.firstiter(agen) | ||
|
||
def finalize_in_trio_context(agen, agen_name): | ||
try: | ||
runner.spawn_system_task( | ||
self._finalize_one, | ||
agen, | ||
agen_name, | ||
name=f"close asyncgen {agen_name} (abandoned)", | ||
) | ||
except RuntimeError: | ||
# There is a one-tick window where the system nursery | ||
# is closed but the init task hasn't yet made | ||
# self.asyncgens a strong set to disable GC. We seem to | ||
# have hit it. | ||
self.trailing_needs_finalize.add(agen) | ||
|
||
def finalizer(agen): | ||
agen_name = name_asyncgen(agen) | ||
try: | ||
is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen") | ||
except AttributeError: # pragma: no cover | ||
is_ours = True | ||
|
||
if is_ours: | ||
runner.entry_queue.run_sync_soon( | ||
finalize_in_trio_context, agen, agen_name | ||
) | ||
|
||
# Do this last, because it might raise an exception | ||
# depending on the user's warnings filter. (That | ||
# exception will be printed to the terminal and | ||
# ignored, since we're running in GC context.) | ||
warnings.warn( | ||
f"Async generator {agen_name!r} was garbage collected before it " | ||
f"had been exhausted. Surround its use in 'async with " | ||
f"aclosing(...):' to ensure that it gets cleaned up as soon as " | ||
f"you're done using it.", | ||
ResourceWarning, | ||
stacklevel=2, | ||
source=agen, | ||
) | ||
else: | ||
# Not ours -> forward to the host loop's async generator finalizer | ||
if self.prev_hooks.finalizer is not None: | ||
self.prev_hooks.finalizer(agen) | ||
else: | ||
# Host has no finalizer. Reimplement the default | ||
# Python behavior with no hooks installed: throw in | ||
# GeneratorExit, step once, raise RuntimeError if | ||
# it doesn't exit. | ||
closer = agen.aclose() | ||
try: | ||
# If the next thing is a yield, this will raise RuntimeError | ||
# which we allow to propagate | ||
closer.send(None) | ||
except StopIteration: | ||
pass | ||
else: | ||
# If the next thing is an await, we get here. Give a nicer | ||
# error than the default "async generator ignored GeneratorExit" | ||
raise RuntimeError( | ||
f"Non-Trio async generator {agen_name!r} awaited something " | ||
f"during finalization; install a finalization hook to " | ||
f"support this, or wrap it in 'async with aclosing(...):'" | ||
) | ||
|
||
self.prev_hooks = sys.get_asyncgen_hooks() | ||
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) | ||
|
||
async def finalize_remaining(self, runner): | ||
# This is called from init after shutting down the system nursery. | ||
# The only tasks running at this point are init and | ||
# the run_sync_soon task, and since the system nursery is closed, | ||
# there's no way for user code to spawn more. | ||
assert _core.current_task() is runner.init_task | ||
assert len(runner.tasks) == 2 | ||
|
||
# To make async generator finalization easier to reason | ||
# about, we'll shut down asyncgen garbage collection by turning | ||
# the alive WeakSet into a regular set. | ||
self.alive = set(self.alive) | ||
|
||
# Process all pending run_sync_soon callbacks, in case one of | ||
# them was an asyncgen finalizer that snuck in under the wire. | ||
runner.entry_queue.run_sync_soon(runner.reschedule, runner.init_task) | ||
await _core.wait_task_rescheduled( | ||
lambda _: _core.Abort.FAILED # pragma: no cover | ||
) | ||
self.alive.update(self.trailing_needs_finalize) | ||
self.trailing_needs_finalize.clear() | ||
|
||
# None of the still-living tasks use async generators, so | ||
# every async generator must be suspended at a yield point -- | ||
# there's no one to be doing the iteration. That's good, | ||
# because aclose() only works on an asyncgen that's suspended | ||
# at a yield point. (If it's suspended at an event loop trap, | ||
# because someone is in the middle of iterating it, then you | ||
# get a RuntimeError on 3.8+, and a nasty surprise on earlier | ||
# versions due to https://bugs.python.org/issue32526.) | ||
# | ||
# However, once we start aclose() of one async generator, it | ||
# might start fetching the next value from another, thus | ||
# preventing us from closing that other (at least until | ||
# aclose() of the first one is complete). This constraint | ||
# effectively requires us to finalize the remaining asyncgens | ||
# in arbitrary order, rather than doing all of them at the | ||
# same time. On 3.8+ we could defer any generator with | ||
# ag_running=True to a later batch, but that only catches | ||
# the case where our aclose() starts after the user's | ||
# asend()/etc. If our aclose() starts first, then the | ||
# user's asend()/etc will raise RuntimeError, since they're | ||
# probably not checking ag_running. | ||
# | ||
# It might be possible to allow some parallelized cleanup if | ||
# we can determine that a certain set of asyncgens have no | ||
# interdependencies, using gc.get_referents() and such. | ||
# But just doing one at a time will typically work well enough | ||
# (since each aclose() executes in a cancelled scope) and | ||
# is much easier to reason about. | ||
|
||
# It's possible that that cleanup code will itself create | ||
# more async generators, so we iterate repeatedly until | ||
# all are gone. | ||
while self.alive: | ||
batch = self.alive | ||
self.alive = set() | ||
for agen in batch: | ||
await self._finalize_one(agen, name_asyncgen(agen)) | ||
|
||
def close(self): | ||
sys.set_asyncgen_hooks(*self.prev_hooks) | ||
|
||
async def _finalize_one(self, agen, name): | ||
try: | ||
# This shield ensures that finalize_asyncgen never exits | ||
# with an exception, not even a Cancelled. The inside | ||
# is cancelled so there's no deadlock risk. | ||
with _core.CancelScope(shield=True) as cancel_scope: | ||
cancel_scope.cancel() | ||
await agen.aclose() | ||
except BaseException: | ||
ASYNCGEN_LOGGER.exception( | ||
"Exception ignored during finalization of async generator %r -- " | ||
"surround your use of the generator in 'async with aclosing(...):' " | ||
"to raise exceptions like this in the context where they're generated", | ||
name, | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, does the batching here do anything? Would it work just as well to write:
? I'm not too worried either way but I'm wondering if I'm missing something...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The batching finalizes the currently-active generators before any new ones that get firstiter'ed during that process. I can't think of anywhere that the difference would matter but the batch-based ordering seems more intuitive to me.