diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index 23a1e64b..7944f2df 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -17,6 +17,7 @@ class Web3SubscriptionsManager: websocket_reconnect_max_tries: int = 3 rpc_response_timeout_count: int = 10 subscription_polling_time: float = 0.1 # secs + request_timeout: float = 50.0 def __init__(self, ws_provider_uri: str): # TODO: Temporary until a more permanent solution is added to ProviderAPI @@ -113,8 +114,11 @@ async def subscribe(self, type: SubscriptionType, **filter_params) -> str: "eth_subscribe", [type.value, filter_params] if type is SubscriptionType.EVENTS else [type.value], ) - await self.connection.send(json.dumps(request)) - response = await self._get_response(request.get("id") or self._last_request) + await asyncio.wait_for( + self.connection.send(json.dumps(request)), timeout=self.request_timeout + ) + + response = await self._get_response(int(request.get("id"))) # type: ignore sub_id = response.get("result") if not sub_id: @@ -165,9 +169,11 @@ async def unsubscribe(self, sub_id: str) -> bool: return True request = self._create_request("eth_unsubscribe", [sub_id]) - await self.connection.send(json.dumps(request)) + await asyncio.wait_for( + self.connection.send(json.dumps(request)), timeout=self.request_timeout + ) - response = await self._get_response(request.get("id") or self._last_request) + response = await self._get_response(int(request.get("id"))) # type: ignore if success := response.get("result", False): del self._subscriptions[sub_id] # NOTE: Save memory