Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asyncio (mode) debugger support #362

Draft
wants to merge 25 commits into
base: ctx_cancel_semantics_and_overruns
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9716d86
Initial module import from `piker.data._sharemem`
goodboy Oct 15, 2022
7147729
Add `ShmList` wrapping the stdlib's `ShareableList`
goodboy Oct 16, 2022
c32b21b
Add initial readers-writer shm list tests
goodboy Oct 16, 2022
339d787
Add repetitive attach to existing segment test
goodboy Oct 17, 2022
edb82fd
Don't require runtime (for now), type annot fixing
goodboy Oct 17, 2022
1713ecd
Rename token type to `NDToken` in the style of `nptyping`
goodboy Oct 17, 2022
b52ff27
Add `ShmList` slice support in `.__getitem__()`
goodboy Oct 18, 2022
a9fc4c1
Parametrize rw test with variable frame sizes
goodboy Oct 18, 2022
e0bf964
Mod define `_USE_POSIX`, add a of of todos
goodboy Oct 18, 2022
f9a84f0
Allocate size-specced "empty" sequence from default values by type
goodboy Oct 19, 2022
4f442ef
Pass `str` dtype for `use_str` case
goodboy Oct 20, 2022
f745da9
Add `numpy` for testing optional integrated shm API layer
goodboy Oct 26, 2022
ebcb275
Add (first-draft) infected-`asyncio` actor task uses debugger example
goodboy Mar 7, 2023
ee87cf0
Add a debug-mode-breakpoint-causes-hang case!
goodboy Mar 27, 2023
fc56971
First proto: use `greenback` for sync func breakpointing
goodboy Jun 21, 2023
ac695a0
Updates from latest `piker.data._sharedmem` changes
goodboy Jun 22, 2023
565d7c3
Add longer "required reading" list B)
goodboy Jul 7, 2023
46972df
.log: more correct handling for `get_logger(__name__)` usage
goodboy Jul 7, 2023
98a7326
._runtime: log level tweaks, use crit for stale debug lock detection
goodboy Jul 7, 2023
4ace8f6
Fix frame-selection display on first REPL entry
goodboy Jul 7, 2023
b36b3d5
Map `breakpoint()` built-in to new `.pause_from_sync()` ep
goodboy Jul 7, 2023
bee2c36
Make `NamespacePath` work on object refs
goodboy Jul 12, 2023
e03bec5
Move `.to_asyncio` to modern optional value type annots
goodboy Jul 21, 2023
1102843
Teensie tidy up on actor doc string
goodboy Aug 18, 2023
22c14e2
Expose `Channel` @ pkg level, drop `_debug.pp()` alias
goodboy Aug 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
|gh_actions|
|docs|

``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
``tractor`` is a `structured concurrent`_, (optionally
distributed_) multi-processing_ runtime built on trio_.

Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
Expand All @@ -17,11 +17,20 @@ protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``.

We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
but likely *does not* look like what *you* probably think an "actor
model" looks like, and that's *intentional*.
but likely **does not** look like what **you** probably *think* an "actor
model" looks like, and that's **intentional**.

The first step to grok ``tractor`` is to get the basics of ``trio`` down.
A great place to start is the `trio docs`_ and this `blog post`_.

Where do i start!?
------------------
The first step to grok ``tractor`` is to get an intermediate
knowledge of ``trio`` and **structured concurrency** B)

Some great places to start are,
- the seminal `blog post`_
- obviously the `trio docs`_
- wikipedia's nascent SC_ page
- the fancy diagrams @ libdill-docs_


Features
Expand Down Expand Up @@ -593,6 +602,7 @@ matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_!

