From 6bc0b33a91713ee62fd1860d28b19cb620c45971 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 11 Apr 2024 01:01:42 +1000 Subject: [PATCH] gh-117531: Unblock getters after non-immediate queue shutdown (#117532) (This is a small tweak of the original gh-104750 which added shutdown.) --- Doc/library/queue.rst | 6 ++++-- Lib/queue.py | 8 +++++--- Lib/test/test_queue.py | 17 +++++++++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index f2a6dbf589fd87..fce23313c7de28 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -245,8 +245,10 @@ them down. queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise immediately instead. - All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate* - is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`. + All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be + unblocked. If *immediate* is true, a task will be marked as done for each + remaining item in the queue, which may unblock callers of + :meth:`~Queue.join`. .. versionadded:: 3.13 diff --git a/Lib/queue.py b/Lib/queue.py index 387ce5425879a4..25beb46e30d6bd 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -239,8 +239,9 @@ def shutdown(self, immediate=False): By default, gets will only raise once the queue is empty. Set 'immediate' to True to make gets raise immediately instead. - All blocked callers of put() will be unblocked, and also get() - and join() if 'immediate'. + All blocked callers of put() and get() will be unblocked. If + 'immediate', a task is marked as done for each item remaining in + the queue, which may unblock callers of join(). ''' with self.mutex: self.is_shutdown = True @@ -249,9 +250,10 @@ def shutdown(self, immediate=False): self._get() if self.unfinished_tasks > 0: self.unfinished_tasks -= 1 - self.not_empty.notify_all() # release all blocked threads in `join()` self.all_tasks_done.notify_all() + # All getters need to re-check queue-empty to raise ShutDown + self.not_empty.notify_all() self.not_full.notify_all() # Override these methods to implement other queue organizations diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index c4d10110132393..d5927fbf39142b 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -636,6 +636,23 @@ def test_shutdown_get_task_done_join(self): self.assertEqual(results, [True]*len(thrds)) + def test_shutdown_pending_get(self): + def get(): + try: + results.append(q.get()) + except Exception as e: + results.append(e) + + q = self.type2test() + results = [] + get_thread = threading.Thread(target=get) + get_thread.start() + q.shutdown(immediate=False) + get_thread.join(timeout=10.0) + self.assertFalse(get_thread.is_alive()) + self.assertEqual(len(results), 1) + self.assertIsInstance(results[0], self.queue.ShutDown) + class QueueTest(BaseQueueTestMixin):