Skip to content

Commit

Permalink
Async generator hooks, simpler approach
Browse files Browse the repository at this point in the history
  • Loading branch information
oremanj committed Jun 10, 2020
1 parent 094e015 commit 26f1aea
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 23 deletions.
14 changes: 9 additions & 5 deletions trio/_core/_entry_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ def run_cb(job):
async def kill_everything(exc):
raise exc

_core.spawn_system_task(kill_everything, exc)
try:
_core.spawn_system_task(kill_everything, exc)
except RuntimeError:
# We're quite late in the shutdown process and
# the system nursery is already closed.
_core.current_task().parent_nursery.start_soon(
kill_everything, exc
)

return True

# This has to be carefully written to be safe in the face of new items
Expand Down Expand Up @@ -102,10 +110,6 @@ def close(self):
def size(self):
return len(self.queue) + len(self.idempotent_queue)

def spawn(self):
name = "<TrioToken.run_sync_soon task>"
_core.spawn_system_task(self.task, name=name)

def run_sync_soon(self, sync_fn, *args, idempotent=False):
with self.lock:
if self.done:
Expand Down
193 changes: 176 additions & 17 deletions trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import sys
import threading
from collections import deque
from functools import partial
import collections.abc
from contextlib import contextmanager, closing
import warnings
import weakref
import enum

from contextvars import copy_context
Expand Down Expand Up @@ -42,7 +44,7 @@
from ._thread_cache import start_thread_soon
from .. import _core
from .._deprecate import deprecated
from .._util import Final, NoPublicConstructor, coroutine_or_error
from .._util import Final, NoPublicConstructor, coroutine_or_error, name_asyncgen

_NO_SEND = object()

Expand All @@ -61,8 +63,9 @@ def _public(fn):
_ALLOW_DETERMINISTIC_SCHEDULING = False
_r = random.Random()

# Used to log exceptions in instruments
# Used to log exceptions in instruments and async generator finalizers
INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument")
ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors")


# On 3.7+, Context.run() is implemented in C and doesn't show up in
Expand Down Expand Up @@ -958,7 +961,7 @@ async def async_fn(arg1, arg2, \*, task_status=trio.TASK_STATUS_IGNORED):
self._pending_starts += 1
async with open_nursery() as old_nursery:
task_status = _TaskStatus(old_nursery, self)
thunk = functools.partial(async_fn, task_status=task_status)
thunk = partial(async_fn, task_status=task_status)
task = GLOBAL_RUN_CONTEXT.runner.spawn_impl(
thunk, args, old_nursery, name
)
Expand Down Expand Up @@ -1222,6 +1225,14 @@ class Runner:
is_guest = attr.ib(default=False)
guest_tick_scheduled = attr.ib(default=False)

# Async generators are added to this set when first iterated. Any
# left after the main task exits will be closed before trio.run()
# returns. During the execution of the main task, this is a
# WeakSet so GC works. During shutdown, it's a regular set so we
# don't have to deal with GC firing at unexpected times.
asyncgens = attr.ib(factory=weakref.WeakSet)
prev_asyncgen_hooks = attr.ib(default=None)

def force_guest_tick_asap(self):
if self.guest_tick_scheduled:
return
Expand All @@ -1231,6 +1242,8 @@ def force_guest_tick_asap(self):
def close(self):
self.io_manager.close()
self.entry_queue.close()
if self.prev_asyncgen_hooks is not None:
sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks)
if self.instruments:
self.instrument("after_run")
# This is where KI protection gets disabled, so we do it last
Expand Down Expand Up @@ -1366,7 +1379,7 @@ def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False):

if name is None:
name = async_fn
if isinstance(name, functools.partial):
if isinstance(name, partial):
name = name.func
if not isinstance(name, str):
try:
Expand Down Expand Up @@ -1432,11 +1445,7 @@ def task_exited(self, task, outcome):

task._activate_cancel_status(None)
self.tasks.remove(task)
if task is self.main_task:
self.main_task_outcome = outcome
self.system_nursery.cancel_scope.cancel()
self.system_nursery._child_finished(task, Value(None))
elif task is self.init_task:
if task is self.init_task:
# If the init task crashed, then something is very wrong and we
# let the error propagate. (It'll eventually be wrapped in a
# TrioInternalError.)
Expand All @@ -1446,11 +1455,120 @@ def task_exited(self, task, outcome):
if self.tasks: # pragma: no cover
raise TrioInternalError
else:
if task is self.main_task:
self.main_task_outcome = outcome
outcome = Value(None)
task._parent_nursery._child_finished(task, outcome)

if self.instruments:
self.instrument("task_exited", task)

################
# Async generator finalization support
################

async def finalize_asyncgen(self, agen, name, *, check_running):
if check_running and agen.ag_running:
# Another async generator is iterating this one, which is
# suspended at an event loop trap. Add it back to the
# asyncgens set and we'll get it on the next round. Note
# that this is only possible during end-of-run
# finalization; in GC-directed finalization, no one has a
# reference to agen anymore, so no one can be iterating it.
#
# This field is only reliable on 3.8+ due to
# ttps://bugs.python.org/issue32526. Pythons below
# 3.8 use a workaround in finalize_remaining_asyncgens.
self.asyncgens.add(agen)
return

