Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: asyncio_coroutine_concurrent recipe. #288

Merged
merged 2 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 31 additions & 41 deletions cookbook/core/asyncio/coroutine_concurrent.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
## Recipes

```python
"""Asynchronous I/O - Run coroutines concurrently.
"""

import asyncio
import logging
import sys

logging.basicConfig(
level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}'
Expand All @@ -18,48 +20,16 @@ async def do_task(name: str, delay: float) -> str:
return f'task ({name}) result'


async def concurrent_task(arg: str) -> None:
async def concurrent_task(arg: str) -> tuple[str, str]:
"""run coroutines concurrently."""
logging.debug(f'run concurrent_task: {arg=}')

if sys.version_info >= (3, 11):
# The await is implicit when the context manager exits.
async with asyncio.TaskGroup() as tg:
_ = tg.create_task(do_task('1', 3.0), name='t1')
_ = tg.create_task(do_task('2', 3.5))
else:
if sys.version_info >= (3, 7):
t1 = asyncio.create_task(do_task('1', 3.0), name='t1')
t2 = asyncio.create_task(do_task('2', 3.5))
else:
# Low-level APIs
loop = asyncio.get_running_loop()
t1 = loop.create_task(do_task('1', 3.0), name='t1')
t2 = loop.create_task(do_task('2', 3.5))

# wait until both tasks are completed
await t1
await t2


async def concurrent_task_result(arg: str) -> tuple[str, str]:
"""run coroutines concurrently, with results returned."""
logging.debug(f'run concurrent_task: {arg=}')

if sys.version_info >= (3, 7):
t1 = asyncio.create_task(do_task('3', 3.0))
t2 = asyncio.create_task(do_task('4', 3.5))
else:
# Low-level APIs
loop = asyncio.get_running_loop()
t1 = loop.create_task(do_task('3', 3.0))
t2 = loop.create_task(do_task('4', 3.5))

# wait until both tasks are completed
r1 = await t1
r2 = await t2
# The await is implicit when the context manager exits.
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(do_task('1', 3.0), name='t1')
t2 = tg.create_task(do_task('2', 3.5), name='t2')

return r1, r2
return await t1, await t2


async def coroutine_gather(arg: str) -> tuple[str, str]:
Expand All @@ -69,8 +39,7 @@ async def coroutine_gather(arg: str) -> tuple[str, str]:


async def main() -> tuple[str, ...]:
await concurrent_task('task')
r1 = await concurrent_task_result('task_result')
r1 = await concurrent_task('task')
r2 = await coroutine_gather('gather')

return r1 + tuple(r2)
Expand All @@ -80,6 +49,27 @@ result: tuple[str, ...] = asyncio.run(main())
logging.debug(f'result: {result}')
```

### Before Python 3.11

```python
async def concurrent_task(arg: str) -> tuple[str, str]:
"""run coroutines concurrently."""
logging.debug(f'run concurrent_task: {arg=}')

t1 = asyncio.create_task(do_task('1', 3.0), name='t1')
t2 = asyncio.create_task(do_task('2', 3.5), name='t2')

# Low-level APIs
# Before Python 3.7
#
# loop = asyncio.get_running_loop()
# t1 = loop.create_task(do_task('1', 3.0), name='t1')
# t2 = loop.create_task(do_task('2', 3.5), name='t2')

# wait until both tasks are completed
return await t1, await t2
```

## References

- [Python - `asyncio` module](https://docs.python.org/3/library/asyncio.html)
Expand Down
48 changes: 7 additions & 41 deletions examples/core/asyncio_coroutine_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import asyncio
import logging
import sys

logging.basicConfig(
level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}'
Expand All @@ -16,48 +15,16 @@ async def do_task(name: str, delay: float) -> str:
return f'task ({name}) result'


async def concurrent_task(arg: str) -> None:
async def concurrent_task(arg: str) -> tuple[str, str]:
"""run coroutines concurrently."""
logging.debug(f'run concurrent_task: {arg=}')

if sys.version_info >= (3, 11):
# The await is implicit when the context manager exits.
async with asyncio.TaskGroup() as tg:
_ = tg.create_task(do_task('1', 3.0), name='t1')
_ = tg.create_task(do_task('2', 3.5))
else:
if sys.version_info >= (3, 7):
t1 = asyncio.create_task(do_task('1', 3.0), name='t1')
t2 = asyncio.create_task(do_task('2', 3.5))
else:
# Low-level APIs
loop = asyncio.get_running_loop()
t1 = loop.create_task(do_task('1', 3.0), name='t1')
t2 = loop.create_task(do_task('2', 3.5))
# The await is implicit when the context manager exits.
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(do_task('1', 3.0), name='t1')
t2 = tg.create_task(do_task('2', 3.5), name='t2')

# wait until both tasks are completed
await t1
await t2


async def concurrent_task_result(arg: str) -> tuple[str, str]:
"""run coroutines concurrently, with results returned."""
logging.debug(f'run concurrent_task: {arg=}')

if sys.version_info >= (3, 7):
t1 = asyncio.create_task(do_task('3', 3.0))
t2 = asyncio.create_task(do_task('4', 3.5))
else:
# Low-level APIs
loop = asyncio.get_running_loop()
t1 = loop.create_task(do_task('3', 3.0))
t2 = loop.create_task(do_task('4', 3.5))

# wait until both tasks are completed
r1 = await t1
r2 = await t2

return r1, r2
return await t1, await t2


async def coroutine_gather(arg: str) -> tuple[str, str]:
Expand All @@ -67,8 +34,7 @@ async def coroutine_gather(arg: str) -> tuple[str, str]:


async def main() -> tuple[str, ...]:
await concurrent_task('task')
r1 = await concurrent_task_result('task_result')
r1 = await concurrent_task('task')
r2 = await coroutine_gather('gather')

return r1 + tuple(r2)
Expand Down
43 changes: 43 additions & 0 deletions examples/core/asyncio_coroutine_concurrent_before_py311.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Asynchronous I/O - Run coroutines concurrently.

Before Python 3.11
"""

import asyncio
import logging

logging.basicConfig(
level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}'
)


async def do_task(name: str, delay: float) -> str:
logging.debug(f'run task ({name}), sleep {delay} seconds')
await asyncio.sleep(delay)
return f'task ({name}) result'


async def concurrent_task(arg: str) -> tuple[str, str]:
"""run coroutines concurrently."""
logging.debug(f'run concurrent_task: {arg=}')

t1 = asyncio.create_task(do_task('1', 3.0), name='t1')
t2 = asyncio.create_task(do_task('2', 3.5), name='t2')

# Low-level APIs
# Before Python 3.7
#
# loop = asyncio.get_running_loop()
# t1 = loop.create_task(do_task('1', 3.0), name='t1')
# t2 = loop.create_task(do_task('2', 3.5), name='t2')

# wait until both tasks are completed
return await t1, await t2


async def main() -> tuple[str, str]:
return await concurrent_task('task')


result: tuple[str, ...] = asyncio.run(main())
logging.debug(f'result: {result}')
Loading