From 9980f3fe437b775a9c1937abd9e4ea3a413ab8ff Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 19 Jun 2019 01:10:53 -0700 Subject: [PATCH 1/6] Refactor channel interfaces Make MemorySendChannel and MemoryReceiveChannel into public classes, and move a number of the methods that used to be on the abstract SendChannel/ReceiveChannel interfaces into the Memory*Channel concrete classes. Also add a Channel type, analogous to Stream. See: gh-719 Still to do: - Merge MemorySendChannel and MemoryReceiveChannel into a single MemoryChannel - decide what to do about clone - decide whether to add some kind of half-close on Channel - refactor this PR's one-off solution to gh-1092 into something more general. --- docs/source/reference-core.rst | 32 +++++--- docs/source/reference-io.rst | 17 +++- newsfragments/719.feature.rst | 6 ++ trio/__init__.py | 4 +- trio/_abc.py | 138 +++++++-------------------------- trio/_channel.py | 115 ++++++++++++++++++++++----- trio/_util.py | 10 +-- trio/abc.py | 2 +- 8 files changed, 168 insertions(+), 156 deletions(-) create mode 100644 newsfragments/719.feature.rst diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 1f60031792..9a99252150 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1212,7 +1212,7 @@ Using channels to pass values between tasks different tasks. They're particularly useful for implementing producer/consumer patterns. -The channel API is defined by the abstract base classes +The core channel API is defined by the abstract base classes :class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`. You can use these to implement your own custom channels, that do things like pass objects between processes or over the network. But in @@ -1228,14 +1228,23 @@ inside a single process, and for that you can use what you use when you're looking for a queue. The main difference is that Trio splits the classic queue interface up into two objects. The advantage of this is that it makes it possible to put - the two ends in different processes, and that we can close the two - sides separately. + the two ends in different processes without rewriting your code, + and that we can close the two sides separately. + +`MemorySendChannel` and `MemoryReceiveChannel` also expose several +more features beyond the core channel interface: + +.. autoclass:: MemorySendChannel + :members: + +.. autoclass:: MemoryReceiveChannel + :members: A simple channel example ++++++++++++++++++++++++ -Here's a simple example of how to use channels: +Here's a simple example of how to use memory channels: .. literalinclude:: reference-core/channels-simple.py @@ -1347,14 +1356,13 @@ program above: .. literalinclude:: reference-core/channels-mpmc-fixed.py :emphasize-lines: 7, 9, 10, 12, 13 -This example demonstrates using the :meth:`SendChannel.clone -` and :meth:`ReceiveChannel.clone -` methods. What these do is create -copies of our endpoints, that act just like the original – except that -they can be closed independently. And the underlying channel is only -closed after *all* the clones have been closed. So this completely -solves our problem with shutdown, and if you run this program, you'll -see it print its six lines of output and then exits cleanly. +This example demonstrates using the `MemorySendChannel.clone` and +`MemoryReceiveChannel.clone` methods. What these do is create copies +of our endpoints, that act just like the original – except that they +can be closed independently. And the underlying channel is only closed +after *all* the clones have been closed. So this completely solves our +problem with shutdown, and if you run this program, you'll see it +print its six lines of output and then exits cleanly. Notice a small trick we use: the code in ``main`` creates clone objects to pass into all the child tasks, and then closes the original diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index e395f2c6fb..945c82b1c1 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -117,14 +117,19 @@ Abstract base classes - :class:`~trio.SocketListener`, :class:`~trio.SSLListener` * - :class:`SendChannel` - :class:`AsyncResource` - - :meth:`~SendChannel.send`, :meth:`~SendChannel.send_nowait` + - :meth:`~SendChannel.send` - - - :func:`~trio.open_memory_channel` + - `~trio.MemorySendChannel` * - :class:`ReceiveChannel` - :class:`AsyncResource` - - :meth:`~ReceiveChannel.receive`, :meth:`~ReceiveChannel.receive_nowait` + - :meth:`~ReceiveChannel.receive` - ``__aiter__``, ``__anext__`` - - :func:`~trio.open_memory_channel` + - `~trio.MemoryReceiveChannel` + * - `Channel` + - `SendChannel`, `ReceiveChannel` + - + - + - .. autoclass:: trio.abc.AsyncResource :members: @@ -165,6 +170,10 @@ Abstract base classes :members: :show-inheritance: +.. autoclass:: trio.abc.Channel + :members: + :show-inheritance: + .. currentmodule:: trio diff --git a/newsfragments/719.feature.rst b/newsfragments/719.feature.rst new file mode 100644 index 0000000000..d706afd9ae --- /dev/null +++ b/newsfragments/719.feature.rst @@ -0,0 +1,6 @@ +We cleaned up the distinction between the "abstract channel interface" +and the "memory channel" concrete implementation. +`trio.abc.SendChannel` and `trio.abc.ReceiveChannel` have been slimmed +down, `trio.SendMemoryChannel` and `trio.ReceiveMemoryChannel` are now +public types that can be used in type hints, and there's a new +`trio.abc.Channel` interface for future bidirectional channels. diff --git a/trio/__init__.py b/trio/__init__.py index 652253e2fe..eb49e3c8cb 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -38,7 +38,9 @@ from ._highlevel_generic import aclose_forcefully, StapledStream -from ._channel import open_memory_channel +from ._channel import ( + open_memory_channel, MemorySendChannel, MemoryReceiveChannel +) from ._signals import open_signal_receiver diff --git a/trio/_abc.py b/trio/_abc.py index 08c145851e..61bd584366 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -484,15 +484,18 @@ async def send_eof(self): """ +# A regular invariant generic type +T = TypeVar("T") + # The type of object produced by a ReceiveChannel (covariant because # ReceiveChannel[Derived] can be passed to someone expecting # ReceiveChannel[Base]) -T_co = TypeVar("T_co", covariant=True) +ReceiveType = TypeVar("ReceiveType", covariant=True) # The type of object accepted by a SendChannel (contravariant because # SendChannel[Base] can be passed to someone expecting # SendChannel[Derived]) -T_contra = TypeVar("T_contra", contravariant=True) +SendType = TypeVar("SendType", contravariant=True) # The type of object produced by a Listener (covariant plus must be # an AsyncResource) @@ -537,39 +540,21 @@ async def accept(self): """ -class SendChannel(AsyncResource, Generic[T_contra]): +class SendChannel(AsyncResource, Generic[SendType]): """A standard interface for sending Python objects to some receiver. - :class:`SendChannel` objects also implement the :class:`AsyncResource` - interface, so they can be closed by calling :meth:`~AsyncResource.aclose` - or using an ``async with`` block. + `SendChannel` objects also implement the `AsyncResource` interface, so + they can be closed by calling `~AsyncResource.aclose` or using an ``async + with`` block. If you want to send raw bytes rather than Python objects, see - :class:`ReceiveStream`. + `ReceiveStream`. """ __slots__ = () @abstractmethod - def send_nowait(self, value): - """Attempt to send an object through the channel, without blocking. - - Args: - value (object): The object to send. - - Raises: - trio.WouldBlock: if the operation cannot be completed immediately - (for example, because the channel's internal buffer is full). - trio.BrokenResourceError: if something has gone wrong, and the - channel is broken. For example, you may get this if the receiver - has already been closed. - trio.ClosedResourceError: if you previously closed this - :class:`SendChannel` object. - - """ - - @abstractmethod - async def send(self, value): + async def send(self, value: SendType) -> None: """Attempt to send an object through the channel, blocking if necessary. Args: @@ -585,33 +570,8 @@ async def send(self, value): """ - @abstractmethod - def clone(self): - """Clone this send channel object. - - This returns a new :class:`SendChannel` object, which acts as a - duplicate of the original: sending on the new object does exactly the - same thing as sending on the old object. - - However, closing one of the objects does not close the other, and - receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have - been closed. - - This is useful for communication patterns that involve multiple - producers all sending objects to the same destination. If you give - each producer its own clone of the :class:`SendChannel`, and then make - sure to close each :class:`SendChannel` when it's finished, receivers - will automatically get notified when all producers are finished. See - :ref:`channel-mpmc` for examples. - - Raises: - trio.ClosedResourceError: if you already closed this - :class:`SendChannel` object. - - """ - -class ReceiveChannel(AsyncResource, Generic[T_co]): +class ReceiveChannel(AsyncResource, Generic[ReceiveType]): """A standard interface for receiving Python objects from some sender. You can iterate over a :class:`ReceiveChannel` using an ``async for`` @@ -621,45 +581,22 @@ class ReceiveChannel(AsyncResource, Generic[T_co]): ... This is equivalent to calling :meth:`receive` repeatedly. The loop exits - without error when :meth:`receive` raises :exc:`~trio.EndOfChannel`. + without error when `receive` raises `~trio.EndOfChannel`. - :class:`ReceiveChannel` objects also implement the :class:`AsyncResource` - interface, so they can be closed by calling :meth:`~AsyncResource.aclose` - or using an ``async with`` block. + `ReceiveChannel` objects also implement the `AsyncResource` interface, so + they can be closed by calling `~AsyncResource.aclose` or using an ``async + with`` block. If you want to receive raw bytes rather than Python objects, see - :class:`ReceiveStream`. + `ReceiveStream`. """ __slots__ = () @abstractmethod - def receive_nowait(self): - """Attempt to receive an incoming object, without blocking. - - Returns: - object: Whatever object was received. - - Raises: - trio.WouldBlock: if the operation cannot be completed immediately - (for example, because no object has been sent yet). - trio.EndOfChannel: if the sender has been closed cleanly, and no - more objects are coming. This is not an error condition. - trio.ClosedResourceError: if you previously closed this - :class:`ReceiveChannel` object. - trio.BrokenResourceError: if something has gone wrong, and the - channel is broken. - - """ - - @abstractmethod - async def receive(self): + async def receive(self) -> ReceiveType: """Attempt to receive an incoming object, blocking if necessary. - It's legal for multiple tasks to call :meth:`receive` at the same - time. If this happens, then one task receives the first value sent, - another task receives the next value sent, and so on. - Returns: object: Whatever object was received. @@ -673,40 +610,21 @@ async def receive(self): """ - @abstractmethod - def clone(self): - """Clone this receive channel object. - - This returns a new :class:`ReceiveChannel` object, which acts as a - duplicate of the original: receiving on the new object does exactly - the same thing as receiving on the old object. - - However, closing one of the objects does not close the other, and the - underlying channel is not closed until all clones are closed. - - This is useful for communication patterns involving multiple consumers - all receiving objects from the same underlying channel. See - :ref:`channel-mpmc` for examples. - - .. warning:: The clones all share the same underlying channel. - Whenever a clone :meth:`receive`\\s a value, it is removed from the - channel and the other clones do *not* receive that value. If you - want to send multiple copies of the same stream of values to - multiple destinations, like :func:`itertools.tee`, then you need to - find some other solution; this method does *not* do that. - - Raises: - trio.ClosedResourceError: if you already closed this - :class:`SendChannel` object. - - """ - @aiter_compat def __aiter__(self): return self - async def __anext__(self): + async def __anext__(self) -> ReceiveType: try: return await self.receive() except trio.EndOfChannel: raise StopAsyncIteration + + +class Channel(SendChannel[T], ReceiveChannel[T]): + """A standard interface for interacting with bidirectional channels. + + A `Channel` is an object that implements both the `SendChannel` and + `ReceiveChannel` interfaces, so you can both send and receive objects. + + """ diff --git a/trio/_channel.py b/trio/_channel.py index c3074537a6..30ed7caa00 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -4,8 +4,8 @@ import attr from outcome import Error, Value -from .abc import SendChannel, ReceiveChannel -from ._util import generic_function +from .abc import SendChannel, ReceiveChannel, Channel +from ._util import generic_function, NoPublicConstructor import trio from ._core import enable_ki_protection @@ -20,12 +20,12 @@ def open_memory_channel(max_buffer_size): of serialization. They just pass Python objects directly between tasks (with a possible stop in an internal buffer along the way). - Channel objects can be closed by calling - :meth:`~trio.abc.AsyncResource.aclose` or using ``async with``. They are - *not* automatically closed when garbage collected. Closing memory channels - isn't mandatory, but it is generally a good idea, because it helps avoid - situations where tasks get stuck waiting on a channel when there's no-one - on the other side. See :ref:`channel-shutdown` for details. + Channel objects can be closed by calling `~trio.abc.AsyncResource.aclose` + or using ``async with``. They are *not* automatically closed when garbage + collected. Closing memory channels isn't mandatory, but it is generally a + good idea, because it helps avoid situations where tasks get stuck waiting + on a channel when there's no-one on the other side. See + :ref:`channel-shutdown` for details. Args: max_buffer_size (int or math.inf): The maximum number of items that can @@ -48,11 +48,11 @@ def open_memory_channel(max_buffer_size): * ``max_buffer_size``: The maximum number of items allowed in the buffer, as passed to :func:`open_memory_channel`. * ``open_send_channels``: The number of open - :class:`~trio.abc.SendChannel` endpoints pointing to this channel. + :class:`MemorySendChannel` endpoints pointing to this channel. Initially 1, but can be increased by - :meth:`~trio.abc.SendChannel.clone`. + :meth:`MemorySendChannel.clone`. * ``open_receive_channels``: Likewise, but for open - :class:`~trio.abc.ReceiveChannel` endpoints. + :class:`MemoryReceiveChannel` endpoints. * ``tasks_waiting_send``: The number of tasks blocked in ``send`` on this channel (summing over all clones). * ``tasks_waiting_receive``: The number of tasks blocked in ``receive`` on @@ -64,11 +64,13 @@ def open_memory_channel(max_buffer_size): if max_buffer_size < 0: raise ValueError("max_buffer_size must be >= 0") state = MemoryChannelState(max_buffer_size) - return MemorySendChannel(state), MemoryReceiveChannel(state) + return ( + MemorySendChannel._create(state), MemoryReceiveChannel._create(state) + ) @attr.s(frozen=True) -class ChannelStats: +class MemoryChannelStats: current_buffer_used = attr.ib() max_buffer_size = attr.ib() open_send_channels = attr.ib() @@ -76,7 +78,6 @@ class ChannelStats: tasks_waiting_send = attr.ib() tasks_waiting_receive = attr.ib() - @attr.s class MemoryChannelState: max_buffer_size = attr.ib() @@ -90,7 +91,7 @@ class MemoryChannelState: receive_tasks = attr.ib(factory=OrderedDict) def statistics(self): - return ChannelStats( + return MemoryChannelStats( current_buffer_used=len(self.data), max_buffer_size=self.max_buffer_size, open_send_channels=self.open_send_channels, @@ -100,8 +101,19 @@ def statistics(self): ) +try: + from typing import GenericMeta +except ImportError: + from abc import ABCMeta + class _GenericNoPublicConstructor(NoPublicConstructor, ABCMeta): + pass +else: + class _GenericNoPublicConstructor(NoPublicConstructor, GenericMeta): + pass + + @attr.s(cmp=False, repr=False) -class MemorySendChannel(SendChannel): +class MemorySendChannel(SendChannel, metaclass=_GenericNoPublicConstructor): _state = attr.ib() _closed = attr.ib(default=False) # This is just the tasks waiting on *this* object. As compared to @@ -125,6 +137,10 @@ def statistics(self): @enable_ki_protection def send_nowait(self, value): + """Like `~trio.abc.SendChannel.send`, but if the channel's buffer is + full, raises `WouldBlock` instead of blocking. + + """ if self._closed: raise trio.ClosedResourceError if self._state.open_receive_channels == 0: @@ -141,6 +157,9 @@ def send_nowait(self, value): @enable_ki_protection async def send(self, value): + """See `~trio.abc.SendChannel.send`. + + """ await trio.hazmat.checkpoint_if_cancelled() try: self.send_nowait(value) @@ -164,9 +183,32 @@ def abort_fn(_): @enable_ki_protection def clone(self): + """Clone this send channel object. + + This returns a new `MemorySendChannel` object, which acts as a + duplicate of the original: sending on the new object does exactly the + same thing as sending on the old object. (If you're familiar with + `os.dup`, then this is a similar idea.) + + However, closing one of the objects does not close the other, and + receivers don't get `EndOfChannel` until *all* clones have been + closed. + + This is useful for communication patterns that involve multiple + producers all sending objects to the same destination. If you give + each producer its own clone of the `MemorySendChannel`, and then make + sure to close each `MemorySendChannel` when it's finished, receivers + will automatically get notified when all producers are finished. See + :ref:`channel-mpmc` for examples. + + Raises: + trio.ClosedResourceError: if you already closed this + `MemorySendChannel` object. + + """ if self._closed: raise trio.ClosedResourceError - return MemorySendChannel(self._state) + return MemorySendChannel._create(self._state) @enable_ki_protection async def aclose(self): @@ -189,7 +231,9 @@ async def aclose(self): @attr.s(cmp=False, repr=False) -class MemoryReceiveChannel(ReceiveChannel): +class MemoryReceiveChannel( + ReceiveChannel, metaclass=_GenericNoPublicConstructor +): _state = attr.ib() _closed = attr.ib(default=False) _tasks = attr.ib(factory=set) @@ -207,6 +251,10 @@ def __repr__(self): @enable_ki_protection def receive_nowait(self): + """Like `~trio.abc.ReceiveChannel.receive`, but if there's nothing + ready to receive, raises `WouldBlock` instead of blocking. + + """ if self._closed: raise trio.ClosedResourceError if self._state.send_tasks: @@ -223,6 +271,9 @@ def receive_nowait(self): @enable_ki_protection async def receive(self): + """See `~trio.abc.ReceiveChannel.receive`. + + """ await trio.hazmat.checkpoint_if_cancelled() try: value = self.receive_nowait() @@ -246,9 +297,35 @@ def abort_fn(_): @enable_ki_protection def clone(self): + """Clone this receive channel object. + + This returns a new `MemoryReceiveChannel` object, which acts as a + duplicate of the original: receiving on the new object does exactly + the same thing as receiving on the old object. + + However, closing one of the objects does not close the other, and the + underlying channel is not closed until all clones are closed. (If + you're familiar with `os.dup`, then this is a similar idea.) + + This is useful for communication patterns that involve multiple + consumers all receiving objects from the same underlying channel. See + :ref:`channel-mpmc` for examples. + + .. warning:: The clones all share the same underlying channel. + Whenever a clone :meth:`receive`\\s a value, it is removed from the + channel and the other clones do *not* receive that value. If you + want to send multiple copies of the same stream of values to + multiple destinations, like :func:`itertools.tee`, then you need to + find some other solution; this method does *not* do that. + + Raises: + trio.ClosedResourceError: if you already closed this + `MemoryReceiveChannel` object. + + """ if self._closed: raise trio.ClosedResourceError - return MemoryReceiveChannel(self._state) + return MemoryReceiveChannel._create(self._state) @enable_ki_protection async def aclose(self): diff --git a/trio/_util.py b/trio/_util.py index 41f3f681d5..edbe1dfa32 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -1,5 +1,6 @@ # Little utilities we use internally +from abc import ABCMeta import os import signal import sys @@ -17,15 +18,6 @@ import trio -__all__ = [ - "signal_raise", - "aiter_compat", - "ConflictDetector", - "fixup_module_metadata", - "fspath", - "generic_function", -] - # Equivalent to the C function raise(), which Python doesn't wrap if os.name == "nt": # On windows, os.kill exists but is really weird. diff --git a/trio/abc.py b/trio/abc.py index 5eb4ec8ef4..e3348360e4 100644 --- a/trio/abc.py +++ b/trio/abc.py @@ -7,5 +7,5 @@ from ._abc import ( Clock, Instrument, AsyncResource, SendStream, ReceiveStream, Stream, HalfCloseableStream, SocketFactory, HostnameResolver, Listener, - SendChannel, ReceiveChannel + SendChannel, ReceiveChannel, Channel ) From 17fce6287dc8737724e841404f196c65e9666a64 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 19 Jun 2019 02:22:10 -0700 Subject: [PATCH 2/6] yapf --- trio/_channel.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/trio/_channel.py b/trio/_channel.py index 30ed7caa00..dd1345cd12 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -78,6 +78,7 @@ class MemoryChannelStats: tasks_waiting_send = attr.ib() tasks_waiting_receive = attr.ib() + @attr.s class MemoryChannelState: max_buffer_size = attr.ib() @@ -105,9 +106,11 @@ def statistics(self): from typing import GenericMeta except ImportError: from abc import ABCMeta + class _GenericNoPublicConstructor(NoPublicConstructor, ABCMeta): pass else: + class _GenericNoPublicConstructor(NoPublicConstructor, GenericMeta): pass From e6c6c408835913f314e945f29bf0fe136e534483 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 19 Jun 2019 02:22:42 -0700 Subject: [PATCH 3/6] fix newsfragment --- newsfragments/719.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newsfragments/719.feature.rst b/newsfragments/719.feature.rst index d706afd9ae..07005b3bd9 100644 --- a/newsfragments/719.feature.rst +++ b/newsfragments/719.feature.rst @@ -1,6 +1,6 @@ We cleaned up the distinction between the "abstract channel interface" and the "memory channel" concrete implementation. `trio.abc.SendChannel` and `trio.abc.ReceiveChannel` have been slimmed -down, `trio.SendMemoryChannel` and `trio.ReceiveMemoryChannel` are now +down, `trio.MemorySendChannel` and `trio.MemoryReceiveChannel` are now public types that can be used in type hints, and there's a new `trio.abc.Channel` interface for future bidirectional channels. From 6e5000a32f0db09e3aafabb5b607957026154e08 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 19 Jun 2019 02:24:09 -0700 Subject: [PATCH 4/6] see if this helps with pypy? --- trio/_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_util.py b/trio/_util.py index edbe1dfa32..5a2517bcbf 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -278,7 +278,7 @@ def __new__(cls, name, bases, cls_namespace): raise TypeError( "`%s` does not support subclassing" % base.__name__ ) - return type.__new__(cls, name, bases, cls_namespace) + return super().__new__(cls, name, bases, cls_namespace) class NoPublicConstructor(Final): From 68f680b71b4314e9ccfb158409e137e79e78ef9b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 19 Jun 2019 19:04:25 -0700 Subject: [PATCH 5/6] Convert the Generic/ABC metaclass hack into a generic solution Fixes gh-1092 --- trio/_channel.py | 19 ++----------------- trio/_util.py | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/trio/_channel.py b/trio/_channel.py index dd1345cd12..873fd82e3c 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -102,21 +102,8 @@ def statistics(self): ) -try: - from typing import GenericMeta -except ImportError: - from abc import ABCMeta - - class _GenericNoPublicConstructor(NoPublicConstructor, ABCMeta): - pass -else: - - class _GenericNoPublicConstructor(NoPublicConstructor, GenericMeta): - pass - - @attr.s(cmp=False, repr=False) -class MemorySendChannel(SendChannel, metaclass=_GenericNoPublicConstructor): +class MemorySendChannel(SendChannel, metaclass=NoPublicConstructor): _state = attr.ib() _closed = attr.ib(default=False) # This is just the tasks waiting on *this* object. As compared to @@ -234,9 +221,7 @@ async def aclose(self): @attr.s(cmp=False, repr=False) -class MemoryReceiveChannel( - ReceiveChannel, metaclass=_GenericNoPublicConstructor -): +class MemoryReceiveChannel(ReceiveChannel, metaclass=NoPublicConstructor): _state = attr.ib() _closed = attr.ib(default=False) _tasks = attr.ib(factory=set) diff --git a/trio/_util.py b/trio/_util.py index 5a2517bcbf..c739c67fd9 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -257,7 +257,21 @@ def __getitem__(self, _): return self -class Final(type): +# If a new class inherits from any ABC, then the new class's metaclass has to +# inherit from ABCMeta. If a new class inherits from typing.Generic, and +# you're using Python 3.6 or earlier, then the new class's metaclass has to +# inherit from typing.GenericMeta. Some of the classes that want to use Final +# or NoPublicConstructor inherit from ABCs and generics, so Final has to +# inherit from these metaclasses. Fortunately, GenericMeta inherits from +# ABCMeta, so inheriting from GenericMeta alone is sufficient (when it +# exists at all). +if hasattr(t, "GenericMeta"): + BaseMeta = t.GenericMeta +else: + BaseMeta = ABCMeta + + +class Final(BaseMeta): """Metaclass that enforces a class to be final (i.e., subclass not allowed). If a class uses this metaclass like this:: From d2eb09c435d90ededc11b5f0942cb3b7097f2b0d Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 21 Jun 2019 20:48:59 -0700 Subject: [PATCH 6/6] Add discussion of channels allowing simultaneous send/receive --- trio/_abc.py | 8 ++++++++ trio/_channel.py | 10 ++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/trio/_abc.py b/trio/_abc.py index 61bd584366..b998c7dd2e 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -567,6 +567,10 @@ async def send(self, value: SendType) -> None: trio.ClosedResourceError: if you previously closed this :class:`SendChannel` object, or if another task closes it while :meth:`send` is running. + trio.BusyResourceError: some channels allow multiple tasks to call + `send` at the same time, but others don't. If you try to call + `send` simultaneously from multiple tasks on a channel that + doesn't support it, then you can get `~trio.BusyResourceError`. """ @@ -607,6 +611,10 @@ async def receive(self) -> ReceiveType: :class:`ReceiveChannel` object. trio.BrokenResourceError: if something has gone wrong, and the channel is broken. + trio.BusyResourceError: some channels allow multiple tasks to call + `receive` at the same time, but others don't. If you try to call + `receive` simultaneously from multiple tasks on a channel that + doesn't support it, then you can get `~trio.BusyResourceError`. """ diff --git a/trio/_channel.py b/trio/_channel.py index 873fd82e3c..451aa1b777 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -147,7 +147,9 @@ def send_nowait(self, value): @enable_ki_protection async def send(self, value): - """See `~trio.abc.SendChannel.send`. + """See `SendChannel.send `. + + Memory channels allow multiple tasks to call `send` at the same time. """ await trio.hazmat.checkpoint_if_cancelled() @@ -259,7 +261,11 @@ def receive_nowait(self): @enable_ki_protection async def receive(self): - """See `~trio.abc.ReceiveChannel.receive`. + """See `ReceiveChannel.receive `. + + Memory channels allow multiple tasks to call `receive` at the same + time. The first task will get the first item sent, the second task + will get the second item sent, and so on. """ await trio.hazmat.checkpoint_if_cancelled()