From 6bd971739e346b6773c2b744e5856e9a6ad6138b Mon Sep 17 00:00:00 2001 From: Li Yun Date: Sun, 16 Jun 2024 13:10:25 +0800 Subject: [PATCH 1/2] enhancement: `asyncio_coroutine_concurrent` recipe. --- cookbook/core/asyncio/coroutine_concurrent.md | 72 ++++++++----------- examples/core/asyncio_coroutine_concurrent.py | 48 ++----------- ...yncio_coroutine_concurrent_before_py311.py | 43 +++++++++++ 3 files changed, 81 insertions(+), 82 deletions(-) create mode 100644 examples/core/asyncio_coroutine_concurrent_before_py311.py diff --git a/cookbook/core/asyncio/coroutine_concurrent.md b/cookbook/core/asyncio/coroutine_concurrent.md index 1e01010..5360075 100644 --- a/cookbook/core/asyncio/coroutine_concurrent.md +++ b/cookbook/core/asyncio/coroutine_concurrent.md @@ -3,9 +3,11 @@ ## Recipes ```python +"""Asynchronous I/O - Run coroutines concurrently. +""" + import asyncio import logging -import sys logging.basicConfig( level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}' @@ -18,48 +20,16 @@ async def do_task(name: str, delay: float) -> str: return f'task ({name}) result' -async def concurrent_task(arg: str) -> None: +async def concurrent_task(arg: str) -> tuple[str, str]: """run coroutines concurrently.""" logging.debug(f'run concurrent_task: {arg=}') - if sys.version_info >= (3, 11): - # The await is implicit when the context manager exits. - async with asyncio.TaskGroup() as tg: - _ = tg.create_task(do_task('1', 3.0), name='t1') - _ = tg.create_task(do_task('2', 3.5)) - else: - if sys.version_info >= (3, 7): - t1 = asyncio.create_task(do_task('1', 3.0), name='t1') - t2 = asyncio.create_task(do_task('2', 3.5)) - else: - # Low-level APIs - loop = asyncio.get_running_loop() - t1 = loop.create_task(do_task('1', 3.0), name='t1') - t2 = loop.create_task(do_task('2', 3.5)) - - # wait until both tasks are completed - await t1 - await t2 - - -async def concurrent_task_result(arg: str) -> tuple[str, str]: - """run coroutines concurrently, with results returned.""" - logging.debug(f'run concurrent_task: {arg=}') - - if sys.version_info >= (3, 7): - t1 = asyncio.create_task(do_task('3', 3.0)) - t2 = asyncio.create_task(do_task('4', 3.5)) - else: - # Low-level APIs - loop = asyncio.get_running_loop() - t1 = loop.create_task(do_task('3', 3.0)) - t2 = loop.create_task(do_task('4', 3.5)) - - # wait until both tasks are completed - r1 = await t1 - r2 = await t2 + # The await is implicit when the context manager exits. + async with asyncio.TaskGroup() as tg: + t1 = tg.create_task(do_task('1', 3.0), name='t1') + t2 = tg.create_task(do_task('2', 3.5), name='t2') - return r1, r2 + return await t1, await t2 async def coroutine_gather(arg: str) -> tuple[str, str]: @@ -69,8 +39,7 @@ async def coroutine_gather(arg: str) -> tuple[str, str]: async def main() -> tuple[str, ...]: - await concurrent_task('task') - r1 = await concurrent_task_result('task_result') + r1 = await concurrent_task('task') r2 = await coroutine_gather('gather') return r1 + tuple(r2) @@ -80,6 +49,27 @@ result: tuple[str, ...] = asyncio.run(main()) logging.debug(f'result: {result}') ``` +### Before Python 3.11 + +```python +async def concurrent_task(arg: str) -> tuple[str, str]: + """run coroutines concurrently.""" + logging.debug(f'run concurrent_task: {arg=}') + + t1 = asyncio.create_task(do_task('1', 3.0), name='t1') + t2 = asyncio.create_task(do_task('2', 3.5), name='t2') + + # Low-level APIs + # Before Python 3.7 + # + # loop = asyncio.get_running_loop() + # t1 = loop.create_task(do_task('1', 3.0), name='t1') + # t2 = loop.create_task(do_task('2', 3.5), name='t2') + + # wait until both tasks are completed + return await t1, await t2 +``` + ## References - [Python - `asyncio` module](https://docs.python.org/3/library/asyncio.html) diff --git a/examples/core/asyncio_coroutine_concurrent.py b/examples/core/asyncio_coroutine_concurrent.py index 9d2d168..082d382 100644 --- a/examples/core/asyncio_coroutine_concurrent.py +++ b/examples/core/asyncio_coroutine_concurrent.py @@ -3,7 +3,6 @@ import asyncio import logging -import sys logging.basicConfig( level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}' @@ -16,48 +15,16 @@ async def do_task(name: str, delay: float) -> str: return f'task ({name}) result' -async def concurrent_task(arg: str) -> None: +async def concurrent_task(arg: str) -> tuple[str, str]: """run coroutines concurrently.""" logging.debug(f'run concurrent_task: {arg=}') - if sys.version_info >= (3, 11): - # The await is implicit when the context manager exits. - async with asyncio.TaskGroup() as tg: - _ = tg.create_task(do_task('1', 3.0), name='t1') - _ = tg.create_task(do_task('2', 3.5)) - else: - if sys.version_info >= (3, 7): - t1 = asyncio.create_task(do_task('1', 3.0), name='t1') - t2 = asyncio.create_task(do_task('2', 3.5)) - else: - # Low-level APIs - loop = asyncio.get_running_loop() - t1 = loop.create_task(do_task('1', 3.0), name='t1') - t2 = loop.create_task(do_task('2', 3.5)) + # The await is implicit when the context manager exits. + async with asyncio.TaskGroup() as tg: + t1 = tg.create_task(do_task('1', 3.0), name='t1') + t2 = tg.create_task(do_task('2', 3.5), name='t2') - # wait until both tasks are completed - await t1 - await t2 - - -async def concurrent_task_result(arg: str) -> tuple[str, str]: - """run coroutines concurrently, with results returned.""" - logging.debug(f'run concurrent_task: {arg=}') - - if sys.version_info >= (3, 7): - t1 = asyncio.create_task(do_task('3', 3.0)) - t2 = asyncio.create_task(do_task('4', 3.5)) - else: - # Low-level APIs - loop = asyncio.get_running_loop() - t1 = loop.create_task(do_task('3', 3.0)) - t2 = loop.create_task(do_task('4', 3.5)) - - # wait until both tasks are completed - r1 = await t1 - r2 = await t2 - - return r1, r2 + return await t1, await t2 async def coroutine_gather(arg: str) -> tuple[str, str]: @@ -67,8 +34,7 @@ async def coroutine_gather(arg: str) -> tuple[str, str]: async def main() -> tuple[str, ...]: - await concurrent_task('task') - r1 = await concurrent_task_result('task_result') + r1 = await concurrent_task('task') r2 = await coroutine_gather('gather') return r1 + tuple(r2) diff --git a/examples/core/asyncio_coroutine_concurrent_before_py311.py b/examples/core/asyncio_coroutine_concurrent_before_py311.py new file mode 100644 index 0000000..2fca1af --- /dev/null +++ b/examples/core/asyncio_coroutine_concurrent_before_py311.py @@ -0,0 +1,43 @@ +"""Asynchronous I/O - Run coroutines concurrently. + +Before Python 3.11 +""" + +import asyncio +import logging +import sys + +logging.basicConfig( + level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}' +) + + +async def do_task(name: str, delay: float) -> str: + logging.debug(f'run task ({name}), sleep {delay} seconds') + await asyncio.sleep(delay) + return f'task ({name}) result' + + +async def concurrent_task(arg: str) -> tuple[str, str]: + """run coroutines concurrently.""" + logging.debug(f'run concurrent_task: {arg=}') + + if sys.version_info >= (3, 7): + t1 = asyncio.create_task(do_task('1', 3.0), name='t1') + t2 = asyncio.create_task(do_task('2', 3.5), name='t2') + else: + # Low-level APIs + loop = asyncio.get_running_loop() + t1 = loop.create_task(do_task('1', 3.0), name='t1') + t2 = loop.create_task(do_task('2', 3.5), name='t2') + + # wait until both tasks are completed + return await t1, await t2 + + +async def main() -> tuple[str, str]: + return await concurrent_task('task') + + +result: tuple[str, ...] = asyncio.run(main()) +logging.debug(f'result: {result}') From 76943c4aa8114a52aaa65d7992312fcb6675cb29 Mon Sep 17 00:00:00 2001 From: Li Yun Date: Sun, 16 Jun 2024 14:35:04 +0800 Subject: [PATCH 2/2] fix. --- ...syncio_coroutine_concurrent_before_py311.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/core/asyncio_coroutine_concurrent_before_py311.py b/examples/core/asyncio_coroutine_concurrent_before_py311.py index 2fca1af..3c50b21 100644 --- a/examples/core/asyncio_coroutine_concurrent_before_py311.py +++ b/examples/core/asyncio_coroutine_concurrent_before_py311.py @@ -5,7 +5,6 @@ import asyncio import logging -import sys logging.basicConfig( level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}' @@ -22,14 +21,15 @@ async def concurrent_task(arg: str) -> tuple[str, str]: """run coroutines concurrently.""" logging.debug(f'run concurrent_task: {arg=}') - if sys.version_info >= (3, 7): - t1 = asyncio.create_task(do_task('1', 3.0), name='t1') - t2 = asyncio.create_task(do_task('2', 3.5), name='t2') - else: - # Low-level APIs - loop = asyncio.get_running_loop() - t1 = loop.create_task(do_task('1', 3.0), name='t1') - t2 = loop.create_task(do_task('2', 3.5), name='t2') + t1 = asyncio.create_task(do_task('1', 3.0), name='t1') + t2 = asyncio.create_task(do_task('2', 3.5), name='t2') + + # Low-level APIs + # Before Python 3.7 + # + # loop = asyncio.get_running_loop() + # t1 = loop.create_task(do_task('1', 3.0), name='t1') + # t2 = loop.create_task(do_task('2', 3.5), name='t2') # wait until both tasks are completed return await t1, await t2