Skip to content

Commit

Permalink
feat: Add async retrieval of results
Browse files Browse the repository at this point in the history
  • Loading branch information
07pepa committed Jan 5, 2025
1 parent b372f43 commit fb7c372
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 1 deletion.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,4 @@ of those changes to CLEARTYPE SRL.
| [@5tefan](https://github.com/5tefan/) | Stefan Codrescu |
| [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk |
| [@dbowring](https://github.com/dbowring/) | Daniel Bowring |
| [@07pepa](https://github.com/07pepa) | Pepa |
49 changes: 48 additions & 1 deletion dramatiq/results/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
import hashlib
import time
import typing
Expand Down Expand Up @@ -110,6 +110,53 @@ def get_result(self, message, *, block: bool = False, timeout: typing.Optional[i
else:
return self.unwrap_result(result)

async def get_result_async(self, message, *, timeout: typing.Optional[int] = None) -> Result:
"""Get a result from the backend asynchronously.
This code is non-blocking and will return the result when it is available.
Parameters:
message(Message)
timeout(int): The maximum amount of time, in ms, to wait for
a result when block is True. Defaults to 10 seconds.
Raises:
ResultTimeout: When waiting for a result times out.
Returns:
object: The result.
"""
message_key = self.build_message_key(message)

result = self._get(message_key)
if result is not Missing:
return self.unwrap_result(result)

attempts = 1
timeout_triggered = asyncio.Event()

async def timeout_setter(sleep_time):
if sleep_time is None:
sleep_time = DEFAULT_TIMEOUT
await asyncio.sleep(sleep_time / 1000)
timeout_triggered.set()

to_setter = asyncio.create_task(timeout_setter(timeout))
while not timeout_triggered.is_set():
try:
attempts, delay = compute_backoff(attempts, factor=BACKOFF_FACTOR)
delay /= 1000
await asyncio.wait_for(timeout_triggered.wait(), delay)
except asyncio.TimeoutError:
result = self._get(message_key)
if result is not Missing:
return self.unwrap_result(result)
continue
finally:
if not to_setter.done():
to_setter.cancel()

raise ResultTimeout(message)

def store_result(self, message, result: Result, ttl: int) -> None:
"""Store a result in the backend.
Expand Down
26 changes: 26 additions & 0 deletions tests/test_results.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import time
from unittest.mock import patch

Expand Down Expand Up @@ -374,3 +375,28 @@ def do_work():

# Then the result should still be None.
assert result_backend.get_result(sent_message) is None


async def test_async_get_result_works(result_backend):
@dramatiq.actor(store_results=True)
async def do_work():
await asyncio.sleep(0.1)
return 42

sent_message = do_work.send()

# Await a result
assert (await result_backend.get_result_async(sent_message)) == 42


async def test_async_get_result_timeouts(result_backend):
@dramatiq.actor(store_results=True)
async def do_work():
await asyncio.sleep(0.2)
return 42

sent_message = do_work.send()

# Await a result
with pytest.raises(ResultTimeout):
await result_backend.get_result_async(sent_message, timeout=100)

0 comments on commit fb7c372

Please sign in to comment.