From b298e131ff776f774603fa1a292d4887973539f5 Mon Sep 17 00:00:00 2001 From: slush Date: Wed, 8 Jan 2025 11:24:52 -0600 Subject: [PATCH] fix: add timeout to asyncio subscription send wait and removes possibility of fetching wrong response --- silverback/subscriptions.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index 23a1e64b..7944f2df 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -17,6 +17,7 @@ class Web3SubscriptionsManager: websocket_reconnect_max_tries: int = 3 rpc_response_timeout_count: int = 10 subscription_polling_time: float = 0.1 # secs + request_timeout: float = 50.0 def __init__(self, ws_provider_uri: str): # TODO: Temporary until a more permanent solution is added to ProviderAPI @@ -113,8 +114,11 @@ async def subscribe(self, type: SubscriptionType, **filter_params) -> str: "eth_subscribe", [type.value, filter_params] if type is SubscriptionType.EVENTS else [type.value], ) - await self.connection.send(json.dumps(request)) - response = await self._get_response(request.get("id") or self._last_request) + await asyncio.wait_for( + self.connection.send(json.dumps(request)), timeout=self.request_timeout + ) + + response = await self._get_response(int(request.get("id"))) # type: ignore sub_id = response.get("result") if not sub_id: @@ -165,9 +169,11 @@ async def unsubscribe(self, sub_id: str) -> bool: return True request = self._create_request("eth_unsubscribe", [sub_id]) - await self.connection.send(json.dumps(request)) + await asyncio.wait_for( + self.connection.send(json.dumps(request)), timeout=self.request_timeout + ) - response = await self._get_response(request.get("id") or self._last_request) + response = await self._get_response(int(request.get("id"))) # type: ignore if success := response.get("result", False): del self._subscriptions[sub_id] # NOTE: Save memory