From 670c08090b96076f1247b9cbd94af76d0e5ed6bf Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Sun, 25 Feb 2024 14:46:30 +0330 Subject: [PATCH] fix: remove adapter <> api deadlock (#108) Adapter sends price update notifications to the subscriber API connection actor but at the same time the subscriber actor can get blcoked on receiving messages (e.g. product info) from Adapter. If the channel from Adapter to the subscriber gets full the Adapter can get blocked and this results in a deadlock. This situation propagates and queues messages in other actors and WS messages from the clients will grow in an unbounded queue that results in a constant memory growth (leak). This change simply makes the Adaptor to subscriber API connection nonblocking by dropping messages if the channels are full. --- src/agent/pythd/adapter.rs | 38 ++++++++++++++++-------------- src/agent/remote_keypair_loader.rs | 3 +-- src/agent/solana/exporter.rs | 3 +-- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index d29addc..ebea11d 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -214,7 +214,7 @@ impl Adapter { Some(message) = self.message_rx.recv() => { if let Err(err) = self.handle_message(message).await { error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } _ = self.shutdown_rx.recv() => { @@ -224,7 +224,7 @@ impl Adapter { _ = self.notify_price_sched_interval.tick() => { if let Err(err) = self.send_notify_price_sched().await { error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + debug!(self.logger, "error context"; "context" => format!("{:?}", err)); } } } @@ -508,12 +508,14 @@ impl Adapter { async fn send_notify_price_sched(&self) -> Result<()> { for subscription in self.notify_price_sched_subscriptions.values().flatten() { + // Send the notify price sched update without awaiting. This results in raising errors + // if the channel is full which normally should not happen. This is because we do not + // want to block the adapter if the channel is full. subscription .notify_price_sched_tx - .send(NotifyPriceSched { + .try_send(NotifyPriceSched { subscription: subscription.subscription_id, - }) - .await?; + })?; } Ok(()) @@ -580,19 +582,19 @@ impl Adapter { // Send the Notify Price update to each subscription for subscription in subscriptions { - subscription - .notify_price_tx - .send(NotifyPrice { - subscription: subscription.subscription_id, - result: PriceUpdate { - price, - conf, - status: Self::price_status_to_str(status), - valid_slot, - pub_slot, - }, - }) - .await?; + // Send the notify price update without awaiting. This results in raising errors if the + // channel is full which normally should not happen. This is because we do not want to + // block the adapter if the channel is full. + subscription.notify_price_tx.try_send(NotifyPrice { + subscription: subscription.subscription_id, + result: PriceUpdate { + price, + conf, + status: Self::price_status_to_str(status), + valid_slot, + pub_slot, + }, + })?; } Ok(()) diff --git a/src/agent/remote_keypair_loader.rs b/src/agent/remote_keypair_loader.rs index ee800f2..f8f3a67 100644 --- a/src/agent/remote_keypair_loader.rs +++ b/src/agent/remote_keypair_loader.rs @@ -314,8 +314,7 @@ async fn handle_key_requests( match response_tx.send(copied_keypair) { Ok(()) => {} Err(_e) => { - warn!(logger, "remote_keypair_loader: Could not send back secondary keypair to channel"; - ); + warn!(logger, "remote_keypair_loader: Could not send back secondary keypair to channel"); } } } diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index c903337..e7f9828 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -462,7 +462,7 @@ impl Exporter { self.update_our_prices(&publish_keypair.pubkey()); debug!(self.logger, "Exporter: filtering prices permissioned to us"; - "our_prices" => format!("{:?}", self.our_prices), + "our_prices" => format!("{:?}", self.our_prices.keys()), "publish_pubkey" => publish_keypair.pubkey().to_string(), ); @@ -594,7 +594,6 @@ impl Exporter { trace!( self.logger, "Exporter: No more permissioned price accounts in channel, using cached value"; - "cached_value" => format!("{:?}", self.our_prices), ); break; }