diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py
new file mode 100644
index 000000000..b32ad1d8f
--- /dev/null
+++ b/examples/debugging/asyncio_bp.py
@@ -0,0 +1,117 @@
+import asyncio
+
+import trio
+import tractor
+from tractor import to_asyncio
+
+
+async def aio_sleep_forever():
+    await asyncio.sleep(float('inf'))
+
+
+async def bp_then_error(
+    to_trio: trio.MemorySendChannel,
+    from_trio: asyncio.Queue,
+
+    raise_after_bp: bool = True,
+
+) -> None:
+
+    # sync with ``trio``-side (caller) task
+    to_trio.send_nowait('start')
+
+    # NOTE: what happens here inside the hook needs some refinement..
+    # => seems like it's still `._debug._set_trace()` but
+    # we set `Lock.local_task_in_debug = 'sync'`, we probably want
+    # some further, at least, meta-data about the task/actor in debug
+    # in terms of making it clear it's asyncio mucking about.
+    breakpoint()
+
+    # short checkpoint / delay
+    await asyncio.sleep(0.5)
+
+    if raise_after_bp:
+        raise ValueError('blah')
+
+    # TODO: test case with this so that it gets cancelled?
+    else:
+        # XXX NOTE: this is required in order to get the SIGINT-ignored
+        # hang case documented in the module script section!
+        await aio_sleep_forever()
+
+
+@tractor.context
+async def trio_ctx(
+    ctx: tractor.Context,
+    bp_before_started: bool = False,
+):
+
+    # this will block until the ``asyncio`` task sends a "first"
+    # message, see first line in above func.
+    async with (
+
+        to_asyncio.open_channel_from(
+            bp_then_error,
+            raise_after_bp=not bp_before_started,
+        ) as (first, chan),
+
+        trio.open_nursery() as n,
+    ):
+
+        assert first == 'start'
+
+        if bp_before_started:
+            await tractor.breakpoint()
+
+        await ctx.started(first)
+
+        n.start_soon(
+            to_asyncio.run_task,
+            aio_sleep_forever,
+        )
+        await trio.sleep_forever()
+
+
+async def main(
+    bps_all_over: bool = False,
+
+) -> None:
+
+    async with tractor.open_nursery() as n:
+
+        p = await n.start_actor(
+            'aio_daemon',
+            enable_modules=[__name__],
+            infect_asyncio=True,
+            debug_mode=True,
+            loglevel='cancel',
+        )
+
+        async with p.open_context(
+            trio_ctx,
+            bp_before_started=bps_all_over,
+        ) as (ctx, first):
+
+            assert first == 'start'
+
+            if bps_all_over:
+                await tractor.breakpoint()
+
+            # await trio.sleep_forever()
+            await ctx.cancel()
+            assert 0
+
+        # TODO: case where we cancel from trio-side while asyncio task
+        # has debugger lock?
+        # await p.cancel_actor()
+
+
+if __name__ == '__main__':
+
+    # works fine B)
+    trio.run(main)
+
+    # will hang and ignores SIGINT !!
+    # NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
+    # manually..
+    # trio.run(main, True)
diff --git a/examples/debugging/restore_builtin_breakpoint.py b/examples/debugging/restore_builtin_breakpoint.py
new file mode 100644
index 000000000..6e141dfcc
--- /dev/null
+++ b/examples/debugging/restore_builtin_breakpoint.py
@@ -0,0 +1,24 @@
+import os
+import sys
+
+import trio
+import tractor
+
+
+async def main() -> None:
+    async with tractor.open_nursery(debug_mode=True) as an:
+
+        assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
+
+        # TODO: an assert that verifies the hook has indeed been, hooked
+        # XD
+        assert sys.breakpointhook is not tractor._debug._set_trace
+
+        breakpoint()
+
+    # TODO: an assert that verifies the hook is unhooked..
+ assert sys.breakpointhook + breakpoint() + +if __name__ == '__main__': + trio.run(main) diff --git a/nooz/356.trivial.rst b/nooz/356.trivial.rst new file mode 100644 index 000000000..ba0f3f279 --- /dev/null +++ b/nooz/356.trivial.rst @@ -0,0 +1,7 @@ +Drop `trio.Process.aclose()` usage, copy into our spawning code. + +The details are laid out in https://github.com/goodboy/tractor/issues/330. +`trio` changed is process running quite some time ago, this just copies +out the small bit we needed (from the old `.aclose()`) for hard kills +where a soft runtime cancel request fails and our "zombie killer" +implementation kicks in. diff --git a/setup.py b/setup.py index 88d661238..d26deb9b1 100755 --- a/setup.py +++ b/setup.py @@ -26,12 +26,12 @@ setup( name="tractor", version='0.1.0a6dev0', # alpha zone - description='structured concurrrent "actors"', + description='structured concurrrent `trio`-"actors"', long_description=readme, license='AGPLv3', author='Tyler Goodlet', maintainer='Tyler Goodlet', - maintainer_email='jgbt@protonmail.com', + maintainer_email='goodboy_foss@protonmail.com', url='https://github.com/goodboy/tractor', platforms=['linux', 'windows'], packages=[ @@ -52,16 +52,14 @@ # tooling 'tricycle', 'trio_typing', - - # tooling 'colorlog', 'wrapt', - # serialization + # IPC serialization 'msgspec', # debug mode REPL - 'pdbpp', + 'pdbp', # pip ref docs on these specs: # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples @@ -73,10 +71,9 @@ # https://github.com/pdbpp/fancycompleter/issues/37 'pyreadline3 ; platform_system == "Windows"', - ], tests_require=['pytest'], - python_requires=">=3.9", + python_requires=">=3.10", keywords=[ 'trio', 'async', diff --git a/tests/test_clustering.py b/tests/test_clustering.py index df3d83570..02b1f8fab 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -49,7 +49,7 @@ async def worker( await ctx.started() async with ctx.open_stream( - backpressure=True, + allow_overruns=True, ) as stream: # TODO: this with the below assert causes a hang bug? diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index c92c4407a..4efc6319b 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -13,7 +13,10 @@ import pytest import trio import tractor -from tractor._exceptions import StreamOverrun +from tractor._exceptions import ( + StreamOverrun, + ContextCancelled, +) from conftest import tractor_test @@ -91,7 +94,10 @@ async def not_started_but_stream_opened( @pytest.mark.parametrize( 'target', - [too_many_starteds, not_started_but_stream_opened], + [ + too_many_starteds, + not_started_but_stream_opened, + ], ids='misuse_type={}'.format, ) def test_started_misuse(target): @@ -228,6 +234,88 @@ async def main(): trio.run(main) +@pytest.mark.parametrize( + 'callee_returns_early', + [True, False], + ids=lambda item: f'callee_returns_early={item}' +) +@pytest.mark.parametrize( + 'cancel_method', + ['ctx', 'portal'], + ids=lambda item: f'cancel_method={item}' +) +@pytest.mark.parametrize( + 'chk_ctx_result_before_exit', + [True, False], + ids=lambda item: f'chk_ctx_result_before_exit={item}' +) +def test_caller_cancels( + cancel_method: str, + chk_ctx_result_before_exit: bool, + callee_returns_early: bool, +): + ''' + Verify that when the opening side of a context (aka the caller) + cancels that context, the ctx does not raise a cancelled when + either calling `.result()` or on context exit. 
+ + ''' + async def check_canceller( + ctx: tractor.Context, + ) -> None: + # should not raise yet return the remote + # context cancelled error. + res = await ctx.result() + + if callee_returns_early: + assert res == 'yo' + + else: + err = res + assert isinstance(err, ContextCancelled) + assert ( + tuple(err.canceller) + == + tractor.current_actor().uid + ) + + async def main(): + async with tractor.open_nursery() as nursery: + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + timeout = 0.5 if not callee_returns_early else 2 + with trio.fail_after(timeout): + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=not callee_returns_early, + ) as (ctx, sent): + + if callee_returns_early: + # ensure we block long enough before sending + # a cancel such that the callee has already + # returned it's result. + await trio.sleep(0.5) + + if cancel_method == 'ctx': + await ctx.cancel() + else: + await portal.cancel_actor() + + if chk_ctx_result_before_exit: + await check_canceller(ctx) + + if not chk_ctx_result_before_exit: + await check_canceller(ctx) + + if cancel_method != 'portal': + await portal.cancel_actor() + + trio.run(main) + + # basic stream terminations: # - callee context closes without using stream # - caller context closes without using stream @@ -506,7 +594,6 @@ async def test_callee_cancels_before_started(): cancel_self, ) as (ctx, sent): async with ctx.open_stream(): - await trio.sleep_forever() # raises a special cancel signal @@ -559,7 +646,6 @@ async def keep_sending_from_callee( 'overrun_by', [ ('caller', 1, never_open_stream), - ('cancel_caller_during_overrun', 1, never_open_stream), ('callee', 0, keep_sending_from_callee), ], ids='overrun_condition={}'.format, @@ -589,14 +675,13 @@ async def main(): if 'caller' in overrunner: async with ctx.open_stream() as stream: + + # itersend +1 msg more then the buffer size + # to cause the most basic overrun. for i in range(buf_size): print(f'sending {i}') await stream.send(i) - if 'cancel' in overrunner: - # without this we block waiting on the child side - await ctx.cancel() - else: # expect overrun error to be relayed back # and this sleep interrupted @@ -610,7 +695,9 @@ async def main(): # 2 overrun cases and the no overrun case (which pushes right up to # the msg limit) - if overrunner == 'caller' or 'cance' in overrunner: + if ( + overrunner == 'caller' + ): with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) @@ -634,40 +721,102 @@ async def echo_back_sequence( ctx: tractor.Context, seq: list[int], - msg_buffer_size: Optional[int] = None, + wait_for_cancel: bool, + allow_overruns_side: str, + be_slow: bool = False, + msg_buffer_size: int = 1, ) -> None: ''' - Send endlessly on the calleee stream. + Send endlessly on the calleee stream using a small buffer size + setting on the contex to simulate backlogging that would normally + cause overruns. ''' + # NOTE: ensure that if the caller is expecting to cancel this task + # that we stay echoing much longer then they are so we don't + # return early instead of receive the cancel msg. + total_batches: int = 1000 if wait_for_cancel else 6 + await ctx.started() + # await tractor.breakpoint() async with ctx.open_stream( msg_buffer_size=msg_buffer_size, + + # literally the point of this test XD + allow_overruns=(allow_overruns_side in {'child', 'both'}), ) as stream: - seq = list(seq) # bleh, `msgpack`... 
- count = 0 - while count < 3: + # ensure mem chan settings are correct + assert ( + ctx._send_chan._state.max_buffer_size + == + msg_buffer_size + ) + + seq = list(seq) # bleh, msgpack sometimes ain't decoded right + for _ in range(total_batches): batch = [] async for msg in stream: batch.append(msg) if batch == seq: break + if be_slow: + await trio.sleep(0.05) + + print('callee waiting on next') + for msg in batch: print(f'callee sending {msg}') await stream.send(msg) - count += 1 + print( + 'EXITING CALLEEE:\n' + f'{ctx.cancel_called_remote}' + ) + return 'yo' - return 'yo' - -def test_stream_backpressure(): +@pytest.mark.parametrize( + # aka the side that will / should raise + # and overrun under normal conditions. + 'allow_overruns_side', + ['parent', 'child', 'none', 'both'], + ids=lambda item: f'allow_overruns_side={item}' +) +@pytest.mark.parametrize( + # aka the side that will / should raise + # and overrun under normal conditions. + 'slow_side', + ['parent', 'child'], + ids=lambda item: f'slow_side={item}' +) +@pytest.mark.parametrize( + 'cancel_ctx', + [True, False], + ids=lambda item: f'cancel_ctx={item}' +) +def test_maybe_allow_overruns_stream( + cancel_ctx: bool, + slow_side: str, + allow_overruns_side: str, + loglevel: str, +): ''' Demonstrate small overruns of each task back and forth - on a stream not raising any errors by default. + on a stream not raising any errors by default by setting + the ``allow_overruns=True``. + + The original idea here was to show that if you set the feeder mem + chan to a size smaller then the # of msgs sent you could could not + get a `StreamOverrun` crash plus maybe get all the msgs that were + sent. The problem with the "real backpressure" case is that due to + the current arch it can result in the msg loop being blocked and thus + blocking cancellation - which is like super bad. So instead this test + had to be adjusted to more or less just "not send overrun errors" so + as to handle the case where the sender just moreso cares about not getting + errored out when it send to fast.. 
''' async def main(): @@ -675,38 +824,104 @@ async def main(): portal = await n.start_actor( 'callee_sends_forever', enable_modules=[__name__], + loglevel=loglevel, + + # debug_mode=True, ) - seq = list(range(3)) + seq = list(range(10)) async with portal.open_context( echo_back_sequence, seq=seq, - msg_buffer_size=1, + wait_for_cancel=cancel_ctx, + be_slow=(slow_side == 'child'), + allow_overruns_side=allow_overruns_side, ) as (ctx, sent): + assert sent is None - async with ctx.open_stream(msg_buffer_size=1) as stream: - count = 0 - while count < 3: + async with ctx.open_stream( + msg_buffer_size=1 if slow_side == 'parent' else None, + allow_overruns=(allow_overruns_side in {'parent', 'both'}), + ) as stream: + + total_batches: int = 2 + for _ in range(total_batches): for msg in seq: - print(f'caller sending {msg}') + # print(f'root tx {msg}') await stream.send(msg) - await trio.sleep(0.1) + if slow_side == 'parent': + # NOTE: we make the parent slightly + # slower, when it is slow, to make sure + # that in the overruns everywhere case + await trio.sleep(0.16) batch = [] async for msg in stream: + print(f'root rx {msg}') batch.append(msg) if batch == seq: break - count += 1 + if cancel_ctx: + # cancel the remote task + print('sending root side cancel') + await ctx.cancel() + + res = await ctx.result() - # here the context should return - assert await ctx.result() == 'yo' + if cancel_ctx: + assert isinstance(res, ContextCancelled) + assert tuple(res.canceller) == tractor.current_actor().uid + + else: + print(f'RX ROOT SIDE RESULT {res}') + assert res == 'yo' # cancel the daemon await portal.cancel_actor() - trio.run(main) + if ( + allow_overruns_side == 'both' + or slow_side == allow_overruns_side + ): + trio.run(main) + + elif ( + slow_side != allow_overruns_side + ): + + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + err = excinfo.value + + if ( + allow_overruns_side == 'none' + ): + # depends on timing is is racy which side will + # overrun first :sadkitty: + + # NOTE: i tried to isolate to a deterministic case here + # based on timeing, but i was kinda wasted, and i don't + # think it's sane to catch them.. + assert err.type in ( + tractor.RemoteActorError, + StreamOverrun, + ) + + elif ( + slow_side == 'child' + ): + assert err.type == StreamOverrun + + elif slow_side == 'parent': + assert err.type == tractor.RemoteActorError + assert 'StreamOverrun' in err.msgdata['tb_str'] + + else: + # if this hits the logic blocks from above are not + # exhaustive.. + pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO') @tractor.context @@ -737,18 +952,18 @@ async def attach_to_sleep_forever(): finally: # XXX: previously this would trigger local # ``ContextCancelled`` to be received and raised in the - # local context overriding any local error due to - # logic inside ``_invoke()`` which checked for - # an error set on ``Context._error`` and raised it in - # under a cancellation scenario. - - # The problem is you can have a remote cancellation - # that is part of a local error and we shouldn't raise - # ``ContextCancelled`` **iff** we weren't the side of - # the context to initiate it, i.e. + # local context overriding any local error due to logic + # inside ``_invoke()`` which checked for an error set on + # ``Context._error`` and raised it in a cancellation + # scenario. 
+ # ------ + # The problem is you can have a remote cancellation that + # is part of a local error and we shouldn't raise + # ``ContextCancelled`` **iff** we **were not** the side + # of the context to initiate it, i.e. # ``Context._cancel_called`` should **NOT** have been # set. The special logic to handle this case is now - # inside ``Context._may_raise_from_remote_msg()`` XD + # inside ``Context._maybe_raise_from_remote_msg()`` XD await peer_ctx.cancel() @@ -769,9 +984,10 @@ async def error_before_started( def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): ''' - Verify that an error raised in a remote context which itself opens another - remote context, which it cancels, does not ovverride the original error that - caused the cancellation of the secondardy context. + Verify that an error raised in a remote context which itself opens + another remote context, which it cancels, does not ovverride the + original error that caused the cancellation of the secondardy + context. ''' async def main(): diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 788503461..5f90703ea 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -95,7 +95,7 @@ def _spawn(cmd): return _spawn -PROMPT = r"\(Pdb\+\+\)" +PROMPT = r"\(Pdb\+\)" def expect( @@ -151,6 +151,7 @@ def ctlc( use_ctlc = request.param + # TODO: we can remove this bc pdbp right? if ( sys.version_info <= (3, 10) and use_ctlc @@ -231,7 +232,7 @@ def test_root_actor_bp(spawn, user_in_out): child = spawn('root_actor_breakpoint') # scan for the pdbpp prompt - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) assert 'Error' not in str(child.before) @@ -272,7 +273,7 @@ def do_ctlc( if expect_prompt: before = str(child.before.decode()) time.sleep(delay) - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) time.sleep(delay) if patt: @@ -291,7 +292,7 @@ def test_root_actor_bp_forever( # entries for _ in range(10): - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) if ctlc: do_ctlc(child) @@ -301,7 +302,7 @@ def test_root_actor_bp_forever( # do one continue which should trigger a # new task to lock the tty child.sendline('continue') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # seems that if we hit ctrl-c too fast the # sigint guard machinery might not kick in.. @@ -312,10 +313,10 @@ def test_root_actor_bp_forever( # XXX: this previously caused a bug! 
child.sendline('n') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) child.sendline('n') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # quit out of the loop child.sendline('q') @@ -339,7 +340,7 @@ def test_subactor_error( child = spawn('subactor_error') # scan for the pdbpp prompt - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "Attaching to pdb in crashed actor: ('name_error'" in before @@ -359,7 +360,7 @@ def test_subactor_error( # creating actor child.sendline('continue') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) # root actor gets debugger engaged @@ -387,7 +388,7 @@ def test_subactor_breakpoint( child = spawn('subactor_breakpoint') # scan for the pdbpp prompt - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "Attaching pdb to actor: ('breakpoint_forever'" in before @@ -396,7 +397,7 @@ def test_subactor_breakpoint( # entries for _ in range(10): child.sendline('next') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) if ctlc: do_ctlc(child) @@ -404,7 +405,7 @@ def test_subactor_breakpoint( # now run some "continues" to show re-entries for _ in range(5): child.sendline('continue') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "Attaching pdb to actor: ('breakpoint_forever'" in before @@ -415,7 +416,7 @@ def test_subactor_breakpoint( child.sendline('q') # child process should exit but parent will capture pdb.BdbQuit - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "RemoteActorError: ('breakpoint_forever'" in before @@ -448,7 +449,7 @@ def test_multi_subactors( child = spawn(r'multi_subactors') # scan for the pdbpp prompt - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "Attaching pdb to actor: ('breakpoint_forever'" in before @@ -460,7 +461,7 @@ def test_multi_subactors( # entries for _ in range(10): child.sendline('next') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) if ctlc: do_ctlc(child) @@ -469,7 +470,7 @@ def test_multi_subactors( child.sendline('c') # first name_error failure - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "NameError" in before @@ -481,7 +482,7 @@ def test_multi_subactors( child.sendline('c') # 2nd name_error failure - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # TODO: will we ever get the race where this crash will show up? 
# blocklist strat now prevents this crash @@ -495,7 +496,7 @@ def test_multi_subactors( # breakpoint loop should re-engage child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "Attaching pdb to actor: ('breakpoint_forever'" in before @@ -511,7 +512,7 @@ def test_multi_subactors( ): child.sendline('c') time.sleep(0.1) - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) if ctlc: @@ -530,11 +531,11 @@ def test_multi_subactors( # now run some "continues" to show re-entries for _ in range(5): child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # quit the loop and expect parent to attach child.sendline('q') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert_before(child, [ @@ -578,7 +579,7 @@ def test_multi_daemon_subactors( ''' child = spawn('multi_daemon_subactors') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # there can be a race for which subactor will acquire # the root's tty lock first so anticipate either crash @@ -608,7 +609,7 @@ def test_multi_daemon_subactors( # second entry by `bp_forever`. child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) assert_before(child, [next_msg]) # XXX: hooray the root clobbering the child here was fixed! @@ -630,7 +631,7 @@ def test_multi_daemon_subactors( # expect another breakpoint actor entry child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) try: assert_before(child, [bp_forever_msg]) @@ -646,7 +647,7 @@ def test_multi_daemon_subactors( # after 1 or more further bp actor entries. child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) assert_before(child, [name_error_msg]) # wait for final error in root @@ -654,7 +655,7 @@ def test_multi_daemon_subactors( while True: try: child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) assert_before( child, [bp_forever_msg] @@ -688,7 +689,7 @@ def test_multi_subactors_root_errors( child = spawn('multi_subactor_root_errors') # scan for the pdbpp prompt - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # at most one subactor should attach before the root is cancelled before = str(child.before.decode()) @@ -703,7 +704,7 @@ def test_multi_subactors_root_errors( # due to block list strat from #337, this will no longer # propagate before the root errors and cancels the spawner sub-tree. 
- child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # only if the blocking condition doesn't kick in fast enough before = str(child.before.decode()) @@ -718,7 +719,7 @@ def test_multi_subactors_root_errors( do_ctlc(child) child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # check if the spawner crashed or was blocked from debug # and if this intermediary attached check the boxed error @@ -735,7 +736,7 @@ def test_multi_subactors_root_errors( do_ctlc(child) child.sendline('c') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # expect a root actor crash assert_before(child, [ @@ -784,7 +785,7 @@ def test_multi_nested_subactors_error_through_nurseries( for send_char in itertools.cycle(['c', 'q']): try: - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) child.sendline(send_char) time.sleep(0.01) @@ -826,7 +827,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( child = spawn('root_cancelled_but_child_is_in_tty_lock') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "NameError: name 'doggypants' is not defined" in before @@ -841,7 +842,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( for i in range(4): time.sleep(0.5) try: - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) except ( EOF, @@ -898,7 +899,7 @@ def test_root_cancels_child_context_during_startup( ''' child = spawn('fast_error_in_root_after_spawn') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) before = str(child.before.decode()) assert "AssertionError" in before @@ -915,7 +916,7 @@ def test_different_debug_mode_per_actor( ctlc: bool, ): child = spawn('per_actor_debug') - child.expect(r"\(Pdb\+\+\)") + child.expect(PROMPT) # only one actor should enter the debugger before = str(child.before.decode()) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 775ee98df..dd9d681ab 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -15,6 +15,7 @@ from tractor import ( to_asyncio, RemoteActorError, + ContextCancelled, ) from tractor.trionics import BroadcastReceiver @@ -224,14 +225,23 @@ async def main(): await trio.sleep_forever() - with pytest.raises(RemoteActorError) as excinfo: - trio.run(main) + return await ctx.result() - err = excinfo.value - assert isinstance(err, RemoteActorError) if parent_cancels: - assert err.type == trio.Cancelled + # bc the parent made the cancel request, + # the error is not raised locally but instead + # the context is exited silently + res = trio.run(main) + assert isinstance(res, ContextCancelled) + assert 'root' in res.canceller[0] + else: + expect = RemoteActorError + with pytest.raises(expect) as excinfo: + trio.run(main) + + err = excinfo.value + assert isinstance(err, expect) assert err.type == AssertionError diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 9f4a1fed2..5e18e10a7 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -86,7 +86,7 @@ async def open_sequence_streamer( ) as (ctx, first): assert first is None - async with ctx.open_stream(backpressure=True) as stream: + async with ctx.open_stream(allow_overruns=True) as stream: yield stream await portal.cancel_actor() @@ -413,8 +413,8 @@ async def sub_and_print( seq = brx._state.subs[brx.key] assert seq == len(brx._state.queue) - 1 - # all backpressured entries in the underlying - # channel should have been copied into the caster + # all no_overruns entries in the underlying + # channel should 
have been copied into the bcaster # queue trailing-window async for i in rx: print(f'bped: {i}') diff --git a/tractor/__init__.py b/tractor/__init__.py index 731f3e940..aa2621051 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -15,18 +15,20 @@ # along with this program. If not, see . """ -tractor: structured concurrent "actors". +tractor: structured concurrent ``trio``-"actors". """ from exceptiongroup import BaseExceptionGroup from ._clustering import open_actor_cluster from ._ipc import Channel -from ._streaming import ( +from ._context import ( Context, + context, +) +from ._streaming import ( MsgStream, stream, - context, ) from ._discovery import ( get_arbiter, @@ -44,7 +46,10 @@ ModuleNotExposed, ContextCancelled, ) -from ._debug import breakpoint, post_mortem +from ._debug import ( + breakpoint, + post_mortem, +) from . import msg from ._root import ( run_daemon, diff --git a/tractor/_context.py b/tractor/_context.py new file mode 100644 index 000000000..e95cd2b11 --- /dev/null +++ b/tractor/_context.py @@ -0,0 +1,771 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +The fundamental cross process SC abstraction: an inter-actor, +cancel-scope linked task "context". + +A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built +into each ``trio.Nursery`` except it links the lifetimes of memory space +disjoint, parallel executing tasks in separate actors. + +''' +from __future__ import annotations +from collections import deque +from contextlib import asynccontextmanager as acm +from dataclasses import ( + dataclass, + field, +) +from functools import partial +import inspect +from pprint import pformat +from typing import ( + Any, + Callable, + AsyncGenerator, + TYPE_CHECKING, +) +import warnings + +import trio + +from ._exceptions import ( + unpack_error, + pack_error, + ContextCancelled, + StreamOverrun, +) +from .log import get_logger +from ._ipc import Channel +from ._streaming import MsgStream +from ._state import current_actor + +if TYPE_CHECKING: + from ._portal import Portal + + +log = get_logger(__name__) + + +@dataclass +class Context: + ''' + An inter-actor, ``trio``-task communication context. + + NB: This class should never be instatiated directly, it is delivered + by either, + - runtime machinery to a remotely started task or, + - by entering ``Portal.open_context()``. + + and is always constructed using ``mkt_context()``. + + Allows maintaining task or protocol specific state between + 2 communicating, parallel executing actor tasks. A unique context is + allocated on each side of any task RPC-linked msg dialog, for + every request to a remote actor from a portal. On the "callee" + side a context is always allocated inside ``._runtime._invoke()``. 
+ + A context can be cancelled and (possibly eventually restarted) from + either side of the underlying IPC channel, it can also open task + oriented message streams, and acts more or less as an IPC aware + inter-actor-task ``trio.CancelScope``. + + ''' + chan: Channel + cid: str + + # these are the "feeder" channels for delivering + # message values to the local task from the runtime + # msg processing loop. + _recv_chan: trio.MemoryReceiveChannel + _send_chan: trio.MemorySendChannel + + _remote_func_type: str | None = None + + # only set on the caller side + _portal: Portal | None = None # type: ignore # noqa + _result: Any | int = None + _remote_error: BaseException | None = None + + # cancellation state + _cancel_called: bool = False + _cancel_called_remote: tuple | None = None + _cancel_msg: str | None = None + _scope: trio.CancelScope | None = None + _enter_debugger_on_cancel: bool = True + + @property + def cancel_called(self) -> bool: + ''' + Records whether cancellation has been requested for this context + by either an explicit call to ``.cancel()`` or an implicit call + due to an error caught inside the ``Portal.open_context()`` + block. + + ''' + return self._cancel_called + + @property + def cancel_called_remote(self) -> tuple[str, str] | None: + ''' + ``Actor.uid`` of the remote actor who's task was cancelled + causing this side of the context to also be cancelled. + + ''' + remote_uid = self._cancel_called_remote + if remote_uid: + return tuple(remote_uid) + + @property + def cancelled_caught(self) -> bool: + return self._scope.cancelled_caught + + # init and streaming state + _started_called: bool = False + _started_received: bool = False + _stream_opened: bool = False + + # overrun handling machinery + # NOTE: none of this provides "backpressure" to the remote + # task, only an ability to not lose messages when the local + # task is configured to NOT transmit ``StreamOverrun``s back + # to the other side. + _overflow_q: deque[dict] = field( + default_factory=partial( + deque, + maxlen=616, + ) + ) + _scope_nursery: trio.Nursery | None = None + _in_overrun: bool = False + _allow_overruns: bool = False + + async def send_yield( + self, + data: Any, + + ) -> None: + + warnings.warn( + "`Context.send_yield()` is now deprecated. " + "Use ``MessageStream.send()``. ", + DeprecationWarning, + stacklevel=2, + ) + await self.chan.send({'yield': data, 'cid': self.cid}) + + async def send_stop(self) -> None: + await self.chan.send({'stop': True, 'cid': self.cid}) + + async def _maybe_cancel_and_set_remote_error( + self, + error_msg: dict[str, Any], + + ) -> None: + ''' + (Maybe) unpack and raise a msg error into the local scope + nursery for this context. + + Acts as a form of "relay" for a remote error raised + in the corresponding remote callee task. + + ''' + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. 
+ error = unpack_error( + error_msg, + self.chan, + ) + + # XXX: set the remote side's error so that after we cancel + # whatever task is the opener of this context it can raise + # that error as the reason. + self._remote_error = error + + if ( + isinstance(error, ContextCancelled) + ): + log.cancel( + 'Remote task-context sucessfully cancelled for ' + f'{self.chan.uid}:{self.cid}' + ) + + if self._cancel_called: + # this is an expected cancel request response message + # and we don't need to raise it in scope since it will + # potentially override a real error + return + else: + log.error( + f'Remote context error for {self.chan.uid}:{self.cid}:\n' + f'{error_msg["error"]["tb_str"]}' + ) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? + # YES! this is way better and simpler! + if ( + self._scope + ): + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + self._cancel_called_remote = self.chan.uid + self._scope.cancel() + + # NOTE: this usage actually works here B) + # from ._debug import breakpoint + # await breakpoint() + + # XXX: this will break early callee results sending + # since when `.result()` is finally called, this + # chan will be closed.. + # if self._recv_chan: + # await self._recv_chan.aclose() + + async def cancel( + self, + msg: str | None = None, + timeout: float = 0.5, + # timeout: float = 1000, + + ) -> None: + ''' + Cancel this inter-actor-task context. + + Request that the far side cancel it's current linked context, + Timeout quickly in an attempt to sidestep 2-generals... + + ''' + side = 'caller' if self._portal else 'callee' + if msg: + assert side == 'callee', 'Only callee side can provide cancel msg' + + log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') + + self._cancel_called = True + # await _debug.breakpoint() + # breakpoint() + + if side == 'caller': + if not self._portal: + raise RuntimeError( + "No portal found, this is likely a callee side context" + ) + + cid = self.cid + with trio.move_on_after(timeout) as cs: + # cs.shield = True + log.cancel( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run_from_ns( + 'self', + '_cancel_task', + cid=cid, + ) + # print("EXITING CANCEL CALL") + + if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. + # if not self._portal.channel.connected(): + if not self.chan.connected(): + log.cancel( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + else: + log.cancel( + "Timed out on cancelling remote task " + f"{cid} for {self._portal.channel.uid}") + + # callee side remote task + else: + self._cancel_msg = msg + + # TODO: should we have an explicit cancel message + # or is relaying the local `trio.Cancelled` as an + # {'error': trio.Cancelled, cid: "blah"} enough? 
+ # This probably gets into the discussion in + # https://github.com/goodboy/tractor/issues/36 + assert self._scope + self._scope.cancel() + + @acm + async def open_stream( + + self, + allow_overruns: bool | None = False, + msg_buffer_size: int | None = None, + + ) -> AsyncGenerator[MsgStream, None]: + ''' + Open a ``MsgStream``, a bi-directional stream connected to the + cross-actor (far end) task for this ``Context``. + + This context manager must be entered on both the caller and + callee for the stream to logically be considered "connected". + + A ``MsgStream`` is currently "one-shot" use, meaning if you + close it you can not "re-open" it for streaming and instead you + must re-establish a new surrounding ``Context`` using + ``Portal.open_context()``. In the future this may change but + currently there seems to be no obvious reason to support + "re-opening": + - pausing a stream can be done with a message. + - task errors will normally require a restart of the entire + scope of the inter-actor task context due to the nature of + ``trio``'s cancellation system. + + ''' + actor = current_actor() + + # here we create a mem chan that corresponds to the + # far end caller / callee. + + # Likewise if the surrounding context has been cancelled we error here + # since it likely means the surrounding block was exited or + # killed + + if self._cancel_called: + task = trio.lowlevel.current_task().name + raise ContextCancelled( + f'Context around {actor.uid[0]}:{task} was already cancelled!' + ) + + if not self._portal and not self._started_called: + raise RuntimeError( + 'Context.started()` must be called before opening a stream' + ) + + # NOTE: in one way streaming this only happens on the + # caller side inside `Actor.start_remote_task()` so if you try + # to send a stop from the caller to the callee in the + # single-direction-stream case you'll get a lookup error + # currently. + ctx = actor.get_context( + self.chan, + self.cid, + msg_buffer_size=msg_buffer_size, + allow_overruns=allow_overruns, + ) + ctx._allow_overruns = allow_overruns + assert ctx is self + + # XXX: If the underlying channel feeder receive mem chan has + # been closed then likely client code has already exited + # a ``.open_stream()`` block prior or there was some other + # unanticipated error or cancellation from ``trio``. + + if ctx._recv_chan._closed: + raise trio.ClosedResourceError( + 'The underlying channel for this stream was already closed!?') + + async with MsgStream( + ctx=self, + rx_chan=ctx._recv_chan, + ) as stream: + + if self._portal: + self._portal._streams.add(stream) + + try: + self._stream_opened = True + + # XXX: do we need this? + # ensure we aren't cancelled before yielding the stream + # await trio.lowlevel.checkpoint() + yield stream + + # NOTE: Make the stream "one-shot use". On exit, signal + # ``trio.EndOfChannel``/``StopAsyncIteration`` to the + # far end. 
+ await stream.aclose() + + finally: + if self._portal: + try: + self._portal._streams.remove(stream) + except KeyError: + log.warning( + f'Stream was already destroyed?\n' + f'actor: {self.chan.uid}\n' + f'ctx id: {self.cid}' + ) + + def _maybe_raise_remote_err( + self, + err: Exception, + ) -> None: + # NOTE: whenever the context's "opener" side (task) **is** + # the side which requested the cancellation (likekly via + # ``Context.cancel()``), we don't want to re-raise that + # cancellation signal locally (would be akin to + # a ``trio.Nursery`` nursery raising ``trio.Cancelled`` + # whenever ``CancelScope.cancel()`` was called) and instead + # silently reap the expected cancellation "error"-msg. + # if 'pikerd' in err.msgdata['tb_str']: + # # from . import _debug + # # await _debug.breakpoint() + # breakpoint() + + if ( + isinstance(err, ContextCancelled) + and ( + self._cancel_called + or self.chan._cancel_called + or tuple(err.canceller) == current_actor().uid + ) + ): + return err + + raise err # from None + + async def result(self) -> Any | Exception: + ''' + From some (caller) side task, wait for and return the final + result from the remote (callee) side's task. + + This provides a mechanism for one task running in some actor to wait + on another task at the other side, in some other actor, to terminate. + + If the remote task is still in a streaming state (it is delivering + values from inside a ``Context.open_stream():`` block, then those + msgs are drained but discarded since it is presumed this side of + the context has already finished with its own streaming logic. + + If the remote context (or its containing actor runtime) was + canceled, either by a local task calling one of + ``Context.cancel()`` or `Portal.cancel_actor()``, we ignore the + received ``ContextCancelled`` exception if the context or + underlying IPC channel is marked as having been "cancel called". + This is similar behavior to using ``trio.Nursery.cancel()`` + wherein tasks which raise ``trio.Cancel`` are silently reaped; + the main different in this API is in the "cancel called" case, + instead of just not raising, we also return the exception *as + the result* since client code may be interested in the details + of the remote cancellation. + + ''' + assert self._portal, "Context.result() can not be called from callee!" + assert self._recv_chan + + # from . import _debug + # await _debug.breakpoint() + + re = self._remote_error + if re: + self._maybe_raise_remote_err(re) + return re + + if ( + self._result == id(self) + and not self._remote_error + and not self._recv_chan._closed # type: ignore + ): + # wait for a final context result consuming + # and discarding any bi dir stream msgs still + # in transit from the far end. + while True: + msg = await self._recv_chan.receive() + try: + self._result = msg['return'] + + # NOTE: we don't need to do this right? + # XXX: only close the rx mem chan AFTER + # a final result is retreived. 
+ # if self._recv_chan: + # await self._recv_chan.aclose() + + break + except KeyError: # as msgerr: + + if 'yield' in msg: + # far end task is still streaming to us so discard + log.warning(f'Discarding stream delivered {msg}') + continue + + elif 'stop' in msg: + log.debug('Remote stream terminated') + continue + + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + + err = unpack_error( + msg, + self._portal.channel + ) # from msgerr + + err = self._maybe_raise_remote_err(err) + self._remote_err = err + + return self._remote_error or self._result + + async def started( + self, + value: Any | None = None + + ) -> None: + ''' + Indicate to calling actor's task that this linked context + has started and send ``value`` to the other side. + + On the calling side ``value`` is the second item delivered + in the tuple returned by ``Portal.open_context()``. + + ''' + if self._portal: + raise RuntimeError( + f"Caller side context {self} can not call started!") + + elif self._started_called: + raise RuntimeError( + f"called 'started' twice on context with {self.chan.uid}") + + await self.chan.send({'started': value, 'cid': self.cid}) + self._started_called = True + + # TODO: do we need a restart api? + # async def restart(self) -> None: + # pass + + async def _drain_overflows( + self, + ) -> None: + ''' + Private task spawned to push newly received msgs to the local + task which getting overrun by the remote side. + + In order to not block the rpc msg loop, but also not discard + msgs received in this context, we need to async push msgs in + a new task which only runs for as long as the local task is in + an overrun state. + + ''' + self._in_overrun = True + try: + while self._overflow_q: + # NOTE: these msgs should never be errors since we always do + # the check prior to checking if we're in an overrun state + # inside ``.deliver_msg()``. + msg = self._overflow_q.popleft() + try: + await self._send_chan.send(msg) + except trio.BrokenResourceError: + log.warning( + f"{self._send_chan} consumer is already closed" + ) + return + except trio.Cancelled: + # we are obviously still in overrun + # but the context is being closed anyway + # so we just warn that there are un received + # msgs still.. + self._overflow_q.appendleft(msg) + fmt_msgs = '' + for msg in self._overflow_q: + fmt_msgs += f'{pformat(msg)}\n' + + log.warning( + f'Context for {self.cid} is being closed while ' + 'in an overrun state!\n' + 'Discarding the following msgs:\n' + f'{fmt_msgs}\n' + ) + raise + + finally: + # task is now finished with the backlog so mark us as + # no longer in backlog. + self._in_overrun = False + + async def _deliver_msg( + self, + msg: dict, + + draining: bool = False, + + ) -> bool: + + cid = self.cid + chan = self.chan + uid = chan.uid + send_chan: trio.MemorySendChannel = self._send_chan + + log.runtime( + f"Delivering {msg} from {uid} to caller {cid}" + ) + + error = msg.get('error') + if error: + await self._maybe_cancel_and_set_remote_error(msg) + + if ( + self._in_overrun + ): + self._overflow_q.append(msg) + return False + + try: + send_chan.send_nowait(msg) + return True + # if an error is deteced we should always + # expect it to be raised by any context (stream) + # consumer task + + except trio.BrokenResourceError: + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? 
Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? + + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") + return False + + # NOTE XXX: by default we do **not** maintain context-stream + # backpressure and instead opt to relay stream overrun errors to + # the sender; the main motivation is that using bp can block the + # msg handling loop which calls into this method! + except trio.WouldBlock: + # XXX: always push an error even if the local + # receiver is in overrun state. + # await self._maybe_cancel_and_set_remote_error(msg) + + local_uid = current_actor().uid + lines = [ + f'OVERRUN on actor-task context {cid}@{local_uid}!\n' + # TODO: put remote task name here if possible? + f'remote sender actor: {uid}', + # TODO: put task func name here and maybe an arrow + # from sender to overrunner? + # f'local task {self.func_name}' + ] + if not self._stream_opened: + lines.insert( + 1, + f'\n*** No stream open on `{local_uid[0]}` side! ***\n' + ) + + text = '\n'.join(lines) + + # XXX: lul, this really can't be backpressure since any + # blocking here will block the entire msg loop rpc sched for + # a whole channel.. maybe we should rename it? + if self._allow_overruns: + text += f'\nStarting overflow queuing task on msg: {msg}' + log.warning(text) + if ( + not self._in_overrun + ): + self._overflow_q.append(msg) + n = self._scope_nursery + assert not n.child_tasks + try: + n.start_soon( + self._drain_overflows, + ) + except RuntimeError: + # if the nursery is already cancelled due to + # this context exiting or in error, we ignore + # the nursery error since we never expected + # anything different. + return False + else: + try: + raise StreamOverrun(text) + except StreamOverrun as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + try: + await chan.send(err_msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{chan} is already closed") + + return False + + +def mk_context( + chan: Channel, + cid: str, + msg_buffer_size: int = 2**6, + + **kwargs, + +) -> Context: + ''' + Internal factory to create an inter-actor task ``Context``. + + This is called by internals and should generally never be called + by user code. + + ''' + send_chan: trio.MemorySendChannel + recv_chan: trio.MemoryReceiveChannel + send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) + + ctx = Context( + chan, + cid, + _send_chan=send_chan, + _recv_chan=recv_chan, + **kwargs, + ) + ctx._result: int | Any = id(ctx) + return ctx + + +def context(func: Callable) -> Callable: + ''' + Mark an async function as a streaming routine with ``@context``. 
+ + ''' + # TODO: apply whatever solution ``mypy`` ends up picking for this: + # https://github.com/python/mypy/issues/2087#issuecomment-769266912 + func._tractor_context_function = True # type: ignore + + sig = inspect.signature(func) + params = sig.parameters + if 'ctx' not in params: + raise TypeError( + "The first argument to the context function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) + return func diff --git a/tractor/_debug.py b/tractor/_debug.py index 47a9a8853..59e1c7674 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -37,6 +37,7 @@ ) from types import FrameType +import pdbp import tractor import trio from trio_typing import TaskStatus @@ -54,15 +55,16 @@ from ._ipc import Channel -try: - # wtf: only exported when installed in dev mode? - import pdbpp -except ImportError: - # pdbpp is installed in regular mode...it monkey patches stuff - import pdb - xpm = getattr(pdb, 'xpm', None) - assert xpm, "pdbpp is not installed?" # type: ignore - pdbpp = pdb +# TODO: we can drop this now yah? +# try: +# # wtf: only exported when installed in dev mode? +# import pdbp +# except ImportError: +# # pdbpp is installed in regular mode...it monkey patches stuff +# import pdb +# xpm = getattr(pdb, 'xpm', None) +# assert xpm, "pdbpp is not installed?" # type: ignore +# pdbpp = pdb log = get_logger(__name__) @@ -154,22 +156,26 @@ def release(cls): cls.repl = None -class TractorConfig(pdbpp.DefaultConfig): +class TractorConfig(pdbp.DefaultConfig): ''' - Custom ``pdbpp`` goodness. + Custom ``pdbp`` goodness :surfer: ''' - # use_pygments = True - # sticky_by_default = True - enable_hidden_frames = False + use_pygments: bool = True + sticky_by_default: bool = False + enable_hidden_frames: bool = False + # much thanks @mdmintz for the hot tip! + # fixes line spacing issue when resizing terminal B) + truncate_long_lines: bool = False -class MultiActorPdb(pdbpp.Pdb): + +class MultiActorPdb(pdbp.Pdb): ''' - Add teardown hooks to the regular ``pdbpp.Pdb``. + Add teardown hooks to the regular ``pdbp.Pdb``. ''' - # override the pdbpp config with our coolio one + # override the pdbp config with our coolio one DefaultConfig = TractorConfig # def preloop(self): @@ -313,7 +319,7 @@ async def lock_tty_for_child( ) -> str: ''' Lock the TTY in the root process of an actor tree in a new - inter-actor-context-task such that the ``pdbpp`` debugger console + inter-actor-context-task such that the ``pdbp`` debugger console can be mutex-allocated to the calling sub-actor for REPL control without interference by other processes / threads. 
@@ -433,7 +439,7 @@ async def wait_for_parent_stdin_hijack( def mk_mpdb() -> tuple[MultiActorPdb, Callable]: pdb = MultiActorPdb() - # signal.signal = pdbpp.hideframe(signal.signal) + # signal.signal = pdbp.hideframe(signal.signal) Lock.shield_sigint() @@ -583,7 +589,7 @@ async def _breakpoint( # # frame = sys._getframe() # # last_f = frame.f_back # # last_f.f_globals['__tracebackhide__'] = True - # # signal.signal = pdbpp.hideframe(signal.signal) + # # signal.signal = pdbp.hideframe(signal.signal) def shield_sigint_handler( @@ -743,13 +749,13 @@ def do_cancel(): # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py - # XXX: lol, see ``pdbpp`` issue: + # XXX LEGACY: lol, see ``pdbpp`` issue: # https://github.com/pdbpp/pdbpp/issues/496 def _set_trace( - actor: Optional[tractor.Actor] = None, - pdb: Optional[MultiActorPdb] = None, + actor: tractor.Actor | None = None, + pdb: MultiActorPdb | None = None, ): __tracebackhide__ = True actor = actor or tractor.current_actor() @@ -759,7 +765,11 @@ def _set_trace( if frame: frame = frame.f_back # type: ignore - if frame and pdb and actor is not None: + if ( + frame + and pdb + and actor is not None + ): log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # no f!#$&* idea, but when we're in async land # we need 2x frames up? @@ -768,7 +778,8 @@ def _set_trace( else: pdb, undo_sigint = mk_mpdb() - # we entered the global ``breakpoint()`` built-in from sync code? + # we entered the global ``breakpoint()`` built-in from sync + # code? Lock.local_task_in_debug = 'sync' pdb.set_trace(frame=frame) @@ -798,7 +809,7 @@ def _post_mortem( # https://github.com/pdbpp/pdbpp/issues/480 # TODO: help with a 3.10+ major release if/when it arrives. - pdbpp.xpm(Pdb=lambda: pdb) + pdbp.xpm(Pdb=lambda: pdb) post_mortem = partial( diff --git a/tractor/_discovery.py b/tractor/_discovery.py index b6957ba3a..03775ac21 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -41,8 +41,10 @@ async def get_arbiter( port: int, ) -> AsyncGenerator[Union[Portal, LocalPortal], None]: - '''Return a portal instance connected to a local or remote + ''' + Return a portal instance connected to a local or remote arbiter. + ''' actor = current_actor() @@ -134,12 +136,16 @@ async def find_actor( @acm async def wait_for_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None + arbiter_sockaddr: tuple[str, int] | None = None, + # registry_addr: tuple[str, int] | None = None, + ) -> AsyncGenerator[Portal, None]: - """Wait on an actor to register with the arbiter. + ''' + Wait on an actor to register with the arbiter. A portal to the first registered actor is returned. - """ + + ''' actor = current_actor() async with get_arbiter( diff --git a/tractor/_entry.py b/tractor/_entry.py index 1e7997e8b..e8fb56db4 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -132,7 +132,7 @@ def _trio_main( else: trio.run(trio_main) except KeyboardInterrupt: - log.warning(f"Actor {actor.uid} received KBI") + log.cancel(f"Actor {actor.uid} received KBI") finally: log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 5440cad01..6da2e6573 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -18,18 +18,18 @@ Our classy exception set. 
""" +import builtins +import importlib from typing import ( Any, - Optional, Type, ) -import importlib -import builtins import traceback import exceptiongroup as eg import trio +from ._state import current_actor _this_mod = importlib.import_module(__name__) @@ -44,7 +44,7 @@ class RemoteActorError(Exception): def __init__( self, message: str, - suberror_type: Optional[Type[BaseException]] = None, + suberror_type: Type[BaseException] | None = None, **msgdata ) -> None: @@ -53,19 +53,34 @@ def __init__( self.type = suberror_type self.msgdata = msgdata + @property + def src_actor_uid(self) -> tuple[str, str] | None: + return self.msgdata.get('src_actor_uid') + class InternalActorError(RemoteActorError): - """Remote internal ``tractor`` error indicating + ''' + Remote internal ``tractor`` error indicating failure of some primitive or machinery. - """ - -class TransportClosed(trio.ClosedResourceError): - "Underlying channel transport was closed prior to use" + ''' class ContextCancelled(RemoteActorError): - "Inter-actor task context cancelled itself on the callee side." + ''' + Inter-actor task context was cancelled by either a call to + ``Portal.cancel_actor()`` or ``Context.cancel()``. + + ''' + @property + def canceller(self) -> tuple[str, str] | None: + value = self.msgdata.get('canceller') + if value: + return tuple(value) + + +class TransportClosed(trio.ClosedResourceError): + "Underlying channel transport was closed prior to use" class NoResult(RuntimeError): @@ -106,13 +121,17 @@ def pack_error( else: tb_str = traceback.format_exc() - return { - 'error': { - 'tb_str': tb_str, - 'type_str': type(exc).__name__, - } + error_msg = { + 'tb_str': tb_str, + 'type_str': type(exc).__name__, + 'src_actor_uid': current_actor().uid, } + if isinstance(exc, ContextCancelled): + error_msg.update(exc.msgdata) + + return {'error': error_msg} + def unpack_error( @@ -136,7 +155,7 @@ def unpack_error( if type_name == 'ContextCancelled': err_type = ContextCancelled - suberror_type = trio.Cancelled + suberror_type = RemoteActorError else: # try to lookup a suitable local error type for ns in [ diff --git a/tractor/_portal.py b/tractor/_portal.py index 17871aa2f..602937168 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -45,10 +45,8 @@ NoResult, ContextCancelled, ) -from ._streaming import ( - Context, - MsgStream, -) +from ._context import Context +from ._streaming import MsgStream log = get_logger(__name__) @@ -103,7 +101,7 @@ def __init__(self, channel: Channel) -> None: # When set to a ``Context`` (when _submit_for_result is called) # it is expected that ``result()`` will be awaited at some # point. 
- self._expect_result: Optional[Context] = None + self._expect_result: Context | None = None self._streams: set[MsgStream] = set() self.actor = current_actor() @@ -209,7 +207,10 @@ async def cancel_actor( try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - with trio.move_on_after(timeout or self.cancel_timeout) as cs: + with trio.move_on_after( + timeout + or self.cancel_timeout + ) as cs: cs.shield = True await self.run_from_ns('self', 'cancel') @@ -330,7 +331,9 @@ async def open_stream_from( f'{async_gen_func} must be an async generator function!') fn_mod_path, fn_name = NamespacePath.from_ref( - async_gen_func).to_tuple() + async_gen_func + ).to_tuple() + ctx = await self.actor.start_remote_task( self.channel, fn_mod_path, @@ -377,6 +380,7 @@ async def open_context( self, func: Callable, + allow_overruns: bool = False, **kwargs, ) -> AsyncGenerator[tuple[Context, Any], None]: @@ -396,13 +400,26 @@ async def open_context( raise TypeError( f'{func} must be an async generator function!') + # TODO: i think from here onward should probably + # just be factored into an `@acm` inside a new + # a new `_context.py` mod. fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple() ctx = await self.actor.start_remote_task( self.channel, fn_mod_path, fn_name, - kwargs + kwargs, + + # NOTE: it's imporant to expose this since you might + # get the case where the parent who opened the context does + # not open a stream until after some slow startup/init + # period, in which case when the first msg is read from + # the feeder mem chan, say when first calling + # `Context.open_stream(allow_overruns=True)`, the overrun condition will be + # raised before any ignoring of overflow msgs can take + # place.. + allow_overruns=allow_overruns, ) assert ctx._remote_func_type == 'context' @@ -426,29 +443,47 @@ async def open_context( f' but received a non-error msg:\n{pformat(msg)}' ) - _err: Optional[BaseException] = None - ctx._portal = self + _err: BaseException | None = None + ctx._portal: Portal = self - uid = self.channel.uid - cid = ctx.cid - etype: Optional[Type[BaseException]] = None + uid: tuple = self.channel.uid + cid: str = ctx.cid + etype: Type[BaseException] | None = None - # deliver context instance and .started() msg value in open tuple. + # deliver context instance and .started() msg value in enter + # tuple. try: - async with trio.open_nursery() as scope_nursery: - ctx._scope_nursery = scope_nursery - - # do we need this? - # await trio.lowlevel.checkpoint() + async with trio.open_nursery() as nurse: + ctx._scope_nursery = nurse + ctx._scope = nurse.cancel_scope yield ctx, first + # when in allow_ovveruns mode there may be lingering + # overflow sender tasks remaining? + if nurse.child_tasks: + # ensure we are in overrun state with + # ``._allow_overruns=True`` bc otherwise + # there should be no tasks in this nursery! + if ( + not ctx._allow_overruns + or len(nurse.child_tasks) > 1 + ): + raise RuntimeError( + 'Context has sub-tasks but is ' + 'not in `allow_overruns=True` Mode!?' + ) + ctx._scope.cancel() + except ContextCancelled as err: _err = err + + # swallow and mask cross-actor task context cancels that + # were initiated by *this* side's task. if not ctx._cancel_called: - # context was cancelled at the far end but was - # not part of this end requesting that cancel - # so raise for the local task to respond and handle. + # XXX: this should NEVER happen! 
+ # from ._debug import breakpoint + # await breakpoint() raise # if the context was cancelled by client code @@ -468,17 +503,17 @@ async def open_context( ) as err: etype = type(err) - # the context cancels itself on any cancel - # causing error. - if ctx.chan.connected(): - log.cancel( - 'Context cancelled for task, sending cancel request..\n' - f'task:{cid}\n' - f'actor:{uid}' - ) + # cancel ourselves on any error. + log.cancel( + 'Context cancelled for task, sending cancel request..\n' + f'task:{cid}\n' + f'actor:{uid}' + ) + try: + await ctx.cancel() - else: + except trio.BrokenResourceError: log.warning( 'IPC connection for context is broken?\n' f'task:{cid}\n' @@ -487,12 +522,7 @@ async def open_context( raise - finally: - # in the case where a runtime nursery (due to internal bug) - # or a remote actor transmits an error we want to be - # sure we get the error the underlying feeder mem chan. - # if it's not raised here it *should* be raised from the - # msg loop nursery right? + else: if ctx.chan.connected(): log.info( 'Waiting on final context-task result for\n' @@ -505,6 +535,7 @@ async def open_context( f'value from callee `{result}`' ) + finally: # though it should be impossible for any tasks # operating *in* this scope to have survived # we tear down the runtime feeder chan last diff --git a/tractor/_root.py b/tractor/_root.py index 840b2882d..a2d315865 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -22,8 +22,9 @@ from functools import partial import importlib import logging -import os import signal +import sys +import os import typing import warnings @@ -84,8 +85,10 @@ async def open_root_actor( ''' # Override the global debugger hook to make it play nice with - # ``trio``, see: + # ``trio``, see much discussion in: # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 + builtin_bp_handler = sys.breakpointhook + orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' # attempt to retreive ``trio``'s sigint handler and stash it @@ -251,9 +254,20 @@ async def open_root_actor( # tempn.start_soon(an.exited.wait) logger.cancel("Shutting down root actor") - await actor.cancel() + await actor.cancel( + requesting_uid=actor.uid, + ) finally: _state._current_actor = None + + # restore breakpoint hook state + sys.breakpointhook = builtin_bp_handler + if orig_bp_path is not None: + os.environ['PYTHONBREAKPOINT'] = orig_bp_path + else: + # clear env back to having no entry + os.environ.pop('PYTHONBREAKPOINT') + logger.runtime("Root actor terminated") @@ -289,7 +303,7 @@ def run_daemon( async def _main(): async with open_root_actor( - arbiter_addr=registry_addr, + registry_addr=registry_addr, name=name, start_method=start_method, debug_mode=debug_mode, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 707b9dd6f..78f9e4fd0 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -28,9 +28,11 @@ import signal import sys from typing import ( - Any, Optional, - Union, TYPE_CHECKING, + Any, Callable, + Optional, + Union, + TYPE_CHECKING, ) import uuid from types import ModuleType @@ -44,7 +46,10 @@ from trio_typing import TaskStatus from ._ipc import Channel -from ._streaming import Context +from ._context import ( + mk_context, + Context, +) from .log import get_logger from ._exceptions import ( pack_error, @@ -53,7 +58,6 @@ is_multi_cancelled, ContextCancelled, TransportClosed, - StreamOverrun, ) from . 
import _debug from ._discovery import get_arbiter @@ -79,7 +83,7 @@ async def _invoke( is_rpc: bool = True, task_status: TaskStatus[ - Union[trio.CancelScope, BaseException] + Union[Context, BaseException] ] = trio.TASK_STATUS_IGNORED, ): ''' @@ -99,7 +103,14 @@ async def _invoke( # activated cancel scope ref cs: Optional[trio.CancelScope] = None - ctx = actor.get_context(chan, cid) + ctx = actor.get_context( + chan, + cid, + # We shouldn't ever need to pass this through right? + # it's up to the soon-to-be called rpc task to + # open the stream with this option. + # allow_overruns=True, + ) context: bool = False if getattr(func, '_tractor_stream_function', False): @@ -138,7 +149,10 @@ async def _invoke( ): raise TypeError(f'{func} must be an async function!') - coro = func(**kwargs) + try: + coro = func(**kwargs) + except TypeError: + raise if inspect.isasyncgen(coro): await chan.send({'functype': 'asyncgen', 'cid': cid}) @@ -150,7 +164,8 @@ async def _invoke( # of the async gen in order to be sure the cancel # is propagated! with cancel_scope as cs: - task_status.started(cs) + ctx._scope = cs + task_status.started(ctx) async with aclosing(coro) as agen: async for item in agen: # TODO: can we send values back in here? @@ -176,7 +191,8 @@ async def _invoke( # manualy construct the response dict-packet-responses as # above with cancel_scope as cs: - task_status.started(cs) + ctx._scope = cs + task_status.started(ctx) await coro if not cs.cancelled_caught: @@ -189,19 +205,26 @@ async def _invoke( await chan.send({'functype': 'context', 'cid': cid}) try: - async with trio.open_nursery() as scope_nursery: - ctx._scope_nursery = scope_nursery - cs = scope_nursery.cancel_scope - task_status.started(cs) + async with trio.open_nursery() as nurse: + ctx._scope_nursery = nurse + ctx._scope = nurse.cancel_scope + task_status.started(ctx) res = await coro await chan.send({'return': res, 'cid': cid}) - except BaseExceptionGroup: + # XXX: do we ever trigger this block any more? + except ( + BaseExceptionGroup, + trio.Cancelled, + ): # if a context error was set then likely # thei multierror was raised due to that - if ctx._error is not None: - raise ctx._error from None + if ctx._remote_error is not None: + raise ctx._remote_error + # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side? raise finally: @@ -213,37 +236,66 @@ async def _invoke( # associated child isn't in debug any more await _debug.maybe_wait_for_debugger() ctx = actor._contexts.pop((chan.uid, cid)) - if ctx: log.runtime( f'Context entrypoint {func} was terminated:\n{ctx}' ) - assert cs - if cs.cancelled_caught: + if ctx.cancelled_caught: - # TODO: pack in ``trio.Cancelled.__traceback__`` here - # so they can be unwrapped and displayed on the caller - # side! + # first check for and raise any remote error + # before raising any context cancelled case + # so that real remote errors don't get masked as + # ``ContextCancelled``s. + re = ctx._remote_error + if re: + ctx._maybe_raise_remote_err(re) fname = func.__name__ - if ctx._cancel_called: - msg = f'`{fname}()` cancelled itself' + cs: trio.CancelScope = ctx._scope + if cs.cancel_called: + canceller = ctx._cancel_called_remote + # await _debug.breakpoint() + + # NOTE / TODO: if we end up having + # ``Actor._cancel_task()`` call + # ``Context.cancel()`` directly, we're going to + # need to change this logic branch since it will + # always enter.. 
+ if ctx._cancel_called: + msg = f'`{fname}()`@{actor.uid} cancelled itself' - elif cs.cancel_called: - msg = ( - f'`{fname}()` was remotely cancelled by its caller ' - f'{ctx.chan.uid}' - ) - - if ctx._cancel_msg: - msg += f' with msg:\n{ctx._cancel_msg}' + else: + msg = ( + f'`{fname}()`@{actor.uid} ' + 'was remotely cancelled by ' + ) - # task-contex was cancelled so relay to the cancel to caller - raise ContextCancelled( - msg, - suberror_type=trio.Cancelled, - ) + # if the channel which spawned the ctx is the + # one that cancelled it then we report that, vs. + # it being some other random actor that for ex. + # some actor who calls `Portal.cancel_actor()` + # and by side-effect cancels this ctx. + if canceller == ctx.chan.uid: + msg += f'its caller {canceller}' + else: + msg += f'remote actor {canceller}' + + # TODO: does this ever get set any more or can + # we remove it? + if ctx._cancel_msg: + msg += f' with msg:\n{ctx._cancel_msg}' + + # task-contex was either cancelled by request using + # ``Portal.cancel_actor()`` or ``Context.cancel()`` + # on the far end, or it was cancelled by the local + # (callee) task, so relay this cancel signal to the + # other side. + raise ContextCancelled( + msg, + suberror_type=trio.Cancelled, + canceller=canceller, + ) else: # regular async function @@ -259,12 +311,17 @@ async def _invoke( ) with cancel_scope as cs: - task_status.started(cs) + ctx._scope = cs + task_status.started(ctx) result = await coro - log.cancel(f'result: {result}') + fname = func.__name__ + log.runtime(f'{fname}() result: {result}') if not failed_resp: # only send result if we know IPC isn't down - await chan.send({'return': result, 'cid': cid}) + await chan.send( + {'return': result, + 'cid': cid} + ) except ( Exception, @@ -307,6 +364,7 @@ async def _invoke( # always ship errors back to caller err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid + try: await chan.send(err_msg) @@ -323,14 +381,21 @@ async def _invoke( f"Failed to ship error to caller @ {chan.uid} !?" ) - if cs is None: - # error is from above code not from rpc invocation + # error is probably from above coro running code *not from the + # underlyingn rpc invocation* since a scope was never allocated + # around actual coroutine await. + if ctx._scope is None: + # we don't ever raise directly here to allow the + # msg-loop-scheduler to continue running for this + # channel. 
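# NOTE (illustrative only, not part of this diff): as read from the
# branch above, when the ctx-opening caller itself requests the cancel,
# the relayed `ContextCancelled` should report that caller's uid as the
# `canceller` and `Portal.open_context()` swallows it on exit; the names
# below are hypothetical:
import tractor

async def open_then_cancel(
    portal: tractor.Portal,
    target,  # some `@tractor.context` endpoint
) -> None:
    async with portal.open_context(target) as (ctx, first):
        # request remote-task cancellation; per `_invoke()` the callee
        # raises `ContextCancelled(canceller=<this actor's uid>)`
        await ctx.cancel()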
task_status.started(err) finally: # RPC task bookeeping try: - scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) + ctx, func, is_complete = actor._rpc_tasks.pop( + (chan, cid) + ) is_complete.set() except KeyError: @@ -339,6 +404,9 @@ async def _invoke( # cancel scope will not have been inserted yet log.warning( f"Task {func} likely errored or cancelled before start") + else: + log.cancel(f'{func.__name__}({kwargs}) failed?') + finally: if not actor._rpc_tasks: log.runtime("All RPC tasks have completed") @@ -437,6 +505,7 @@ def __init__( self.uid = (name, uid or str(uuid.uuid4())) self._cancel_complete = trio.Event() + self._cancel_called_remote: tuple[str, tuple] | None = None self._cancel_called: bool = False # retreive and store parent `__main__` data which @@ -475,7 +544,7 @@ def __init__( # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ tuple[Channel, str], - tuple[trio.CancelScope, Callable, trio.Event] + tuple[Context, Callable, trio.Event] ] = {} # map {actor uids -> Context} @@ -650,8 +719,8 @@ async def _stream_handler( if ( local_nursery ): - - log.cancel(f"Waiting on cancel request to peer {chan.uid}") + if chan._cancel_called: + log.cancel(f"Waiting on cancel request to peer {chan.uid}") # XXX: this is a soft wait on the channel (and its # underlying transport protocol) to close from the # remote peer side since we presume that any channel @@ -784,75 +853,15 @@ async def _push_result( f'\n{msg}') return - send_chan = ctx._send_chan - - log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}") - - # XXX: we do **not** maintain backpressure and instead - # opt to relay stream overrun errors to the sender. - try: - send_chan.send_nowait(msg) - # if an error is deteced we should always - # expect it to be raised by any context (stream) - # consumer task - await ctx._maybe_raise_from_remote_msg(msg) - - except trio.BrokenResourceError: - # TODO: what is the right way to handle the case where the - # local task has already sent a 'stop' / StopAsyncInteration - # to the other side but and possibly has closed the local - # feeder mem chan? Do we wait for some kind of ack or just - # let this fail silently and bubble up (currently)? - - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{send_chan} consumer is already closed") - return - - except trio.WouldBlock: - # XXX: always push an error even if the local - # receiver is in overrun state. - await ctx._maybe_raise_from_remote_msg(msg) - - uid = chan.uid - lines = [ - 'Task context stream was overrun', - f'local task: {cid} @ {self.uid}', - f'remote sender: {uid}', - ] - if not ctx._stream_opened: - lines.insert( - 1, - f'\n*** No stream open on `{self.uid[0]}` side! 
***\n' - ) - text = '\n'.join(lines) - - if ctx._backpressure: - log.warning(text) - try: - await send_chan.send(msg) - except trio.BrokenResourceError: - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{chan} is already closed") - else: - try: - raise StreamOverrun(text) from None - except StreamOverrun as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - try: - await chan.send(err_msg) - except trio.BrokenResourceError: - # XXX: local consumer has closed their side - # so cancel the far end streaming task - log.warning(f"{chan} is already closed") + return await ctx._deliver_msg(msg) def get_context( self, chan: Channel, cid: str, - msg_buffer_size: Optional[int] = None, + + msg_buffer_size: int | None = None, + allow_overruns: bool = False, ) -> Context: ''' @@ -868,6 +877,7 @@ def get_context( assert actor_uid try: ctx = self._contexts[(actor_uid, cid)] + ctx._allow_overruns = allow_overruns # adjust buffer size if specified state = ctx._send_chan._state # type: ignore @@ -875,15 +885,11 @@ def get_context( state.max_buffer_size = msg_buffer_size except KeyError: - send_chan: trio.MemorySendChannel - recv_chan: trio.MemoryReceiveChannel - send_chan, recv_chan = trio.open_memory_channel( - msg_buffer_size or self.msg_buffer_size) - ctx = Context( + ctx = mk_context( chan, cid, - _send_chan=send_chan, - _recv_chan=recv_chan, + msg_buffer_size=msg_buffer_size or self.msg_buffer_size, + _allow_overruns=allow_overruns, ) self._contexts[(actor_uid, cid)] = ctx @@ -895,7 +901,8 @@ async def start_remote_task( ns: str, func: str, kwargs: dict, - msg_buffer_size: Optional[int] = None, + msg_buffer_size: int | None = None, + allow_overruns: bool = False, ) -> Context: ''' @@ -909,9 +916,16 @@ async def start_remote_task( ''' cid = str(uuid.uuid4()) assert chan.uid - ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size) + ctx = self.get_context( + chan, + cid, + msg_buffer_size=msg_buffer_size, + allow_overruns=allow_overruns, + ) log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") - await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) + await chan.send( + {'cmd': (ns, func, kwargs, self.uid, cid)} + ) # Wait on first response msg and validate; this should be # immediate. @@ -921,7 +935,11 @@ async def start_remote_task( if 'error' in first_msg: raise unpack_error(first_msg, chan) - elif functype not in ('asyncfunc', 'asyncgen', 'context'): + elif functype not in ( + 'asyncfunc', + 'asyncgen', + 'context', + ): raise ValueError(f"{first_msg} is an invalid response packet?") ctx._remote_func_type = functype @@ -980,7 +998,7 @@ async def _from_parent( log.warning( f"Failed to connect to parent @ {parent_addr}," " closing server") - await self.cancel() + await self.cancel(requesting_uid=self.uid) raise async def _serve_forever( @@ -1033,7 +1051,11 @@ def cancel_soon(self) -> None: assert self._service_n self._service_n.start_soon(self.cancel) - async def cancel(self) -> bool: + async def cancel( + self, + requesting_uid: tuple[str, str], + + ) -> bool: ''' Cancel this actor's runtime. 
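# NOTE (illustrative only, not part of this diff): the plumbing above
# forwards `allow_overruns` from the portal API down through
# `Actor.get_context()`/`.start_remote_task()`; a slow consumer might
# opt in roughly like this (`producer` is a hypothetical
# `@tractor.context` endpoint):
import tractor

async def consume_bursty(
    portal: tractor.Portal,
    producer,
) -> None:
    async with (
        portal.open_context(
            producer,
            allow_overruns=True,
        ) as (ctx, first),
        ctx.open_stream() as stream,
    ):
        async for msg in stream:
            # slow processing here should surface as overrun warnings
            # rather than a hard `StreamOverrun` error
            ...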
@@ -1047,6 +1069,7 @@ async def cancel(self) -> bool: ''' log.cancel(f"{self.uid} is trying to cancel") + self._cancel_called_remote: tuple = requesting_uid self._cancel_called = True # cancel all ongoing rpc tasks @@ -1060,7 +1083,7 @@ async def cancel(self) -> bool: dbcs.cancel() # kill all ongoing tasks - await self.cancel_rpc_tasks() + await self.cancel_rpc_tasks(requesting_uid=requesting_uid) # stop channel server self.cancel_server() @@ -1086,7 +1109,13 @@ async def cancel(self) -> bool: # for n in root.child_nurseries: # n.cancel_scope.cancel() - async def _cancel_task(self, cid, chan): + async def _cancel_task( + self, + cid: str, + chan: Channel, + + requesting_uid: tuple[str, str] | None = None, + ) -> bool: ''' Cancel a local task by call-id / channel. @@ -1101,35 +1130,51 @@ async def _cancel_task(self, cid, chan): try: # this ctx based lookup ensures the requested task to # be cancelled was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(chan, cid)] + ctx, func, is_complete = self._rpc_tasks[(chan, cid)] + scope = ctx._scope except KeyError: log.cancel(f"{cid} has already completed/terminated?") - return + return True log.cancel( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") + if ( + ctx._cancel_called_remote is None + and requesting_uid + ): + ctx._cancel_called_remote: tuple = requesting_uid + # don't allow cancelling this function mid-execution # (is this necessary?) if func is self._cancel_task: - return + return True + # TODO: shouldn't we eventually be calling ``Context.cancel()`` + # directly here instead (since that method can handle both + # side's calls into it? scope.cancel() # wait for _invoke to mark the task complete log.runtime( - f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" - f"peer: {chan.uid}\n") + 'Waiting on task to cancel:\n' + f'cid: {cid}\nfunc: {func}\n' + f'peer: {chan.uid}\n' + ) await is_complete.wait() log.runtime( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") + return True + async def cancel_rpc_tasks( self, - only_chan: Optional[Channel] = None, + only_chan: Channel | None = None, + requesting_uid: tuple[str, str] | None = None, + ) -> None: ''' Cancel all existing RPC responder tasks using the cancel scope @@ -1141,7 +1186,7 @@ async def cancel_rpc_tasks( log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for ( (chan, cid), - (scope, func, is_complete), + (ctx, func, is_complete), ) in tasks.copy().items(): if only_chan is not None: if only_chan != chan: @@ -1149,7 +1194,11 @@ async def cancel_rpc_tasks( # TODO: this should really done in a nursery batch if func != self._cancel_task: - await self._cancel_task(cid, chan) + await self._cancel_task( + cid, + chan, + requesting_uid=requesting_uid, + ) log.cancel( f"Waiting for remaining rpc tasks to complete {tasks}") @@ -1235,8 +1284,8 @@ async def async_main( Actor runtime entrypoint; start the IPC channel server, maybe connect back to the parent, and startup all core machinery tasks. - A "root-most" (or "top-level") nursery for this actor is opened here - and when cancelled effectively cancels the actor. + A "root" (or "top-level") nursery for this actor is opened here and + when cancelled/terminated effectively closes the actor's "runtime". 
''' # attempt to retreive ``trio``'s sigint handler and stash it @@ -1330,13 +1379,15 @@ async def async_main( ) ) log.runtime("Waiting on service nursery to complete") - log.runtime("Service nursery complete") - log.runtime("Waiting on root nursery to complete") + log.runtime( + "Service nursery complete\n" + "Waiting on root nursery to complete" + ) # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) except Exception as err: - log.info("Closing all actor lifetime contexts") + log.runtime("Closing all actor lifetime contexts") actor.lifetime_stack.close() if not registered_with_arbiter: @@ -1357,7 +1408,14 @@ async def async_main( await try_ship_error_to_parent(actor._parent_chan, err) # always! - log.exception("Actor errored:") + match err: + case ContextCancelled(): + log.cancel( + f'Actor: {actor.uid} was task-context-cancelled with,\n' + f'str(err)' + ) + case _: + log.exception("Actor errored:") raise finally: @@ -1424,15 +1482,16 @@ async def process_messages( ) -> bool: ''' - Process messages for the IPC transport channel async-RPC style. + This is the per-channel, low level RPC task scheduler loop. - Receive multiplexed RPC requests, spawn handler tasks and deliver - responses over or boxed errors back to the "caller" task. + Receive multiplexed RPC request messages from some remote process, + spawn handler tasks depending on request type and deliver responses + or boxed errors back to the remote caller (task). ''' # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! - msg = None + msg: dict | None = None nursery_cancelled_before_task: bool = False log.runtime(f"Entering msg loop for {chan} from {chan.uid}") @@ -1454,7 +1513,10 @@ async def process_messages( for (channel, cid) in actor._rpc_tasks.copy(): if channel is chan: - await actor._cancel_task(cid, channel) + await actor._cancel_task( + cid, + channel, + ) log.runtime( f"Msg loop signalled to terminate for" @@ -1468,12 +1530,14 @@ async def process_messages( cid = msg.get('cid') if cid: # deliver response to local caller/waiter + # via its per-remote-context memory channel. await actor._push_result(chan, cid, msg) log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") continue + # TODO: implement with ``match:`` syntax? # process command request try: ns, funcname, kwargs, actorid, cid = msg['cmd'] @@ -1493,13 +1557,12 @@ async def process_messages( f"{ns}.{funcname}({kwargs})") if ns == 'self': - func = getattr(actor, funcname) - if funcname == 'cancel': + func = actor.cancel + kwargs['requesting_uid'] = chan.uid - # don't start entire actor runtime - # cancellation if this actor is in debug - # mode + # don't start entire actor runtime cancellation + # if this actor is currently in debug mode! 
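# NOTE (illustrative only, not part of this diff): the `'self'`/`'cancel'`
# handling above injects the requester's `chan.uid` as `requesting_uid`,
# which is exactly what a plain `Portal.cancel_actor()` call exercises:
import tractor

async def spawn_then_cancel() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'short_lived',  # hypothetical actor name
            enable_modules=[__name__],
        )
        # remote runtime-cancel request; this actor's uid rides along
        # as `requesting_uid` per the msg-loop changes above
        await portal.cancel_actor()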
pdb_complete = _debug.Lock.local_pdb_complete if pdb_complete: await pdb_complete.wait() @@ -1511,43 +1574,56 @@ async def process_messages( # msg loop and break out into # ``async_main()`` log.cancel( - f"Actor {actor.uid} was remotely cancelled " + "Actor runtime for was remotely cancelled " f"by {chan.uid}" ) await _invoke( - actor, cid, chan, func, kwargs, is_rpc=False + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, ) + log.cancel( + f'Cancelling msg loop for {chan.uid}' + ) loop_cs.cancel() break if funcname == '_cancel_task': + func = actor._cancel_task # we immediately start the runtime machinery # shutdown - with trio.CancelScope(shield=True): - # actor.cancel() was called so kill this - # msg loop and break out into - # ``async_main()`` - kwargs['chan'] = chan - log.cancel( - f'Remote request to cancel task\n' - f'remote actor: {chan.uid}\n' - f'task: {cid}' + # with trio.CancelScope(shield=True): + kwargs['chan'] = chan + target_cid = kwargs['cid'] + kwargs['requesting_uid'] = chan.uid + log.cancel( + f'Remote request to cancel task\n' + f'remote actor: {chan.uid}\n' + f'task: {target_cid}' + ) + try: + await _invoke( + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, ) - try: - await _invoke( - actor, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - except BaseException: - log.exception("failed to cancel task?") - - continue + except BaseException: + log.exception("failed to cancel task?") + + continue + else: + # normally registry methods, eg. + # ``.register_actor()`` etc. + func = getattr(actor, funcname) + else: # complain to client about restricted modules try: @@ -1562,34 +1638,49 @@ async def process_messages( log.runtime(f"Spawning task for {func}") assert actor._service_n try: - cs = await actor._service_n.start( - partial(_invoke, actor, cid, chan, func, kwargs), + ctx: Context = await actor._service_n.start( + partial( + _invoke, + actor, + cid, + chan, + func, + kwargs, + ), name=funcname, ) + except ( RuntimeError, BaseExceptionGroup, ): # avoid reporting a benign race condition # during actor runtime teardown. - nursery_cancelled_before_task = True + nursery_cancelled_before_task: bool = True break - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - # if func != actor.cancel: - if isinstance(cs, Exception): + # in the lone case where a ``Context`` is not + # delivered, it's likely going to be a locally + # scoped exception from ``_invoke()`` itself. + if isinstance(ctx, Exception): log.warning( f"Task for RPC func {func} failed with" - f"{cs}") + f"{ctx}" + ) + continue + else: # mark that we have ongoing rpc tasks actor._ongoing_rpc_tasks = trio.Event() log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be # cancelled gracefully if requested actor._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) + ctx, + func, + trio.Event(), + ) log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") @@ -1630,7 +1721,15 @@ async def process_messages( else: # ship any "internal" exception (i.e. 
one from internal # machinery not from an rpc task) to parent - log.exception("Actor errored:") + match err: + case ContextCancelled(): + log.cancel( + f'Actor: {actor.uid} was context-cancelled with,\n' + f'str(err)' + ) + case _: + log.exception("Actor errored:") + if actor._parent_chan: await try_ship_error_to_parent(actor._parent_chan, err) @@ -1642,7 +1741,8 @@ async def process_messages( # msg debugging for when he machinery is brokey log.runtime( f"Exiting msg loop for {chan} from {chan.uid} " - f"with last msg:\n{msg}") + f"with last msg:\n{msg}" + ) # transport **was not** disconnected return False diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 900aea2f1..b1a20f492 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -23,13 +23,12 @@ import platform from typing import ( Any, + Awaitable, Literal, - Optional, Callable, TypeVar, TYPE_CHECKING, ) -from collections.abc import Awaitable from exceptiongroup import BaseExceptionGroup import trio @@ -60,7 +59,7 @@ log = get_logger('tractor') # placeholder for an mp start context if so using that backend -_ctx: Optional[mp.context.BaseContext] = None +_ctx: mp.context.BaseContext | None = None SpawnMethodKey = Literal[ 'trio', # supported on all platforms 'mp_spawn', @@ -86,7 +85,7 @@ async def proc_waiter(proc: mp.Process) -> None: def try_set_start_method( key: SpawnMethodKey -) -> Optional[mp.context.BaseContext]: +) -> mp.context.BaseContext | None: ''' Attempt to set the method for process starting, aka the "actor spawning backend". @@ -200,16 +199,37 @@ async def cancel_on_completion( async def do_hard_kill( proc: trio.Process, terminate_after: int = 3, + ) -> None: # NOTE: this timeout used to do nothing since we were shielding # the ``.wait()`` inside ``new_proc()`` which will pretty much # never release until the process exits, now it acts as # a hard-kill time ultimatum. + log.debug(f"Terminating {proc}") with trio.move_on_after(terminate_after) as cs: - # NOTE: This ``__aexit__()`` shields internally. - async with proc: # calls ``trio.Process.aclose()`` - log.debug(f"Terminating {proc}") + # NOTE: code below was copied verbatim from the now deprecated + # (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc + # string: + # + # Close any pipes we have to the process (both input and output) + # and wait for it to exit. If cancelled, kills the process and + # waits for it to finish exiting before propagating the + # cancellation. + with trio.CancelScope(shield=True): + if proc.stdin is not None: + await proc.stdin.aclose() + if proc.stdout is not None: + await proc.stdout.aclose() + if proc.stderr is not None: + await proc.stderr.aclose() + try: + await proc.wait() + finally: + if proc.returncode is None: + proc.kill() + with trio.CancelScope(shield=True): + await proc.wait() if cs.cancelled_caught: # XXX: should pretty much never get here unless we have @@ -355,12 +375,11 @@ async def trio_proc( spawn_cmd.append("--asyncio") cancelled_during_spawn: bool = False - proc: Optional[trio.Process] = None + proc: trio.Process | None = None try: try: # TODO: needs ``trio_typing`` patch? 
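# NOTE (illustrative only, not part of this diff): the `do_hard_kill()`
# rewrite above inlines the old `trio.Process.aclose()` behaviour; the
# underlying "grace period then hard kill" pattern boils down to
# roughly this:
import trio

async def reap(
    proc: trio.Process,
    grace: float = 3,
) -> None:
    # give the child `grace` seconds to exit on its own..
    with trio.move_on_after(grace) as cs:
        await proc.wait()

    if cs.cancelled_caught:
        # ..otherwise hard kill it and do a shielded wait so no zombie
        # is left behind even if we're being cancelled ourselves.
        proc.kill()
        with trio.CancelScope(shield=True):
            await proc.wait()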
- proc = await trio.lowlevel.open_process( # type: ignore - spawn_cmd) + proc = await trio.lowlevel.open_process(spawn_cmd) log.runtime(f"Started {proc}") @@ -438,14 +457,14 @@ async def trio_proc( # cancel result waiter that may have been spawned in # tandem if not done already - log.warning( + log.cancel( "Cancelling existing result waiter task for " f"{subactor.uid}") nursery.cancel_scope.cancel() finally: - # The "hard" reap since no actor zombies are allowed! - # XXX: do this **after** cancellation/tearfown to avoid + # XXX NOTE XXX: The "hard" reap since no actor zombies are + # allowed! Do this **after** cancellation/teardown to avoid # killing the process too early. if proc: log.cancel(f'Hard reap sequence starting for {subactor.uid}') diff --git a/tractor/_state.py b/tractor/_state.py index 28fa16e78..f94c3ebb0 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -23,11 +23,6 @@ Any, ) -import trio - -from ._exceptions import NoRuntime - - _current_actor: Optional['Actor'] = None # type: ignore # noqa _runtime_vars: dict[str, Any] = { '_debug_mode': False, @@ -37,8 +32,11 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # noqa - """Get the process-local actor instance. - """ + ''' + Get the process-local actor instance. + + ''' + from ._exceptions import NoRuntime if _current_actor is None and err_on_no_runtime: raise NoRuntime("No local actor has been initialized yet") @@ -46,16 +44,20 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # def is_main_process() -> bool: - """Bool determining if this actor is running in the top-most process. - """ + ''' + Bool determining if this actor is running in the top-most process. + + ''' import multiprocessing as mp return mp.current_process().name == 'MainProcess' def debug_mode() -> bool: - """Bool determining if "debug mode" is on which enables + ''' + Bool determining if "debug mode" is on which enables remote subactor pdb entry on crashes. - """ + + ''' return bool(_runtime_vars['_debug_mode']) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index b1129567b..3045b835e 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -14,31 +14,36 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Message stream types and APIs. 
-""" +The machinery and types behind ``Context.open_stream()`` + +''' from __future__ import annotations import inspect -from contextlib import asynccontextmanager -from dataclasses import dataclass +from contextlib import asynccontextmanager as acm from typing import ( Any, - Optional, Callable, - AsyncGenerator, - AsyncIterator + AsyncIterator, + TYPE_CHECKING, ) - import warnings import trio -from ._ipc import Channel -from ._exceptions import unpack_error, ContextCancelled -from ._state import current_actor +from ._exceptions import ( + unpack_error, +) from .log import get_logger -from .trionics import broadcast_receiver, BroadcastReceiver +from .trionics import ( + broadcast_receiver, + BroadcastReceiver, +) + +if TYPE_CHECKING: + from ._context import Context log = get_logger(__name__) @@ -70,9 +75,9 @@ class MsgStream(trio.abc.Channel): ''' def __init__( self, - ctx: 'Context', # typing: ignore # noqa + ctx: Context, # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, - _broadcaster: Optional[BroadcastReceiver] = None, + _broadcaster: BroadcastReceiver | None = None, ) -> None: self._ctx = ctx @@ -275,7 +280,7 @@ async def aclose(self): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - @asynccontextmanager + @acm async def subscribe( self, @@ -335,8 +340,8 @@ async def send( Send a message over this stream to the far end. ''' - if self._ctx._error: - raise self._ctx._error # from None + if self._ctx._remote_error: + raise self._ctx._remote_error # from None if self._closed: raise trio.ClosedResourceError('This stream was already closed') @@ -344,371 +349,11 @@ async def send( await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) -@dataclass -class Context: +def stream(func: Callable) -> Callable: ''' - An inter-actor, ``trio`` task communication context. - - NB: This class should never be instatiated directly, it is delivered - by either runtime machinery to a remotely started task or by entering - ``Portal.open_context()``. - - Allows maintaining task or protocol specific state between - 2 communicating actor tasks. A unique context is created on the - callee side/end for every request to a remote actor from a portal. - - A context can be cancelled and (possibly eventually restarted) from - either side of the underlying IPC channel, open task oriented - message streams and acts as an IPC aware inter-actor-task cancel - scope. + Mark an async function as a streaming routine with ``@stream``. ''' - chan: Channel - cid: str - - # these are the "feeder" channels for delivering - # message values to the local task from the runtime - # msg processing loop. - _recv_chan: trio.MemoryReceiveChannel - _send_chan: trio.MemorySendChannel - - _remote_func_type: Optional[str] = None - - # only set on the caller side - _portal: Optional['Portal'] = None # type: ignore # noqa - _result: Optional[Any] = False - _error: Optional[BaseException] = None - - # status flags - _cancel_called: bool = False - _cancel_msg: Optional[str] = None - _enter_debugger_on_cancel: bool = True - _started_called: bool = False - _started_received: bool = False - _stream_opened: bool = False - - # only set on the callee side - _scope_nursery: Optional[trio.Nursery] = None - - _backpressure: bool = False - - async def send_yield(self, data: Any) -> None: - - warnings.warn( - "`Context.send_yield()` is now deprecated. " - "Use ``MessageStream.send()``. 
", - DeprecationWarning, - stacklevel=2, - ) - await self.chan.send({'yield': data, 'cid': self.cid}) - - async def send_stop(self) -> None: - await self.chan.send({'stop': True, 'cid': self.cid}) - - async def _maybe_raise_from_remote_msg( - self, - msg: dict[str, Any], - - ) -> None: - ''' - (Maybe) unpack and raise a msg error into the local scope - nursery for this context. - - Acts as a form of "relay" for a remote error raised - in the corresponding remote callee task. - - ''' - error = msg.get('error') - if error: - # If this is an error message from a context opened by - # ``Portal.open_context()`` we want to interrupt any ongoing - # (child) tasks within that context to be notified of the remote - # error relayed here. - # - # The reason we may want to raise the remote error immediately - # is that there is no guarantee the associated local task(s) - # will attempt to read from any locally opened stream any time - # soon. - # - # NOTE: this only applies when - # ``Portal.open_context()`` has been called since it is assumed - # (currently) that other portal APIs (``Portal.run()``, - # ``.run_in_actor()``) do their own error checking at the point - # of the call and result processing. - log.error( - f'Remote context error for {self.chan.uid}:{self.cid}:\n' - f'{msg["error"]["tb_str"]}' - ) - error = unpack_error(msg, self.chan) - if ( - isinstance(error, ContextCancelled) and - self._cancel_called - ): - # this is an expected cancel request response message - # and we don't need to raise it in scope since it will - # potentially override a real error - return - - self._error = error - - # TODO: tempted to **not** do this by-reraising in a - # nursery and instead cancel a surrounding scope, detect - # the cancellation, then lookup the error that was set? - if self._scope_nursery: - - async def raiser(): - raise self._error from None - - # from trio.testing import wait_all_tasks_blocked - # await wait_all_tasks_blocked() - if not self._scope_nursery._closed: # type: ignore - self._scope_nursery.start_soon(raiser) - - async def cancel( - self, - msg: Optional[str] = None, - - ) -> None: - ''' - Cancel this inter-actor-task context. - - Request that the far side cancel it's current linked context, - Timeout quickly in an attempt to sidestep 2-generals... - - ''' - side = 'caller' if self._portal else 'callee' - if msg: - assert side == 'callee', 'Only callee side can provide cancel msg' - - log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') - - self._cancel_called = True - - if side == 'caller': - if not self._portal: - raise RuntimeError( - "No portal found, this is likely a callee side context" - ) - - cid = self.cid - with trio.move_on_after(0.5) as cs: - cs.shield = True - log.cancel( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") - - # NOTE: we're telling the far end actor to cancel a task - # corresponding to *this actor*. The far end local channel - # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run_from_ns('self', '_cancel_task', cid=cid) - - if cs.cancelled_caught: - # XXX: there's no way to know if the remote task was indeed - # cancelled in the case where the connection is broken or - # some other network error occurred. 
- # if not self._portal.channel.connected(): - if not self.chan.connected(): - log.cancel( - "May have failed to cancel remote task " - f"{cid} for {self._portal.channel.uid}") - else: - log.cancel( - "Timed out on cancelling remote task " - f"{cid} for {self._portal.channel.uid}") - - # callee side remote task - else: - self._cancel_msg = msg - - # TODO: should we have an explicit cancel message - # or is relaying the local `trio.Cancelled` as an - # {'error': trio.Cancelled, cid: "blah"} enough? - # This probably gets into the discussion in - # https://github.com/goodboy/tractor/issues/36 - assert self._scope_nursery - self._scope_nursery.cancel_scope.cancel() - - if self._recv_chan: - await self._recv_chan.aclose() - - @asynccontextmanager - async def open_stream( - - self, - backpressure: Optional[bool] = True, - msg_buffer_size: Optional[int] = None, - - ) -> AsyncGenerator[MsgStream, None]: - ''' - Open a ``MsgStream``, a bi-directional stream connected to the - cross-actor (far end) task for this ``Context``. - - This context manager must be entered on both the caller and - callee for the stream to logically be considered "connected". - - A ``MsgStream`` is currently "one-shot" use, meaning if you - close it you can not "re-open" it for streaming and instead you - must re-establish a new surrounding ``Context`` using - ``Portal.open_context()``. In the future this may change but - currently there seems to be no obvious reason to support - "re-opening": - - pausing a stream can be done with a message. - - task errors will normally require a restart of the entire - scope of the inter-actor task context due to the nature of - ``trio``'s cancellation system. - - ''' - actor = current_actor() - - # here we create a mem chan that corresponds to the - # far end caller / callee. - - # Likewise if the surrounding context has been cancelled we error here - # since it likely means the surrounding block was exited or - # killed - - if self._cancel_called: - task = trio.lowlevel.current_task().name - raise ContextCancelled( - f'Context around {actor.uid[0]}:{task} was already cancelled!' - ) - - if not self._portal and not self._started_called: - raise RuntimeError( - 'Context.started()` must be called before opening a stream' - ) - - # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.start_remote_task()` so if you try - # to send a stop from the caller to the callee in the - # single-direction-stream case you'll get a lookup error - # currently. - ctx = actor.get_context( - self.chan, - self.cid, - msg_buffer_size=msg_buffer_size, - ) - ctx._backpressure = backpressure - assert ctx is self - - # XXX: If the underlying channel feeder receive mem chan has - # been closed then likely client code has already exited - # a ``.open_stream()`` block prior or there was some other - # unanticipated error or cancellation from ``trio``. - - if ctx._recv_chan._closed: - raise trio.ClosedResourceError( - 'The underlying channel for this stream was already closed!?') - - async with MsgStream( - ctx=self, - rx_chan=ctx._recv_chan, - ) as stream: - - if self._portal: - self._portal._streams.add(stream) - - try: - self._stream_opened = True - - # XXX: do we need this? - # ensure we aren't cancelled before yielding the stream - # await trio.lowlevel.checkpoint() - yield stream - - # NOTE: Make the stream "one-shot use". On exit, signal - # ``trio.EndOfChannel``/``StopAsyncIteration`` to the - # far end. 
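# NOTE (illustrative only, not part of this diff): the `open_stream()`
# semantics documented above (now re-homed in `_context.py`) pair with
# a callee that opens its stream only after `.started()` and treats it
# as one-shot use:
import tractor

@tractor.context
async def echo_server(
    ctx: tractor.Context,
) -> None:
    await ctx.started('ready')
    async with ctx.open_stream() as stream:
        async for msg in stream:
            # relay every msg straight back to the caller task
            await stream.send(msg)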
- await stream.aclose() - - finally: - if self._portal: - try: - self._portal._streams.remove(stream) - except KeyError: - log.warning( - f'Stream was already destroyed?\n' - f'actor: {self.chan.uid}\n' - f'ctx id: {self.cid}' - ) - - async def result(self) -> Any: - ''' - From a caller side, wait for and return the final result from - the callee side task. - - ''' - assert self._portal, "Context.result() can not be called from callee!" - assert self._recv_chan - - if self._result is False: - - if not self._recv_chan._closed: # type: ignore - - # wait for a final context result consuming - # and discarding any bi dir stream msgs still - # in transit from the far end. - while True: - - msg = await self._recv_chan.receive() - try: - self._result = msg['return'] - break - except KeyError as msgerr: - - if 'yield' in msg: - # far end task is still streaming to us so discard - log.warning(f'Discarding stream delivered {msg}') - continue - - elif 'stop' in msg: - log.debug('Remote stream terminated') - continue - - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - - raise unpack_error( - msg, self._portal.channel - ) from msgerr - - return self._result - - async def started( - self, - value: Optional[Any] = None - - ) -> None: - ''' - Indicate to calling actor's task that this linked context - has started and send ``value`` to the other side. - - On the calling side ``value`` is the second item delivered - in the tuple returned by ``Portal.open_context()``. - - ''' - if self._portal: - raise RuntimeError( - f"Caller side context {self} can not call started!") - - elif self._started_called: - raise RuntimeError( - f"called 'started' twice on context with {self.chan.uid}") - - await self.chan.send({'started': value, 'cid': self.cid}) - self._started_called = True - - # TODO: do we need a restart api? - # async def restart(self) -> None: - # pass - - -def stream(func: Callable) -> Callable: - """Mark an async function as a streaming routine with ``@stream``. - - """ - # annotate # TODO: apply whatever solution ``mypy`` ends up picking for this: # https://github.com/python/mypy/issues/2087#issuecomment-769266912 func._tractor_stream_function = True # type: ignore @@ -734,22 +379,3 @@ def stream(func: Callable) -> Callable: "(Or ``to_trio`` if using ``asyncio`` in guest mode)." ) return func - - -def context(func: Callable) -> Callable: - """Mark an async function as a streaming routine with ``@context``. - - """ - # annotate - # TODO: apply whatever solution ``mypy`` ends up picking for this: - # https://github.com/python/mypy/issues/2087#issuecomment-769266912 - func._tractor_context_function = True # type: ignore - - sig = inspect.signature(func) - params = sig.parameters - if 'ctx' not in params: - raise TypeError( - "The first argument to the context function " - f"{func.__name__} must be `ctx: tractor.Context`" - ) - return func diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 3085272af..7f77784b5 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery( ) -> typing.AsyncGenerator[ActorNursery, None]: # TODO: yay or nay? 
- # __tracebackhide__ = True + __tracebackhide__ = True # the collection of errors retreived from spawned sub-actors errors: dict[tuple[str, str], BaseException] = {} diff --git a/tractor/experimental/_pubsub.py b/tractor/experimental/_pubsub.py index 99117b01b..89f286d2c 100644 --- a/tractor/experimental/_pubsub.py +++ b/tractor/experimental/_pubsub.py @@ -37,7 +37,7 @@ import wrapt from ..log import get_logger -from .._streaming import Context +from .._context import Context __all__ = ['pub'] @@ -148,7 +148,8 @@ def pub( *, tasks: set[str] = set(), ): - """Publisher async generator decorator. + ''' + Publisher async generator decorator. A publisher can be called multiple times from different actors but will only spawn a finite set of internal tasks to stream values to @@ -227,7 +228,8 @@ async def pub_service(get_topics): running in a single actor to stream data to an arbitrary number of subscribers. If you are ok to have a new task running for every call to ``pub_service()`` then probably don't need this. - """ + + ''' global _pubtask2lock # handle the decorator not called with () case diff --git a/tractor/log.py b/tractor/log.py index 342257fa6..1ea99315a 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -82,6 +82,10 @@ def transport( msg: str, ) -> None: + ''' + IPC level msg-ing. + + ''' return self.log(5, msg) def runtime( @@ -94,12 +98,20 @@ def cancel( self, msg: str, ) -> None: + ''' + Cancellation logging, mostly for runtime reporting. + + ''' return self.log(16, msg) def pdb( self, msg: str, ) -> None: + ''' + Debugger logging. + + ''' return self.log(500, msg) def log(self, level, msg, *args, **kwargs): diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 5621f79db..89db895b3 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -237,7 +237,7 @@ async def maybe_open_context( yielded = _Cache.values[ctx_key] except KeyError: - log.info(f'Allocating new {acm_func} for {ctx_key}') + log.debug(f'Allocating new {acm_func} for {ctx_key}') mngr = acm_func(**kwargs) resources = _Cache.resources assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' @@ -265,7 +265,7 @@ async def maybe_open_context( if yielded is not None: # if no more consumers, teardown the client if _Cache.users <= 0: - log.info(f'De-allocating resource for {ctx_key}') + log.debug(f'De-allocating resource for {ctx_key}') # XXX: if we're cancelled we the entry may have never # been entered since the nursery task was killed.
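# NOTE (illustrative only, not part of this diff): `maybe_open_context()`,
# whose alloc/dealloc logs were demoted to `.debug()` above, caches an
# async-context-manager across callers in the same actor; assuming it
# yields a `(cache_hit, value)` pair and is re-exported from
# `tractor.trionics`, usage looks roughly like:
from contextlib import asynccontextmanager as acm

from tractor.trionics import maybe_open_context

@acm
async def open_client(host: str):
    # stand-in for some expensive-to-allocate resource
    yield f'client-for-{host}'

async def use_shared_client() -> None:
    async with maybe_open_context(
        acm_func=open_client,
        kwargs={'host': 'localhost'},
    ) as (cache_hit, client):
        if cache_hit:
            # some other task in this actor already allocated it
            pass
        assert client == 'client-for-localhost'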