try:
# This shield ensures that finalize_asyncgen never exits
# with an exception, not even a Cancelled. The inside
# is cancelled so there's no deadlock risk.
with CancelScope(shield=True) as cancel_scope:
cancel_scope.cancel()
await agen.aclose()
except BaseException as exc:
ASYNCGEN_LOGGER.exception(
"Exception ignored during finalization of async generator %r -- "
"surround your use of the generator in 'async with aclosing(...):' "
"to raise exceptions like this in the context where they're generated",
name,
)

async def finalize_remaining_asyncgens(self):
# At the time this function is called, there are exactly two
# tasks running: init and the run_sync_soon task. (And we've
# shut down the system nursery, so no more can appear.)
# Neither one uses async generators, so every async generator
# must be suspended at a yield point -- there's no one to be
# doing the iteration. However, once we start aclose() of one
# async generator, it might start fetching the next value from
# another, thus preventing us from closing that other.
#
# On 3.8+, we can detect this condition by looking at
# ag_running. On earlier versions, ag_running doesn't provide
# useful information. We could look at ag_await, but that
# would fail in case of shenanigans like
# https://github.com/python-trio/async_generator/pull/16.
# It's easier to just not parallelize the shutdowns.
finalize_in_parallel = sys.version_info >= (3, 8)

# It's possible that that cleanup code will itself create
# more async generators, so we iterate repeatedly until
# all are gone.
while self.asyncgens:
batch = self.asyncgens
self.asyncgens = set()

if finalize_in_parallel:
async with open_nursery() as kill_them_all:
# This shield is needed to avoid the checkpoint
# in Nursery.__aexit__ raising Cancelled if we're
# in a cancelled scope. (Which can happen if
# a run_sync_soon callback raises an exception.)
kill_them_all.cancel_scope.shield = True
for agen in batch:
name = name_asyncgen(agen)
kill_them_all.start_soon(
partial(self.finalize_asyncgen, agen, name, check_running=True),
name="close asyncgen {} (outlived run)".format(name),
)

if self.asyncgens == batch: # pragma: no cover
# Something about the running-detection seems
# to have failed; fall back to one-at-a-time mode
# instead of looping forever
finalize_in_parallel = False
else:
for agen in batch:
await self.finalize_asyncgen(agen, name_asyncgen(agen), check_running=False)

def setup_asyncgen_hooks(self):
def firstiter(agen):
self.asyncgens.add(agen)

def finalizer(agen):
agen_name = name_asyncgen(agen)
warnings.warn(
f"Async generator {agen_name!r} was garbage collected before it had "
f"been exhausted. Surround its use in 'async with aclosing(...):' "
f"to ensure that it gets cleaned up as soon as you're done using it.",
ResourceWarning,
stacklevel=2,
)
self.entry_queue.run_sync_soon(
partial(
self.spawn_system_task,
partial(self.finalize_asyncgen, agen, agen_name, check_running=False),
name=f"close asyncgen {agen_name} (abandoned)",
),
)

self.prev_asyncgen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer)

################
# System tasks and init
################
Expand Down Expand Up @@ -1500,14 +1618,51 @@ def spawn_system_task(self, async_fn, *args, name=None):
)

async def init(self, async_fn, args):
async with open_nursery() as system_nursery:
self.system_nursery = system_nursery
try:
self.main_task = self.spawn_impl(async_fn, args, system_nursery, None)
except BaseException as exc:
self.main_task_outcome = Error(exc)
system_nursery.cancel_scope.cancel()
self.entry_queue.spawn()
# run_sync_soon task runs here:
async with open_nursery() as run_sync_soon_nursery:
# All other system tasks run here:
async with open_nursery() as self.system_nursery:
# Only the main task runs here:
async with open_nursery() as main_task_nursery:
try:
self.main_task = self.spawn_impl(
async_fn, args, main_task_nursery, None
)
except BaseException as exc:
self.main_task_outcome = Error(exc)
return
self.spawn_impl(
self.entry_queue.task,
(),
run_sync_soon_nursery,
"<TrioToken.run_sync_soon task>",
system_task=True,
)

# Main task is done. We should be exiting soon, so
# we're going to shut down GC-mediated async generator
# finalization by turning the asyncgens WeakSet into a
# regular set. We must do that before closing the system
# nursery, since finalization spawns a new system tasks.
self.asyncgens = set(self.asyncgens)

# Process all pending run_sync_soon callbacks, in case one of
# them was an asyncgen finalizer.
self.entry_queue.run_sync_soon(self.reschedule, self.init_task)
await wait_task_rescheduled(lambda _: Abort.FAILED)

# Now it's safe to proceed with shutting down system tasks
self.system_nursery.cancel_scope.cancel()

# System tasks are gone and no more will be appearing.
# The only async-colored user code left to run is the
# finalizers for the async generators that remain alive.
await self.finalize_remaining_asyncgens()

# There are no more asyncgens, which means no more user-provided
# code except possibly run_sync_soon callbacks. It's finally safe
# to stop the run_sync_soon task and exit run().
run_sync_soon_nursery.cancel_scope.cancel()

################
# Outside context problems
Expand Down Expand Up @@ -1989,6 +2144,10 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
if not host_uses_signal_set_wakeup_fd:
runner.entry_queue.wakeup.wakeup_on_signals()

# Do this before before_run in case before_run wants to override
# our hooks
runner.setup_asyncgen_hooks()

if runner.instruments:
runner.instrument("before_run")
runner.clock.start_clock()
Expand Down
Loading

0 comments on commit 26f1aea

Please sign in to comment.