diff --git a/docs/README.rst b/docs/README.rst index 9dfe2f60..9dd7faf4 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -3,8 +3,8 @@ |gh_actions| |docs| -``tractor`` is a `structured concurrent`_, multi-processing_ runtime -built on trio_. +``tractor`` is a `structured concurrent`_, (optionally +distributed_) multi-processing_ runtime built on trio_. Fundamentally, ``tractor`` gives you parallelism via ``trio``-"*actors*": independent Python processes (aka @@ -17,11 +17,20 @@ protocol" constructed on top of multiple Pythons each running a ``trio`` scheduled runtime - a call to ``trio.run()``. We believe the system adheres to the `3 axioms`_ of an "`actor model`_" -but likely *does not* look like what *you* probably think an "actor -model" looks like, and that's *intentional*. +but likely **does not** look like what **you** probably *think* an "actor +model" looks like, and that's **intentional**. -The first step to grok ``tractor`` is to get the basics of ``trio`` down. -A great place to start is the `trio docs`_ and this `blog post`_. + +Where do i start!? +------------------ +The first step to grok ``tractor`` is to get an intermediate +knowledge of ``trio`` and **structured concurrency** B) + +Some great places to start are, +- the seminal `blog post`_ +- obviously the `trio docs`_ +- wikipedia's nascent SC_ page +- the fancy diagrams @ libdill-docs_ Features @@ -593,6 +602,7 @@ matrix seems too hip, we're also mostly all in the the `trio gitter channel`_! .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228 +.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing .. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing .. _trio: https://github.com/python-trio/trio .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements @@ -611,8 +621,9 @@ channel`_! .. _trio docs: https://trio.readthedocs.io/en/latest/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency +.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency +.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html .. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency -.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py new file mode 100644 index 00000000..b32ad1d8 --- /dev/null +++ b/examples/debugging/asyncio_bp.py @@ -0,0 +1,117 @@ +import asyncio + +import trio +import tractor +from tractor import to_asyncio + + +async def aio_sleep_forever(): + await asyncio.sleep(float('inf')) + + +async def bp_then_error( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + + raise_after_bp: bool = True, + +) -> None: + + # sync with ``trio``-side (caller) task + to_trio.send_nowait('start') + + # NOTE: what happens here inside the hook needs some refinement.. + # => seems like it's still `._debug._set_trace()` but + # we set `Lock.local_task_in_debug = 'sync'`, we probably want + # some further, at least, meta-data about the task/actoq in debug + # in terms of making it clear it's asyncio mucking about. + breakpoint() + + # short checkpoint / delay + await asyncio.sleep(0.5) + + if raise_after_bp: + raise ValueError('blah') + + # TODO: test case with this so that it gets cancelled? + else: + # XXX NOTE: this is required in order to get the SIGINT-ignored + # hang case documented in the module script section! + await aio_sleep_forever() + + +@tractor.context +async def trio_ctx( + ctx: tractor.Context, + bp_before_started: bool = False, +): + + # this will block until the ``asyncio`` task sends a "first" + # message, see first line in above func. + async with ( + + to_asyncio.open_channel_from( + bp_then_error, + raise_after_bp=not bp_before_started, + ) as (first, chan), + + trio.open_nursery() as n, + ): + + assert first == 'start' + + if bp_before_started: + await tractor.breakpoint() + + await ctx.started(first) + + n.start_soon( + to_asyncio.run_task, + aio_sleep_forever, + ) + await trio.sleep_forever() + + +async def main( + bps_all_over: bool = False, + +) -> None: + + async with tractor.open_nursery() as n: + + p = await n.start_actor( + 'aio_daemon', + enable_modules=[__name__], + infect_asyncio=True, + debug_mode=True, + loglevel='cancel', + ) + + async with p.open_context( + trio_ctx, + bp_before_started=bps_all_over, + ) as (ctx, first): + + assert first == 'start' + + if bps_all_over: + await tractor.breakpoint() + + # await trio.sleep_forever() + await ctx.cancel() + assert 0 + + # TODO: case where we cancel from trio-side while asyncio task + # has debugger lock? + # await p.cancel_actor() + + +if __name__ == '__main__': + + # works fine B) + trio.run(main) + + # will hang and ignores SIGINT !! + # NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it + # manually.. + # trio.run(main, True) diff --git a/requirements-test.txt b/requirements-test.txt index 8070f2c7..b589bd12 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -6,3 +6,4 @@ mypy trio_typing pexpect towncrier +numpy diff --git a/tests/test_shm.py b/tests/test_shm.py new file mode 100644 index 00000000..2b7a382f --- /dev/null +++ b/tests/test_shm.py @@ -0,0 +1,167 @@ +""" +Shared mem primitives and APIs. + +""" +import uuid + +# import numpy +import pytest +import trio +import tractor +from tractor._shm import ( + open_shm_list, + attach_shm_list, +) + + +@tractor.context +async def child_attach_shml_alot( + ctx: tractor.Context, + shm_key: str, +) -> None: + + await ctx.started(shm_key) + + # now try to attach a boatload of times in a loop.. + for _ in range(1000): + shml = attach_shm_list( + key=shm_key, + readonly=False, + ) + assert shml.shm.name == shm_key + await trio.sleep(0.001) + + +def test_child_attaches_alot(): + async def main(): + async with tractor.open_nursery() as an: + + # allocate writeable list in parent + key = f'shml_{uuid.uuid4()}' + shml = open_shm_list( + key=key, + ) + + portal = await an.start_actor( + 'shm_attacher', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + child_attach_shml_alot, + shm_key=shml.key, + ) as (ctx, start_val), + ): + assert start_val == key + await ctx.result() + + await portal.cancel_actor() + + trio.run(main) + + +@tractor.context +async def child_read_shm_list( + ctx: tractor.Context, + shm_key: str, + use_str: bool, + frame_size: int, +) -> None: + + # attach in child + shml = attach_shm_list( + key=shm_key, + # dtype=str if use_str else float, + ) + await ctx.started(shml.key) + + async with ctx.open_stream() as stream: + async for i in stream: + print(f'(child): reading shm list index: {i}') + + if use_str: + expect = str(float(i)) + else: + expect = float(i) + + if frame_size == 1: + val = shml[i] + assert expect == val + print(f'(child): reading value: {val}') + else: + frame = shml[i - frame_size:i] + print(f'(child): reading frame: {frame}') + + +@pytest.mark.parametrize( + 'use_str', + [False, True], + ids=lambda i: f'use_str_values={i}', +) +@pytest.mark.parametrize( + 'frame_size', + [1, 2**6, 2**10], + ids=lambda i: f'frame_size={i}', +) +def test_parent_writer_child_reader( + use_str: bool, + frame_size: int, +): + + async def main(): + async with tractor.open_nursery( + # debug_mode=True, + ) as an: + + portal = await an.start_actor( + 'shm_reader', + enable_modules=[__name__], + debug_mode=True, + ) + + # allocate writeable list in parent + key = 'shm_list' + seq_size = int(2 * 2 ** 10) + shml = open_shm_list( + key=key, + size=seq_size, + dtype=str if use_str else float, + readonly=False, + ) + + async with ( + portal.open_context( + child_read_shm_list, + shm_key=key, + use_str=use_str, + frame_size=frame_size, + ) as (ctx, sent), + + ctx.open_stream() as stream, + ): + + assert sent == key + + for i in range(seq_size): + + val = float(i) + if use_str: + val = str(val) + + # print(f'(parent): writing {val}') + shml[i] = val + + # only on frame fills do we + # signal to the child that a frame's + # worth is ready. + if (i % frame_size) == 0: + print(f'(parent): signalling frame full on {val}') + await stream.send(i) + else: + print(f'(parent): signalling final frame on {val}') + await stream.send(i) + + await portal.cancel_actor() + + trio.run(main) diff --git a/tractor/__init__.py b/tractor/__init__.py index aa262105..c653ec05 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -21,7 +21,6 @@ from exceptiongroup import BaseExceptionGroup from ._clustering import open_actor_cluster -from ._ipc import Channel from ._context import ( Context, context, @@ -48,6 +47,8 @@ ) from ._debug import ( breakpoint, + pause, + pause_from_sync, post_mortem, ) from . import msg @@ -55,31 +56,35 @@ run_daemon, open_root_actor, ) +from ._ipc import Channel from ._portal import Portal from ._runtime import Actor __all__ = [ 'Actor', + 'BaseExceptionGroup', 'Channel', 'Context', 'ContextCancelled', 'ModuleNotExposed', 'MsgStream', - 'BaseExceptionGroup', 'Portal', 'RemoteActorError', 'breakpoint', 'context', 'current_actor', 'find_actor', + 'query_actor', 'get_arbiter', 'is_root_process', 'msg', 'open_actor_cluster', 'open_nursery', 'open_root_actor', + 'pause', 'post_mortem', + 'pause_from_sync', 'query_actor', 'run_daemon', 'stream', diff --git a/tractor/_debug.py b/tractor/_debug.py index b0482f18..d5f5f4f1 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -30,7 +30,6 @@ from contextlib import asynccontextmanager as acm from typing import ( Any, - Optional, Callable, AsyncIterator, AsyncGenerator, @@ -40,7 +39,10 @@ import pdbp import tractor import trio -from trio_typing import TaskStatus +from trio_typing import ( + TaskStatus, + # Task, +) from .log import get_logger from ._discovery import get_root @@ -69,10 +71,10 @@ class Lock: ''' repl: MultiActorPdb | None = None # placeholder for function to set a ``trio.Event`` on debugger exit - # pdb_release_hook: Optional[Callable] = None + # pdb_release_hook: Callable | None = None _trio_handler: Callable[ - [int, Optional[FrameType]], Any + [int, FrameType | None], Any ] | int | None = None # actor-wide variable pointing to current task name using debugger @@ -83,23 +85,23 @@ class Lock: # and must be cancelled if this actor is cancelled via IPC # request-message otherwise deadlocks with the parent actor may # ensure - _debugger_request_cs: Optional[trio.CancelScope] = None + _debugger_request_cs: trio.CancelScope | None = None # NOTE: set only in the root actor for the **local** root spawned task # which has acquired the lock (i.e. this is on the callee side of # the `lock_tty_for_child()` context entry). - _root_local_task_cs_in_debug: Optional[trio.CancelScope] = None + _root_local_task_cs_in_debug: trio.CancelScope | None = None # actor tree-wide actor uid that supposedly has the tty lock - global_actor_in_debug: Optional[tuple[str, str]] = None + global_actor_in_debug: tuple[str, str] = None - local_pdb_complete: Optional[trio.Event] = None - no_remote_has_tty: Optional[trio.Event] = None + local_pdb_complete: trio.Event | None = None + no_remote_has_tty: trio.Event | None = None # lock in root actor preventing multi-access to local tty _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() - _orig_sigint_handler: Optional[Callable] = None + _orig_sigint_handler: Callable | None = None _blocked: set[tuple[str, str]] = set() @classmethod @@ -110,6 +112,7 @@ def shield_sigint(cls): ) @classmethod + @pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()` def unshield_sigint(cls): # always restore ``trio``'s sigint handler. see notes below in # the pdb factory about the nightmare that is that code swapping @@ -129,10 +132,6 @@ def release(cls): if owner: raise - # actor-local state, irrelevant for non-root. - cls.global_actor_in_debug = None - cls.local_task_in_debug = None - try: # sometimes the ``trio`` might already be terminated in # which case this call will raise. @@ -143,6 +142,11 @@ def release(cls): cls.unshield_sigint() cls.repl = None + # actor-local state, irrelevant for non-root. + cls.global_actor_in_debug = None + cls.local_task_in_debug = None + + class TractorConfig(pdbp.DefaultConfig): ''' @@ -151,7 +155,7 @@ class TractorConfig(pdbp.DefaultConfig): ''' use_pygments: bool = True sticky_by_default: bool = False - enable_hidden_frames: bool = False + enable_hidden_frames: bool = True # much thanks @mdmintz for the hot tip! # fixes line spacing issue when resizing terminal B) @@ -228,26 +232,23 @@ async def _acquire_debug_lock_from_root_task( to the ``pdb`` repl. ''' - task_name = trio.lowlevel.current_task().name + task_name: str = trio.lowlevel.current_task().name + we_acquired: bool = False log.runtime( f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" ) - - we_acquired = False - try: log.runtime( f"entering lock checkpoint, remote task: {task_name}:{uid}" ) - we_acquired = True - # NOTE: if the surrounding cancel scope from the # `lock_tty_for_child()` caller is cancelled, this line should # unblock and NOT leave us in some kind of # a "child-locked-TTY-but-child-is-uncontactable-over-IPC" # condition. await Lock._debug_lock.acquire() + we_acquired = True if Lock.no_remote_has_tty is None: # mark the tty lock as being in use so that the runtime @@ -374,7 +375,7 @@ async def wait_for_parent_stdin_hijack( This function is used by any sub-actor to acquire mutex access to the ``pdb`` REPL and thus the root's TTY for interactive debugging - (see below inside ``_breakpoint()``). It can be used to ensure that + (see below inside ``_pause()``). It can be used to ensure that an intermediate nursery-owning actor does not clobber its children if they are in debug (see below inside ``maybe_wait_for_debugger()``). @@ -440,17 +441,29 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]: return pdb, Lock.unshield_sigint -async def _breakpoint( +async def _pause( - debug_func, + debug_func: Callable | None = None, + release_lock_signal: trio.Event | None = None, # TODO: # shield: bool = False + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED ) -> None: ''' - Breakpoint entry for engaging debugger instance sync-interaction, - from async code, executing in actor runtime (task). + A pause point (more commonly known as a "breakpoint") interrupt + instruction for engaging a blocking debugger instance to + conduct manual console-based-REPL-interaction from within + `tractor`'s async runtime, normally from some single-threaded + and currently executing actor-hosted-`trio`-task in some + (remote) process. + + NOTE: we use the semantics "pause" since it better encompasses + the entirety of the necessary global-runtime-state-mutation any + actor-task must access and lock in order to get full isolated + control over the process tree's root TTY: + https://en.wikipedia.org/wiki/Breakpoint ''' __tracebackhide__ = True @@ -559,10 +572,23 @@ async def _breakpoint( Lock.repl = pdb try: - # block here one (at the appropriate frame *up*) where - # ``breakpoint()`` was awaited and begin handling stdio. - log.debug("Entering the synchronous world of pdb") - debug_func(actor, pdb) + # breakpoint() + if debug_func is None: + # assert release_lock_signal, ( + # 'Must pass `release_lock_signal: trio.Event` if no ' + # 'trace func provided!' + # ) + print(f"{actor.uid} ENTERING WAIT") + task_status.started() + + # with trio.CancelScope(shield=True): + # await release_lock_signal.wait() + + else: + # block here one (at the appropriate frame *up*) where + # ``breakpoint()`` was awaited and begin handling stdio. + log.debug("Entering the synchronous world of pdb") + debug_func(actor, pdb) except bdb.BdbQuit: Lock.release() @@ -583,7 +609,7 @@ async def _breakpoint( def shield_sigint_handler( signum: int, frame: 'frame', # type: ignore # noqa - # pdb_obj: Optional[MultiActorPdb] = None, + # pdb_obj: MultiActorPdb | None = None, *args, ) -> None: @@ -597,7 +623,7 @@ def shield_sigint_handler( ''' __tracebackhide__ = True - uid_in_debug = Lock.global_actor_in_debug + uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug actor = tractor.current_actor() # print(f'{actor.uid} in HANDLER with ') @@ -615,14 +641,14 @@ def do_cancel(): else: raise KeyboardInterrupt - any_connected = False + any_connected: bool = False if uid_in_debug is not None: # try to see if the supposed (sub)actor in debug still # has an active connection to *this* actor, and if not # it's likely they aren't using the TTY lock / debugger # and we should propagate SIGINT normally. - chans = actor._peers.get(tuple(uid_in_debug)) + chans: list[tractor.Channel] = actor._peers.get(tuple(uid_in_debug)) if chans: any_connected = any(chan.connected() for chan in chans) if not any_connected: @@ -635,7 +661,7 @@ def do_cancel(): return do_cancel() # only set in the actor actually running the REPL - pdb_obj = Lock.repl + pdb_obj: MultiActorPdb | None = Lock.repl # root actor branch that reports whether or not a child # has locked debugger. @@ -693,7 +719,7 @@ def do_cancel(): ) return do_cancel() - task = Lock.local_task_in_debug + task: str | None = Lock.local_task_in_debug if ( task and pdb_obj @@ -708,8 +734,8 @@ def do_cancel(): # elif debug_mode(): else: # XXX: shouldn't ever get here? - print("WTFWTFWTF") - raise KeyboardInterrupt + raise RuntimeError("WTFWTFWTF") + # raise KeyboardInterrupt("WTFWTFWTF") # NOTE: currently (at least on ``fancycompleter`` 0.9.2) # it looks to be that the last command that was run (eg. ll) @@ -737,21 +763,18 @@ def do_cancel(): # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py - # XXX LEGACY: lol, see ``pdbpp`` issue: - # https://github.com/pdbpp/pdbpp/issues/496 - def _set_trace( actor: tractor.Actor | None = None, pdb: MultiActorPdb | None = None, ): __tracebackhide__ = True - actor = actor or tractor.current_actor() + actor: tractor.Actor = actor or tractor.current_actor() # start 2 levels up in user code - frame: Optional[FrameType] = sys._getframe() + frame: FrameType | None = sys._getframe() if frame: - frame = frame.f_back # type: ignore + frame: FrameType = frame.f_back # type: ignore if ( frame @@ -771,12 +794,76 @@ def _set_trace( Lock.local_task_in_debug = 'sync' pdb.set_trace(frame=frame) + # undo_ + + +# TODO: allow pausing from sync code, normally by remapping +# python's builtin breakpoint() hook to this runtime aware version. +def pause_from_sync() -> None: + print("ENTER SYNC PAUSE") + import greenback + __tracebackhide__ = True + actor: tractor.Actor = tractor.current_actor() + # task_can_release_tty_lock = trio.Event() + + # spawn bg task which will lock out the TTY, we poll + # just below until the release event is reporting that task as + # waiting.. not the most ideal but works for now ;) + greenback.await_( + actor._service_n.start(partial( + _pause, + debug_func=None, + # release_lock_signal=task_can_release_tty_lock, + )) + ) -breakpoint = partial( - _breakpoint, + db, undo_sigint = mk_mpdb() + Lock.local_task_in_debug = 'sync' + # db.config.enable_hidden_frames = True + + # we entered the global ``breakpoint()`` built-in from sync + # code? + frame: FrameType | None = sys._getframe() + # print(f'FRAME: {str(frame)}') + # assert not db._is_hidden(frame) + + frame: FrameType = frame.f_back # type: ignore + # print(f'FRAME: {str(frame)}') + # if not db._is_hidden(frame): + # pdbp.set_trace() + # db._hidden_frames.append( + # (frame, frame.f_lineno) + # ) + db.set_trace(frame=frame) + # NOTE XXX: see the `@pdbp.hideframe` decoration + # on `Lock.unshield_sigint()`.. I have NO CLUE why + # the next instruction's def frame is being shown + # in the tb but it seems to be something wonky with + # the way `pdb` core works? + # undo_sigint() + + # Lock.global_actor_in_debug = actor.uid + # Lock.release() + # task_can_release_tty_lock.set() + + +# using the "pause" semantics instead since +# that better covers actually somewhat "pausing the runtime" +# for this particular paralell task to do debugging B) +pause = partial( + _pause, _set_trace, ) +pp = pause # short-hand for "pause point" + + +async def breakpoint(**kwargs): + log.warning( + '`tractor.breakpoint()` is deprecated!\n' + 'Please use `tractor.pause()` instead!\n' + ) + await pause(**kwargs) def _post_mortem( @@ -801,7 +888,7 @@ def _post_mortem( post_mortem = partial( - _breakpoint, + _pause, _post_mortem, ) @@ -883,8 +970,7 @@ async def maybe_wait_for_debugger( # will make the pdb repl unusable. # Instead try to wait for pdb to be released before # tearing down. - - sub_in_debug = None + sub_in_debug: tuple[str, str] | None = None for _ in range(poll_steps): @@ -904,13 +990,15 @@ async def maybe_wait_for_debugger( debug_complete = Lock.no_remote_has_tty if ( - (debug_complete and - not debug_complete.is_set()) + debug_complete + and sub_in_debug is not None + and not debug_complete.is_set() ): - log.debug( + log.pdb( 'Root has errored but pdb is in use by ' f'child {sub_in_debug}\n' - 'Waiting on tty lock to release..') + 'Waiting on tty lock to release..' + ) await debug_complete.wait() diff --git a/tractor/_root.py b/tractor/_root.py index a2d31586..a19652df 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -89,7 +89,7 @@ async def open_root_actor( # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 builtin_bp_handler = sys.breakpointhook orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) - os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' + os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.pause_from_sync' # attempt to retreive ``trio``'s sigint handler and stash it # on our debugger lock state. @@ -235,9 +235,10 @@ async def open_root_actor( BaseExceptionGroup, ) as err: - entered = await _debug._maybe_enter_pm(err) - - if not entered and not is_multi_cancelled(err): + if ( + not (await _debug._maybe_enter_pm(err)) + and not is_multi_cancelled(err) + ): logger.exception("Root actor crashed:") # always re-raise diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 08ddabc4..c9e4bfe1 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -95,6 +95,10 @@ async def _invoke( treat_as_gen: bool = False failed_resp: bool = False + if _state.debug_mode(): + import greenback + await greenback.ensure_portal() + # possibly a traceback (not sure what typing is for this..) tb = None @@ -448,17 +452,18 @@ class Actor: (swappable) network protocols. - Each "actor" is ``trio.run()`` scheduled "runtime" composed of many - concurrent tasks in a single thread. The "runtime" tasks conduct - a slew of low(er) level functions to make it possible for message - passing between actors as well as the ability to create new actors - (aka new "runtimes" in new processes which are supervised via - a nursery construct). Each task which sends messages to a task in - a "peer" (not necessarily a parent-child, depth hierarchy)) is able - to do so via an "address", which maps IPC connections across memory - boundaries, and task request id which allows for per-actor - tasks to send and receive messages to specific peer-actor tasks with - which there is an ongoing RPC/IPC dialog. + Each "actor" is ``trio.run()`` scheduled "runtime" composed of + many concurrent tasks in a single thread. The "runtime" tasks + conduct a slew of low(er) level functions to make it possible + for message passing between actors as well as the ability to + create new actors (aka new "runtimes" in new processes which + are supervised via a nursery construct). Each task which sends + messages to a task in a "peer" (not necessarily a parent-child, + depth hierarchy) is able to do so via an "address", which maps + IPC connections across memory boundaries, and a task request id + which allows for per-actor tasks to send and receive messages + to specific peer-actor tasks with which there is an ongoing + RPC/IPC dialog. ''' # ugh, we need to get rid of this and replace with a "registry" sys @@ -756,6 +761,7 @@ async def _stream_handler( # deliver response to local caller/waiter await self._push_result(chan, cid, msg) + log.runtime('Waiting on actor nursery to exit..') await local_nursery.exited.wait() if disconnected: @@ -810,7 +816,7 @@ async def _stream_handler( db_cs and not db_cs.cancel_called ): - log.warning( + log.critical( f'STALE DEBUG LOCK DETECTED FOR {uid}' ) # TODO: figure out why this breaks tests.. @@ -1862,4 +1868,6 @@ async def unregister_actor( ) -> None: uid = (str(uid[0]), str(uid[1])) - self._registry.pop(uid) + entry: tuple = self._registry.pop(uid, None) + if entry is None: + log.warning(f'Request to de-register {uid} failed?') diff --git a/tractor/_shm.py b/tractor/_shm.py new file mode 100644 index 00000000..f8295105 --- /dev/null +++ b/tractor/_shm.py @@ -0,0 +1,833 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +SC friendly shared memory management geared at real-time +processing. + +Support for ``numpy`` compatible array-buffers is provided but is +considered optional within the context of this runtime-library. + +""" +from __future__ import annotations +from sys import byteorder +import time +from typing import Optional +from multiprocessing import shared_memory as shm +from multiprocessing.shared_memory import ( + SharedMemory, + ShareableList, +) + +from msgspec import Struct +import tractor + +from .log import get_logger + + +_USE_POSIX = getattr(shm, '_USE_POSIX', False) +if _USE_POSIX: + from _posixshmem import shm_unlink + + +try: + import numpy as np + from numpy.lib import recfunctions as rfn + import nptyping +except ImportError: + pass + + +log = get_logger(__name__) + + +def disable_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + + ''' + from multiprocessing import resource_tracker as mantracker + + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd + + +disable_mantracker() + + +class SharedInt: + ''' + Wrapper around a single entry shared memory array which + holds an ``int`` value used as an index counter. + + ''' + def __init__( + self, + shm: SharedMemory, + ) -> None: + self._shm = shm + + @property + def value(self) -> int: + return int.from_bytes(self._shm.buf, byteorder) + + @value.setter + def value(self, value) -> None: + self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder) + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + name = self._shm.name + try: + shm_unlink(name) + except FileNotFoundError: + # might be a teardown race here? + log.warning(f'Shm for {name} already unlinked?') + + +class NDToken(Struct, frozen=True): + ''' + Internal represenation of a shared memory ``numpy`` array "token" + which can be used to key and load a system (OS) wide shm entry + and correctly read the array by type signature. + + This type is msg safe. + + ''' + shm_name: str # this servers as a "key" value + shm_first_index_name: str + shm_last_index_name: str + dtype_descr: tuple + size: int # in struct-array index / row terms + + # TODO: use nptyping here on dtypes + @property + def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]: + return np.dtype( + list( + map(tuple, self.dtype_descr) + ) + ).descr + + def as_msg(self): + return self.to_dict() + + @classmethod + def from_msg(cls, msg: dict) -> NDToken: + if isinstance(msg, NDToken): + return msg + + # TODO: native struct decoding + # return _token_dec.decode(msg) + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return NDToken(**msg) + + +# _token_dec = msgspec.msgpack.Decoder(NDToken) + +# TODO: this api? +# _known_tokens = tractor.ActorVar('_shm_tokens', {}) +# _known_tokens = tractor.ContextStack('_known_tokens', ) +# _known_tokens = trio.RunVar('shms', {}) + +# TODO: this should maybe be provided via +# a `.trionics.maybe_open_context()` wrapper factory? +# process-local store of keys to tokens +_known_tokens: dict[str, NDToken] = {} + + +def get_shm_token(key: str) -> NDToken | None: + ''' + Convenience func to check if a token + for the provided key is known by this process. + + Returns either the ``numpy`` token or a string for a shared list. + + ''' + return _known_tokens.get(key) + + +def _make_token( + key: str, + size: int, + dtype: np.dtype, + +) -> NDToken: + ''' + Create a serializable token that can be used + to access a shared array. + + ''' + return NDToken( + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=tuple(np.dtype(dtype).descr), + size=size, + ) + + +class ShmArray: + ''' + A shared memory ``numpy.ndarray`` API. + + An underlying shared memory buffer is allocated based on + a user specified ``numpy.ndarray``. This fixed size array + can be read and written to by pushing data both onto the "front" + or "back" of a set index range. The indexes for the "first" and + "last" index are themselves stored in shared memory (accessed via + ``SharedInt`` interfaces) values such that multiple processes can + interact with the same array using a synchronized-index. + + ''' + def __init__( + self, + shmarr: np.ndarray, + first: SharedInt, + last: SharedInt, + shm: SharedMemory, + # readonly: bool = True, + ) -> None: + self._array = shmarr + + # indexes for first and last indices corresponding + # to fille data + self._first = first + self._last = last + + self._len = len(shmarr) + self._shm = shm + self._post_init: bool = False + + # pushing data does not write the index (aka primary key) + self._write_fields: list[str] | None = None + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + + # TODO: ringbuf api? + + @property + def _token(self) -> NDToken: + return NDToken( + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), + size=self._len, + ) + + @property + def token(self) -> dict: + """Shared memory token that can be serialized and used by + another process to attach to this array. + """ + return self._token.as_msg() + + @property + def index(self) -> int: + return self._last.value % self._len + + @property + def array(self) -> np.ndarray: + ''' + Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a + + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + # fcount = len(fields) + else: + selection = array + # fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype=' np.ndarray: + ''' + Return the last ``length``'s worth of ("row") entries from the + array. + + ''' + return self.array[-length:] + + def push( + self, + data: np.ndarray, + + field_map: Optional[dict[str, str]] = None, + prepend: bool = False, + update_first: bool = True, + start: int | None = None, + + ) -> int: + ''' + Ring buffer like "push" to append data + into the buffer and return updated "last" index. + + NB: no actual ring logic yet to give a "loop around" on overflow + condition, lel. + + ''' + length = len(data) + + if prepend: + index = (start or self._first.value) - length + + if index < 0: + raise ValueError( + f'Array size of {self._len} was overrun during prepend.\n' + f'You have passed {abs(index)} too many datums.' + ) + + else: + index = start if start is not None else self._last.value + + end = index + length + + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields + + try: + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. + if ( + prepend + and update_first + and length + ): + assert index < self._first.value + + if ( + index < self._first.value + and update_first + ): + assert prepend, 'prepend=True not passed but index decreased?' + self._first.value = index + + elif not prepend: + self._last.value = end + + self._post_init = True + return end + + except ValueError as err: + if field_map: + raise + + # should raise if diff detected + self.diff_err_fields(data) + raise err + + def diff_err_fields( + self, + data: np.ndarray, + ) -> None: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" + ) + + # TODO: support "silent" prepends that don't update ._first.value? + def prepend( + self, + data: np.ndarray, + ) -> int: + end = self.push(data, prepend=True) + assert end + + def close(self) -> None: + self._first._shm.close() + self._last._shm.close() + self._shm.close() + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + shm_unlink(self._shm.name) + + self._first.destroy() + self._last.destroy() + + def flush(self) -> None: + # TODO: flush to storage backend like markestore? + ... + + +def open_shm_ndarray( + size: int, + key: str | None = None, + dtype: np.dtype | None = None, + append_start_index: int | None = None, + readonly: bool = False, + +) -> ShmArray: + ''' + Open a memory shared ``numpy`` using the standard library. + + This call unlinks (aka permanently destroys) the buffer on teardown + and thus should be used from the parent-most accessor (process). + + ''' + # create new shared mem segment for which we + # have write permission + a = np.zeros(size, dtype=dtype) + a['index'] = np.arange(len(a)) + + shm = SharedMemory( + name=key, + create=True, + size=a.nbytes + ) + array = np.ndarray( + a.shape, + dtype=a.dtype, + buffer=shm.buf + ) + array[:] = a[:] + array.setflags(write=int(not readonly)) + + token = _make_token( + key=key, + size=size, + dtype=dtype, + ) + + # create single entry arrays for storing an first and last indices + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=True, + size=4, # std int + ) + ) + + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=True, + size=4, # std int + ) + ) + + # Start the "real-time" append-updated (or "pushed-to") section + # after some start index: ``append_start_index``. This allows appending + # from a start point in the array which isn't the 0 index and looks + # something like, + # ------------------------- + # | | i + # _________________________ + # <-------------> <-------> + # history real-time + # + # Once fully "prepended", the history section will leave the + # ``ShmArray._start.value: int = 0`` and the yet-to-be written + # real-time section will start at ``ShmArray.index: int``. + + # this sets the index to nearly 2/3rds into the the length of + # the buffer leaving at least a "days worth of second samples" + # for the real-time section. + if append_start_index is None: + append_start_index = round(size * 0.616) + + last.value = first.value = append_start_index + + shmarr = ShmArray( + array, + first, + last, + shm, + ) + + assert shmarr._token == token + _known_tokens[key] = shmarr.token + + # "unlink" created shm on process teardown by + # pushing teardown calls onto actor context stack + stack = tractor.current_actor().lifetime_stack + stack.callback(shmarr.close) + stack.callback(shmarr.destroy) + + return shmarr + + +def attach_shm_ndarray( + token: tuple[str, str, tuple[str, str]], + readonly: bool = True, + +) -> ShmArray: + ''' + Attach to an existing shared memory array previously + created by another process using ``open_shared_array``. + + No new shared mem is allocated but wrapper types for read/write + access are constructed. + + ''' + token = NDToken.from_msg(token) + key = token.shm_name + + if key in _known_tokens: + assert NDToken.from_msg(_known_tokens[key]) == token, "WTF" + + # XXX: ugh, looks like due to the ``shm_open()`` C api we can't + # actually place files in a subdir, see discussion here: + # https://stackoverflow.com/a/11103289 + + # attach to array buffer and view as per dtype + _err: Optional[Exception] = None + for _ in range(3): + try: + shm = SharedMemory( + name=key, + create=False, + ) + break + except OSError as oserr: + _err = oserr + time.sleep(0.1) + else: + if _err: + raise _err + + shmarr = np.ndarray( + (token.size,), + dtype=token.dtype, + buffer=shm.buf + ) + shmarr.setflags(write=int(not readonly)) + + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=False, + size=4, # std int + ), + ) + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=False, + size=4, # std int + ), + ) + + # make sure we can read + first.value + + sha = ShmArray( + shmarr, + first, + last, + shm, + ) + # read test + sha.array + + # Stash key -> token knowledge for future queries + # via `maybe_opepn_shm_array()` but only after we know + # we can attach. + if key not in _known_tokens: + _known_tokens[key] = token + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(sha.close) + + return sha + + +def maybe_open_shm_ndarray( + key: str, # unique identifier for segment + size: int, + dtype: np.dtype | None = None, + append_start_index: int = 0, + readonly: bool = True, + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup + to registered blocks in the users overall "system" registry + (presumes you don't have the block's explicit token). + + This function is meant to solve the problem of discovering whether + a shared array token has been allocated or discovered by the actor + running in **this** process. Systems where multiple actors may seek + to access a common block can use this function to attempt to acquire + a token as discovered by the actors who have previously stored + a "key" -> ``NDToken`` map in an actor local (aka python global) + variable. + + If you know the explicit ``NDToken`` for your memory segment instead + use ``attach_shm_array``. + + ''' + try: + # see if we already know this key + token = _known_tokens[key] + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, # not newly opened + ) + except KeyError: + log.warning(f"Could not find {key} in shms cache") + if dtype: + token = _make_token( + key, + size=size, + dtype=dtype, + ) + else: + + try: + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, + ) + except FileNotFoundError: + log.warning(f"Could not attach to shm with token {token}") + + # This actor does not know about memory + # associated with the provided "key". + # Attempt to open a block and expect + # to fail if a block has been allocated + # on the OS by someone else. + return ( + open_shm_ndarray( + key=key, + size=size, + dtype=dtype, + append_start_index=append_start_index, + readonly=readonly, + ), + True, + ) + + +class ShmList(ShareableList): + ''' + Carbon copy of ``.shared_memory.ShareableList`` with a few + enhancements: + + - readonly mode via instance var flag `._readonly: bool` + - ``.__getitem__()`` accepts ``slice`` inputs + - exposes the underlying buffer "name" as a ``.key: str`` + + ''' + def __init__( + self, + sequence: list | None = None, + *, + name: str | None = None, + readonly: bool = True + + ) -> None: + self._readonly = readonly + self._key = name + return super().__init__( + sequence=sequence, + name=name, + ) + + @property + def key(self) -> str: + return self._key + + @property + def readonly(self) -> bool: + return self._readonly + + def __setitem__( + self, + position, + value, + + ) -> None: + + # mimick ``numpy`` error + if self._readonly: + raise ValueError('assignment destination is read-only') + + return super().__setitem__(position, value) + + def __getitem__( + self, + indexish, + ) -> list: + + # NOTE: this is a non-writeable view (copy?) of the buffer + # in a new list instance. + if isinstance(indexish, slice): + return list(self)[indexish] + + return super().__getitem__(indexish) + + # TODO: should we offer a `.array` and `.push()` equivalent + # to the `ShmArray`? + # currently we have the following limitations: + # - can't write slices of input using traditional slice-assign + # syntax due to the ``ShareableList.__setitem__()`` implementation. + # - ``list(shmlist)`` returns a non-mutable copy instead of + # a writeable view which would be handier numpy-style ops. + + +def open_shm_list( + key: str, + sequence: list | None = None, + size: int = int(2 ** 10), + dtype: float | int | bool | str | bytes | None = float, + readonly: bool = True, + +) -> ShmList: + + if sequence is None: + default = { + float: 0., + int: 0, + bool: True, + str: 'doggy', + None: None, + }[dtype] + sequence = [default] * size + + shml = ShmList( + sequence=sequence, + name=key, + readonly=readonly, + ) + + # "close" attached shm on actor teardown + try: + actor = tractor.current_actor() + actor.lifetime_stack.callback(shml.shm.close) + actor.lifetime_stack.callback(shml.shm.unlink) + except RuntimeError: + log.warning('tractor runtime not active, skipping teardown steps') + + return shml + + +def attach_shm_list( + key: str, + readonly: bool = False, + +) -> ShmList: + + return ShmList( + name=key, + readonly=readonly, + ) diff --git a/tractor/log.py b/tractor/log.py index 1ea99315..5710e83e 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -193,15 +193,39 @@ def get_logger( ''' log = rlog = logging.getLogger(_root_name) - if name and name != _proj_name: + if ( + name + and name != _proj_name + ): - # handling for modules that use ``get_logger(__name__)`` to - # avoid duplicate project-package token in msg output - rname, _, tail = name.partition('.') - if rname == _root_name: - name = tail + # NOTE: for handling for modules that use ``get_logger(__name__)`` + # we make the following stylistic choice: + # - always avoid duplicate project-package token + # in msg output: i.e. tractor.tractor _ipc.py in header + # looks ridiculous XD + # - never show the leaf module name in the {name} part + # since in python the {filename} is always this same + # module-file. + + sub_name: None | str = None + rname, _, sub_name = name.partition('.') + pkgpath, _, modfilename = sub_name.rpartition('.') + + # NOTE: for tractor itself never include the last level + # module key in the name such that something like: eg. + # 'tractor.trionics._broadcast` only includes the first + # 2 tokens in the (coloured) name part. + if rname == 'tractor': + sub_name = pkgpath + + if _root_name in sub_name: + duplicate, _, sub_name = sub_name.partition('.') + + if not sub_name: + log = rlog + else: + log = rlog.getChild(sub_name) - log = rlog.getChild(name) log.level = rlog.level # add our actor-task aware adapter which will dynamically look up @@ -254,3 +278,7 @@ def get_console_log( def get_loglevel() -> str: return _default_loglevel + + +# global module logger for tractor itself +log = get_logger('tractor') diff --git a/tractor/msg.py b/tractor/msg.py index 9af3ccd7..ca34dba8 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -43,38 +43,62 @@ # - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type from __future__ import annotations +from inspect import isfunction from pkgutil import resolve_name class NamespacePath(str): ''' - A serializeable description of a (function) Python object location - described by the target's module path and namespace key meant as - a message-native "packet" to allows actors to point-and-load objects - by absolute reference. + A serializeable description of a (function) Python object + location described by the target's module path and namespace + key meant as a message-native "packet" to allows actors to + point-and-load objects by an absolute ``str`` (and thus + serializable) reference. ''' - _ref: object = None + _ref: object | type | None = None - def load_ref(self) -> object: + def load_ref(self) -> object | type: if self._ref is None: self._ref = resolve_name(self) return self._ref - def to_tuple( - self, - - ) -> tuple[str, str]: - ref = self.load_ref() - return ref.__module__, getattr(ref, '__name__', '') + @staticmethod + def _mk_fqnp(ref: type | object) -> tuple[str, str]: + ''' + Generate a minial ``str`` pair which describes a python + object's namespace path and object/type name. + + In more precise terms something like: + - 'py.namespace.path:object_name', + - eg.'tractor.msg:NamespacePath' will be the ``str`` form + of THIS type XD + + ''' + if ( + isinstance(ref, object) + and not isfunction(ref) + ): + name: str = type(ref).__name__ + else: + name: str = getattr(ref, '__name__') + + # fully qualified namespace path, tuple. + fqnp: tuple[str, str] = ( + ref.__module__, + name, + ) + return fqnp @classmethod def from_ref( cls, - ref, + ref: type | object, ) -> NamespacePath: - return cls(':'.join( - (ref.__module__, - getattr(ref, '__name__', '')) - )) + + fqnp: tuple[str, str] = cls._mk_fqnp(ref) + return cls(':'.join(fqnp)) + + def to_tuple(self) -> tuple[str, str]: + return self._mk_fqnp(self.load_ref()) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index be3ac8d3..788181e6 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -28,7 +28,6 @@ Callable, AsyncIterator, Awaitable, - Optional, ) import trio @@ -65,9 +64,9 @@ class LinkedTaskChannel(trio.abc.Channel): _trio_exited: bool = False # set after ``asyncio.create_task()`` - _aio_task: Optional[asyncio.Task] = None - _aio_err: Optional[BaseException] = None - _broadcaster: Optional[BroadcastReceiver] = None + _aio_task: asyncio.Task | None = None + _aio_err: BaseException | None = None + _broadcaster: BroadcastReceiver | None = None async def aclose(self) -> None: await self._from_aio.aclose() @@ -188,7 +187,7 @@ def _run_asyncio_task( cancel_scope = trio.CancelScope() aio_task_complete = trio.Event() - aio_err: Optional[BaseException] = None + aio_err: BaseException | None = None chan = LinkedTaskChannel( aio_q, # asyncio.Queue @@ -263,7 +262,7 @@ def cancel_trio(task: asyncio.Task) -> None: ''' nonlocal chan aio_err = chan._aio_err - task_err: Optional[BaseException] = None + task_err: BaseException | None = None # only to avoid ``asyncio`` complaining about uncaptured # task exceptions @@ -329,11 +328,11 @@ async def translate_aio_errors( ''' trio_task = trio.lowlevel.current_task() - aio_err: Optional[BaseException] = None + aio_err: BaseException | None = None # TODO: make thisi a channel method? def maybe_raise_aio_err( - err: Optional[Exception] = None + err: Exception | None = None ) -> None: aio_err = chan._aio_err if (