.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
Expand All @@ -611,8 +621,9 @@ channel`_!
.. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency
.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
Expand Down
117 changes: 117 additions & 0 deletions examples/debugging/asyncio_bp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import asyncio

import trio
import tractor
from tractor import to_asyncio


async def aio_sleep_forever():
await asyncio.sleep(float('inf'))


async def bp_then_error(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,

raise_after_bp: bool = True,

) -> None:

# sync with ``trio``-side (caller) task
to_trio.send_nowait('start')

# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug
# in terms of making it clear it's asyncio mucking about.
breakpoint()

# short checkpoint / delay
await asyncio.sleep(0.5)

if raise_after_bp:
raise ValueError('blah')

# TODO: test case with this so that it gets cancelled?
else:
# XXX NOTE: this is required in order to get the SIGINT-ignored
# hang case documented in the module script section!
await aio_sleep_forever()


@tractor.context
async def trio_ctx(
ctx: tractor.Context,
bp_before_started: bool = False,
):

# this will block until the ``asyncio`` task sends a "first"
# message, see first line in above func.
async with (

to_asyncio.open_channel_from(
bp_then_error,
raise_after_bp=not bp_before_started,
) as (first, chan),

trio.open_nursery() as n,
):

assert first == 'start'

if bp_before_started:
await tractor.breakpoint()

await ctx.started(first)

n.start_soon(
to_asyncio.run_task,
aio_sleep_forever,
)
await trio.sleep_forever()


async def main(
bps_all_over: bool = False,

) -> None:

async with tractor.open_nursery() as n:

p = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)

async with p.open_context(
trio_ctx,
bp_before_started=bps_all_over,
) as (ctx, first):

assert first == 'start'

if bps_all_over:
await tractor.breakpoint()

# await trio.sleep_forever()
await ctx.cancel()
assert 0

# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
# await p.cancel_actor()


if __name__ == '__main__':

# works fine B)
trio.run(main)

# will hang and ignores SIGINT !!
# NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
# manually..
# trio.run(main, True)
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ mypy
trio_typing
pexpect
towncrier
numpy
167 changes: 167 additions & 0 deletions tests/test_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""
Shared mem primitives and APIs.

"""
import uuid

# import numpy
import pytest
import trio
import tractor
from tractor._shm import (
open_shm_list,
attach_shm_list,
)


@tractor.context
async def child_attach_shml_alot(
ctx: tractor.Context,
shm_key: str,
) -> None:

await ctx.started(shm_key)

# now try to attach a boatload of times in a loop..
for _ in range(1000):
shml = attach_shm_list(
key=shm_key,
readonly=False,
)
assert shml.shm.name == shm_key
await trio.sleep(0.001)


def test_child_attaches_alot():
async def main():
async with tractor.open_nursery() as an:

# allocate writeable list in parent
key = f'shml_{uuid.uuid4()}'
shml = open_shm_list(
key=key,
)

portal = await an.start_actor(
'shm_attacher',
enable_modules=[__name__],
)

async with (
portal.open_context(
child_attach_shml_alot,
shm_key=shml.key,
) as (ctx, start_val),
):
assert start_val == key
await ctx.result()

await portal.cancel_actor()

trio.run(main)


@tractor.context
async def child_read_shm_list(
ctx: tractor.Context,
shm_key: str,
use_str: bool,
frame_size: int,
) -> None:

# attach in child
shml = attach_shm_list(
key=shm_key,
# dtype=str if use_str else float,
)
await ctx.started(shml.key)

async with ctx.open_stream() as stream:
async for i in stream:
print(f'(child): reading shm list index: {i}')

if use_str:
expect = str(float(i))
else:
expect = float(i)

if frame_size == 1:
val = shml[i]
assert expect == val
print(f'(child): reading value: {val}')
else:
frame = shml[i - frame_size:i]
print(f'(child): reading frame: {frame}')


@pytest.mark.parametrize(
'use_str',
[False, True],
ids=lambda i: f'use_str_values={i}',
)
@pytest.mark.parametrize(
'frame_size',
[1, 2**6, 2**10],
ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader(
use_str: bool,
frame_size: int,
):

async def main():
async with tractor.open_nursery(
# debug_mode=True,
) as an:

portal = await an.start_actor(
'shm_reader',
enable_modules=[__name__],
debug_mode=True,
)

# allocate writeable list in parent
key = 'shm_list'
seq_size = int(2 * 2 ** 10)
shml = open_shm_list(
key=key,
size=seq_size,
dtype=str if use_str else float,
readonly=False,
)

async with (
portal.open_context(
child_read_shm_list,
shm_key=key,
use_str=use_str,
frame_size=frame_size,
) as (ctx, sent),

ctx.open_stream() as stream,
):

assert sent == key

for i in range(seq_size):

val = float(i)
if use_str:
val = str(val)

# print(f'(parent): writing {val}')
shml[i] = val

# only on frame fills do we
# signal to the child that a frame's
# worth is ready.
if (i % frame_size) == 0:
print(f'(parent): signalling frame full on {val}')
await stream.send(i)
else:
print(f'(parent): signalling final frame on {val}')
await stream.send(i)

await portal.cancel_actor()

trio.run(main)
9 changes: 7 additions & 2 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from exceptiongroup import BaseExceptionGroup

from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._context import (
Context,
context,
Expand All @@ -48,38 +47,44 @@
)
from ._debug import (
breakpoint,
pause,
pause_from_sync,
post_mortem,
)
from . import msg
from ._root import (
run_daemon,
open_root_actor,
)
from ._ipc import Channel
from ._portal import Portal
from ._runtime import Actor


__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'BaseExceptionGroup',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',
Expand Down
Loading