From 49858646573b27b23ad71cd010d7f14b101ae2f5 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Thu, 25 Apr 2024 11:52:23 +0100 Subject: [PATCH] Send publisher permissions using a watch channel (#118) Fixes heap growth if Exporter::update_our_prices is not called as often as updates --- src/agent/solana.rs | 11 +++--- src/agent/solana/exporter.rs | 73 ++++++++++++++---------------------- src/agent/solana/oracle.rs | 15 ++++---- 3 files changed, 41 insertions(+), 58 deletions(-) diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 39bf4070..2b5c65ba 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -8,9 +8,9 @@ pub mod network { use { super::{ - super::{ - store, - store::global, + super::store::{ + self, + global, }, exporter, key_store::{ @@ -29,8 +29,8 @@ pub mod network { std::time::Duration, tokio::{ sync::{ - mpsc, mpsc::Sender, + watch, }, task::JoinHandle, }, @@ -86,8 +86,7 @@ pub mod network { logger: Logger, ) -> Result>> { // Publisher permissions updates between oracle and exporter - let (publisher_permissions_tx, publisher_permissions_rx) = - mpsc::channel(config.oracle.updates_channel_capacity); + let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default()); // Spawn the Oracle let mut jhs = oracle::spawn_oracle( diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 5784cc6d..2adca249 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -172,7 +172,7 @@ pub fn spawn_exporter( network: Network, rpc_url: &str, rpc_timeout: Duration, - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: watch::Receiver>>, key_store: KeyStore, local_store_tx: Sender, global_store_tx: Sender, @@ -260,7 +260,7 @@ pub struct Exporter { inflight_transactions_tx: Sender, /// publisher => { permissioned_price => market hours } as read by the oracle module - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: watch::Receiver>>, /// Currently known permissioned prices of this publisher along with their market hours our_prices: HashMap, @@ -287,7 +287,7 @@ impl Exporter { global_store_tx: Sender, network_state_rx: watch::Receiver, inflight_transactions_tx: Sender, - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: watch::Receiver>>, keypair_request_tx: mpsc::Sender, logger: Logger, ) -> Self { @@ -556,51 +556,34 @@ impl Exporter { /// The loop ensures that we clear the channel and use /// only the final, latest message; try_recv() is /// non-blocking. - /// - /// Note: This behavior is similar to - /// tokio::sync::watch::channel(), which was not appropriate here - /// because its internal RwLock would complain about not being - /// Send with the HashMap> inside. - /// TODO(2023-05-05): Debug the watch::channel() compilation errors fn update_our_prices(&mut self, publish_pubkey: &Pubkey) { - loop { - match self.publisher_permissions_rx.try_recv() { - Ok(publisher_permissions) => { - self.our_prices = publisher_permissions.get(publish_pubkey) .cloned() - .unwrap_or_else( || { - warn!( - self.logger, - "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup."; - "publish_pubkey" => publish_pubkey.to_string(), - ); - HashMap::new() - }); - trace!( - self.logger, - "Exporter: read permissioned price accounts from channel"; - "new_value" => format!("{:?}", self.our_prices), - ); - } - // Expected failures when channel is empty - Err(TryRecvError::Empty) => { - trace!( - self.logger, - "Exporter: No more permissioned price accounts in channel, using cached value"; - ); - break; - } - // Unexpected failures (channel closed, internal errors etc.) - Err(other) => { - warn!( - self.logger, - "Exporter: Updating permissioned price accounts failed unexpectedly, using cached value"; - "cached_value" => format!("{:?}", self.our_prices), - "error" => other.to_string(), - ); - break; - } + match self.publisher_permissions_rx.has_changed() { + Ok(true) => {} + Ok(false) => return, + Err(other) => { + warn!( + self.logger, + "Exporter: Updating permissioned price accounts failed unexpectedly, using cached value"; + "cached_value" => format!("{:?}", self.our_prices), + "error" => other.to_string(), + ); + return; } } + + self.our_prices = self + .publisher_permissions_rx + .borrow_and_update() + .get(publish_pubkey) + .cloned() + .unwrap_or_else(|| { + warn!( + self.logger, + "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup."; + "publish_pubkey" => publish_pubkey.to_string(), + ); + HashMap::new() + }); } async fn fetch_local_store_contents(&self) -> Result> { diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 088d8e70..af6e1b99 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -43,7 +43,10 @@ use { time::Duration, }, tokio::{ - sync::mpsc, + sync::{ + mpsc, + watch, + }, task::JoinHandle, time::Interval, }, @@ -204,7 +207,7 @@ pub fn spawn_oracle( wss_url: &str, rpc_timeout: Duration, global_store_update_tx: mpsc::Sender, - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: watch::Sender>>, key_store: KeyStore, logger: Logger, ) -> Vec> { @@ -419,7 +422,7 @@ struct Poller { data_tx: mpsc::Sender, /// Updates about permissioned price accounts from oracle to exporter - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: watch::Sender>>, /// The RPC client to use to poll data from the RPC node rpc_client: RpcClient, @@ -439,7 +442,7 @@ struct Poller { impl Poller { pub fn new( data_tx: mpsc::Sender, - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: watch::Sender>>, rpc_url: &str, rpc_timeout: Duration, commitment: CommitmentLevel, @@ -481,9 +484,7 @@ impl Poller { let fresh_data = self.poll().await?; self.publisher_permissions_tx - .send(fresh_data.publisher_permissions.clone()) - .await - .context("Updating permissioned price accounts for exporter")?; + .send_replace(fresh_data.publisher_permissions.clone()); self.data_tx .send(fresh_data)