From 172444351f4799035a27b1da7103df836ab666ae Mon Sep 17 00:00:00 2001 From: Reisen Date: Thu, 27 Jun 2024 06:09:38 +0000 Subject: [PATCH] refactor(agent): extract exporter component/service --- src/agent.rs | 40 +- src/agent/services.rs | 2 + src/agent/services/exporter.rs | 338 +++++++++++++ src/agent/services/oracle.rs | 27 +- src/agent/solana.rs | 89 +--- src/agent/solana/exporter.rs | 1 - src/agent/state.rs | 34 +- src/agent/state/exporter.rs | 830 ++++++++++++++++++++++++++++++++ src/agent/state/oracle.rs | 28 +- src/agent/state/transactions.rs | 111 +++++ 10 files changed, 1361 insertions(+), 139 deletions(-) create mode 100644 src/agent/services/exporter.rs create mode 100644 src/agent/state/exporter.rs create mode 100644 src/agent/state/transactions.rs diff --git a/src/agent.rs b/src/agent.rs index 2162a10..0395401 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -126,48 +126,34 @@ impl Agent { let mut jhs = vec![]; // Create the Application State. - let state = Arc::new(state::State::new(self.config.state.clone()).await); + let state = Arc::new(state::State::new(&self.config).await); // Spawn the primary network Oracle. - { - // Publisher permissions updates between oracle and exporter - let (publisher_permissions_tx, publisher_permissions_rx) = - watch::channel(<_>::default()); - - jhs.push(tokio::spawn(services::oracle( - self.config.primary_network.clone(), - network::Network::Primary, - state.clone(), - publisher_permissions_tx.clone(), - ))); + jhs.push(tokio::spawn(services::oracle( + self.config.primary_network.clone(), + network::Network::Primary, + state.clone(), + ))); - // Spawn the primary network - jhs.extend(network::spawn_network( - self.config.primary_network.clone(), - network::Network::Primary, - state.clone(), - publisher_permissions_rx.clone(), - )?); - } + jhs.push(tokio::spawn(services::exporter( + self.config.primary_network.clone(), + network::Network::Primary, + state.clone(), + ))); // Spawn the secondary network Oracle, if needed. if let Some(config) = &self.config.secondary_network { - let (publisher_permissions_tx, publisher_permissions_rx) = - watch::channel(<_>::default()); - jhs.push(tokio::spawn(services::oracle( config.clone(), network::Network::Secondary, state.clone(), - publisher_permissions_tx.clone(), ))); - jhs.extend(network::spawn_network( + jhs.push(tokio::spawn(services::exporter( config.clone(), network::Network::Secondary, state.clone(), - publisher_permissions_rx, - )?); + ))); } // Create the Notifier task for the Pythd RPC. 
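Viewed from the call site, both networks now get the same pair of detached service tasks. A minimal sketch of the wiring this patch converges on (illustrative only, not code in this patch; `networks` is a hypothetical iterator over `(network::Config, Network)` pairs):

```rust
// Sketch: mirrors the spawns in the hunk above. state::State implements the
// Oracle and Exporter + Transactions traits, so one Arc<State> is shared by
// every service task.
let state = Arc::new(state::State::new(&config).await);
for (net_config, network) in networks {
    jhs.push(tokio::spawn(services::oracle(net_config.clone(), network, state.clone())));
    jhs.push(tokio::spawn(services::exporter(net_config, network, state.clone())));
}
```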
diff --git a/src/agent/services.rs b/src/agent/services.rs
index 554b866..3e4f829 100644
--- a/src/agent/services.rs
+++ b/src/agent/services.rs
@@ -1,8 +1,10 @@
+pub mod exporter;
 pub mod keypairs;
 pub mod notifier;
 pub mod oracle;
 
 pub use {
+    exporter::exporter,
     keypairs::keypairs,
     notifier::notifier,
     oracle::oracle,
diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs
new file mode 100644
index 0000000..cdee65d
--- /dev/null
+++ b/src/agent/services/exporter.rs
@@ -0,0 +1,338 @@
+use {
+    crate::agent::{
+        solana::network::{
+            self,
+            Network,
+        },
+        state::{
+            exporter::Exporter,
+            transactions::Transactions,
+        },
+    },
+    anyhow::Result,
+    futures_util::future,
+    serde::{
+        Deserialize,
+        Serialize,
+    },
+    solana_client::nonblocking::rpc_client::RpcClient,
+    solana_sdk::commitment_config::CommitmentConfig,
+    std::{
+        sync::Arc,
+        time::Duration,
+    },
+    tokio::{
+        sync::watch,
+        time::Interval,
+    },
+};
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+#[serde(default)]
+pub struct Config {
+    /// Duration of the interval at which to refresh the cached network state (current slot and blockhash).
+    /// It is recommended to set this to slightly less than the network's block time,
+    /// as the slot fetched will be used as the time of the price update.
+    #[serde(with = "humantime_serde")]
+    pub refresh_network_state_interval_duration: Duration,
+    /// Duration of the interval at which to publish updates
+    #[serde(with = "humantime_serde")]
+    pub publish_interval_duration: Duration,
+    /// Age after which a price update is considered stale and not published
+    #[serde(with = "humantime_serde")]
+    pub staleness_threshold: Duration,
+    /// Wait at least this long before publishing an unchanged price
+    /// state; unchanged price state means only the timestamp has changed,
+    /// with other state identical to the last published state.
+    pub unchanged_publish_threshold: Duration,
+    /// Maximum size of a batch
+    pub max_batch_size: usize,
+    /// Capacity of the channel between the Exporter and the Transaction Monitor
+    pub inflight_transactions_channel_capacity: usize,
+    /// Configuration for the Transaction Monitor
+    pub transaction_monitor: transaction_monitor::Config,
+    /// Number of compute units requested per update_price instruction within the transaction
+    /// (i.e., requested units equals `n * compute_unit_limit`, where `n` is the number of update_price
+    /// instructions)
+    pub compute_unit_limit: u32,
+    /// Price per compute unit offered for update_price transactions. If dynamic compute unit
+    /// pricing is enabled and this value is set, the actual price per compute unit will be the
+    /// maximum of the network dynamic price and this value.
+    pub compute_unit_price_micro_lamports: Option<u64>,
+    /// Enable using a dynamic price per compute unit based on the network's previous
+    /// prioritization fees.
+    pub dynamic_compute_unit_pricing_enabled: bool,
+    /// Maximum total compute unit fee paid for a single transaction. Defaults to 0.001 SOL. This
+    /// is a safety measure while using dynamic compute price to prevent the exporter from paying
+    /// too much for a single transaction.
+    pub maximum_compute_unit_price_micro_lamports: u64,
+    /// Maximum slot gap between the current slot and the oldest slot amongst all the accounts in
+    /// the batch. This is used to calculate the dynamic price per compute unit. When the slot gap
+    /// reaches this number we will use the maximum total_compute_fee for the transaction.
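+    /// Illustrative example (not normative): with the default maximum price of
+    /// 1_000_000 micro-lamports and this gap set to 40, a batch whose oldest
+    /// publisher slot is 38 slots behind the current slot offers
+    /// 1_000_000 >> (40 - 38) = 250_000 micro-lamports per compute unit.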
+    pub maximum_slot_gap_for_dynamic_compute_unit_price: u64,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            refresh_network_state_interval_duration: Duration::from_millis(200),
+            publish_interval_duration: Duration::from_secs(1),
+            staleness_threshold: Duration::from_secs(5),
+            unchanged_publish_threshold: Duration::from_secs(5),
+            max_batch_size: 12,
+            inflight_transactions_channel_capacity: 10000,
+            transaction_monitor: Default::default(),
+            // The largest transactions without accumulator spend around 38k compute units
+            // and accumulator cpi costs around 10k compute units. We set the limit to 60k
+            // to have some buffer.
+            compute_unit_limit: 60000,
+            compute_unit_price_micro_lamports: None,
+            dynamic_compute_unit_pricing_enabled: false,
+            // Maximum compute unit price (as a cap on the dynamic price)
+            maximum_compute_unit_price_micro_lamports: 1_000_000,
+            // A publisher update is not included if it is 25 slots behind the current slot.
+            // Due to the delay in the network (until a block gets confirmed) and potential
+            // ws issues we add 15 slots to make sure we do not overpay.
+            maximum_slot_gap_for_dynamic_compute_unit_price: 40,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct NetworkState {
+    pub blockhash: solana_sdk::hash::Hash,
+    pub current_slot: u64,
+}
+
+/// NetworkStateQuerier periodically queries the current state of the network,
+/// fetching the blockhash and slot number.
+struct NetworkStateQuerier {
+    /// The RPC client
+    rpc_client: RpcClient,
+
+    /// The interval with which to query the network state
+    query_interval: Interval,
+
+    /// Channel the current network state is sent on
+    network_state_tx: watch::Sender<NetworkState>,
+}
+
+impl NetworkStateQuerier {
+    pub fn new(
+        rpc_endpoint: &str,
+        rpc_timeout: Duration,
+        query_interval: Interval,
+        network_state_tx: watch::Sender<NetworkState>,
+    ) -> Self {
+        NetworkStateQuerier {
+            rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
+            query_interval,
+            network_state_tx,
+        }
+    }
+
+    pub async fn run(&mut self) {
+        loop {
+            self.query_interval.tick().await;
+            if let Err(err) = self.query_network_state().await {
+                tracing::error!(err = ?err, "Network state query failed");
+            }
+        }
+    }
+
+    async fn query_network_state(&mut self) -> Result<()> {
+        // Fetch the blockhash and current slot in parallel
+        let current_slot_future = self
+            .rpc_client
+            .get_slot_with_commitment(CommitmentConfig::confirmed());
+        let latest_blockhash_future = self.rpc_client.get_latest_blockhash();
+
+        let (current_slot_result, latest_blockhash_result) =
+            future::join(current_slot_future, latest_blockhash_future).await;
+
+        // Send the result on the channel
+        self.network_state_tx.send(NetworkState {
+            blockhash: latest_blockhash_result?,
+            current_slot: current_slot_result?,
+        })?;
+
+        Ok(())
+    }
+}
+
+pub async fn exporter<S>(config: network::Config, network: Network, state: Arc<S>)
+where
+    S: Exporter,
+    S: Transactions,
+    S: Send + Sync + 'static,
+{
+    // Create and spawn the network state querier
+    let (network_state_tx, network_state_rx) = watch::channel(Default::default());
+    let mut network_state_querier = NetworkStateQuerier::new(
+        &config.rpc_url,
+        config.rpc_timeout,
+        tokio::time::interval(config.exporter.refresh_network_state_interval_duration),
+        network_state_tx,
+    );
+
+    tokio::spawn(transaction_monitor::transaction_monitor(
+        config.clone(),
+        state.clone(),
+    ));
+    tokio::spawn(exporter::exporter(config, network, state, network_state_rx));
+    tokio::spawn(async move {
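+        // The querier runs in its own task so that slow RPC responses delay only
+        // network-state refreshes, never the publish loop in the exporter task.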
        network_state_querier.run().await
+    });
+}
+
+mod exporter {
+    use {
+        super::NetworkState,
+        crate::agent::{
+            solana::{
+                key_store::KeyStore,
+                network::{
+                    Config,
+                    Network,
+                },
+            },
+            state::exporter::{
+                get_publish_keypair,
+                publish_batches,
+                Exporter,
+            },
+        },
+        solana_client::nonblocking::rpc_client::RpcClient,
+        std::sync::Arc,
+        tokio::sync::watch,
+    };
+
+    pub async fn exporter<S>(
+        config: Config,
+        network: Network,
+        state: Arc<S>,
+        network_state_rx: watch::Receiver<NetworkState>,
+    ) where
+        S: Exporter,
+        S: Send + Sync + 'static,
+    {
+        let mut publish_interval = tokio::time::interval(config.exporter.publish_interval_duration);
+        let mut dynamic_compute_unit_price_update_interval =
+            tokio::time::interval(config.exporter.publish_interval_duration);
+
+        let client = Arc::new(RpcClient::new_with_timeout(
+            config.rpc_url.to_string(),
+            config.rpc_timeout,
+        ));
+        let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
+            tracing::warn!("Key store not available, Exporter won't start.");
+            return;
+        };
+
+        loop {
+            tokio::select! {
+                _ = publish_interval.tick() => {
+                    if let Ok(publish_keypair) = get_publish_keypair(&*state, network, key_store.publish_keypair.as_ref()).await {
+                        if let Ok(permissioned_updates) = Exporter::get_permissioned_updates(
+                            &*state,
+                            network,
+                            &publish_keypair,
+                            config.exporter.staleness_threshold,
+                            config.exporter.unchanged_publish_threshold,
+                        ).await {
+                            if let Err(err) = publish_batches(
+                                &*state,
+                                client.clone(),
+                                network,
+                                &network_state_rx,
+                                key_store.accumulator_key,
+                                &publish_keypair,
+                                key_store.program_key,
+                                config.exporter.max_batch_size,
+                                config.exporter.staleness_threshold,
+                                config.exporter.compute_unit_limit,
+                                config.exporter.compute_unit_price_micro_lamports,
+                                config.exporter.maximum_compute_unit_price_micro_lamports,
+                                config.exporter.maximum_slot_gap_for_dynamic_compute_unit_price,
+                                config.exporter.dynamic_compute_unit_pricing_enabled,
+                                permissioned_updates,
+                            ).await {
+                                tracing::error!(err = ?err, "Exporter failed to publish.");
+                            }
+                        }
+                    }
+                }
+                _ = dynamic_compute_unit_price_update_interval.tick() => {
+                    if config.exporter.dynamic_compute_unit_pricing_enabled {
+                        if let Ok(publish_keypair) = get_publish_keypair(&*state, network, key_store.publish_keypair.as_ref()).await {
+                            if let Err(err) = Exporter::update_recent_compute_unit_price(
+                                &*state,
+                                network,
+                                &publish_keypair,
+                                &client,
+                                config.exporter.staleness_threshold,
+                                config.exporter.unchanged_publish_threshold,
+                            ).await {
+                                tracing::error!(err = ?err, "Exporter failed to update compute unit price.");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+mod transaction_monitor {
+    use {
+        crate::agent::{
+            solana::network,
+            state::transactions::Transactions,
+        },
+        serde::{
+            Deserialize,
+            Serialize,
+        },
+        solana_client::nonblocking::rpc_client::RpcClient,
+        std::{
+            sync::Arc,
+            time::Duration,
+        },
+    };
+
+    #[derive(Clone, Serialize, Deserialize, Debug)]
+    #[serde(default)]
+    pub struct Config {
+        /// Duration of the interval with which to poll the status of transactions.
+        /// It is recommended to set this to a value close to the Exporter's publish_interval.
+        #[serde(with = "humantime_serde")]
+        pub poll_interval_duration: Duration,
+        /// Maximum number of recent transactions to monitor. When this number is exceeded,
+        /// the oldest transactions are no longer monitored. It is recommended to set this to
+        /// a value at least as large as (number of products published / number of products in a batch).
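+        /// Illustrative example: publishing 600 products with max_batch_size = 12
+        /// yields 50 transactions per publish tick, so this should be at least 50.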
+        pub max_transactions: usize,
+    }
+
+    impl Default for Config {
+        fn default() -> Self {
+            Self {
+                poll_interval_duration: Duration::from_secs(4),
+                max_transactions: 100,
+            }
+        }
+    }
+
+    pub async fn transaction_monitor<S>(config: network::Config, state: Arc<S>)
+    where
+        S: Transactions,
+    {
+        let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout);
+        let mut poll_interval =
+            tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);
+
+        loop {
+            poll_interval.tick().await;
+            if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await {
+                tracing::error!(err = ?err, "Transaction monitor failed.");
+            }
+        }
+    }
+}
diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs
index 84d215f..dd0e87c 100644
--- a/src/agent/services/oracle.rs
+++ b/src/agent/services/oracle.rs
@@ -11,10 +11,7 @@ use {
             Network,
         },
     },
-    state::oracle::{
-        Oracle,
-        PricePublishingMetadata,
-    },
+    state::oracle::Oracle,
 },
 anyhow::Result,
 solana_account_decoder::UiAccountEncoding,
@@ -32,25 +29,20 @@ use {
         account::Account,
         commitment_config::CommitmentConfig,
         pubkey::Pubkey,
+        signature::Keypair,
     },
     std::{
-        collections::HashMap,
         sync::Arc,
         time::{
             Duration,
            Instant,
        },
     },
-    tokio::sync::watch::Sender,
     tokio_stream::StreamExt,
 };
 
-pub async fn oracle<S>(
-    config: Config,
-    network: Network,
-    state: Arc<S>,
-    publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
-) where
+pub async fn oracle<S>(config: Config, network: Network, state: Arc<S>)
+where
     S: Oracle,
     S: Send + Sync + 'static,
 {
@@ -64,8 +56,8 @@ pub async fn oracle(
         network,
         state.clone(),
         key_store.mapping_key,
+        key_store.publish_keypair,
         config.oracle.max_lookup_batch_size,
-        publisher_permissions_tx.clone(),
     ));
 
     if config.oracle.subscriber_enabled {
@@ -158,8 +150,8 @@ async fn poller(
     network: Network,
     state: Arc<S>,
     mapping_key: Pubkey,
+    publish_keypair: Option<Keypair>,
     max_lookup_batch_size: usize,
-    publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
 ) where
     S: Oracle,
     S: Send + Sync + 'static,
@@ -175,15 +167,16 @@ async fn poller(
     ));
 
     loop {
-        tick.tick().await;
-        tracing::debug!("Polling for updates.");
         if let Err(err) = async {
+            tick.tick().await;
+            tracing::debug!("Polling for updates.");
             Oracle::poll_updates(
                 &*state,
+                network,
                 mapping_key,
+                publish_keypair.as_ref(),
                 &client,
                 max_lookup_batch_size,
-                publisher_permissions_tx.clone(),
             )
             .await?;
             Oracle::sync_global_store(&*state, network).await
diff --git a/src/agent/solana.rs b/src/agent/solana.rs
index 86ee433..80603a7 100644
--- a/src/agent/solana.rs
+++ b/src/agent/solana.rs
@@ -5,35 +5,20 @@ pub mod exporter;
 /// - The Exporter, which publishes data to the network
 pub mod network {
     use {
-        super::{
-            exporter,
-            key_store::{
-                self,
-                KeyStore,
-            },
+        super::key_store::{
+            self,
         },
-        crate::agent::state::{
-            oracle::{
+        crate::agent::{
+            services::exporter,
+            state::oracle::{
                 self,
-                PricePublishingMetadata,
             },
-            State,
         },
-        anyhow::Result,
         serde::{
             Deserialize,
             Serialize,
         },
-        solana_sdk::pubkey::Pubkey,
-        std::{
-            collections::HashMap,
-            sync::Arc,
-            time::Duration,
-        },
-        tokio::{
-            sync::watch,
-            task::JoinHandle,
-        },
+        std::time::Duration,
     };
 
 #[derive(Clone, Copy, Serialize, Deserialize, Debug)]
@@ -75,68 +60,6 @@ pub mod network {
     #[serde(default)]
     pub exporter: exporter::Config,
 }
-
-    /// Spawn an Oracle, in-progress porting this to State.
-    ///
-    /// Behaviour:
-    /// - Spawns Oracle: (Obsolete, now Extracted to state/oracle.rs)
-    /// - Spawns a Subscriber:
-    ///   o Subscribes to the Oracle program key.
- /// o Decodes account events related to the Oracle. - /// o Sends update. - /// - Spawns a Poller: - /// o Fetches Mapping Accounts - /// o Iterates Product+Price Accounts - /// o Sends update. - /// - Oracle then Listens for Updates from Subscriber - /// o Filters for Price Account Updates. - /// o Stores its own copy of the Price Account. - /// o Updates the Global Store for that Price Account. - /// - Oracle also Listens for Updates from Poller - /// o Tracks if any new Mapping Accounts were found. - /// o Update Local Data - /// o Updates entire Global Store View. - /// - Spawns Exporter: - /// - Spawns NetworkQuerier - /// - Queries BlockHash in a timer. - /// - Sends BlockHash + Slot - /// - Spawns Transaction Monitor: - /// - Listens for for Transactions - /// - Adds to tracked Transactions - /// - Responds to queries about Tx status. - /// - Spawns Exporter - /// - On Publish tick: pushes updates to the network as a batch. - /// - On Compute Unit Price Tick: calculates new median price fee from recent - /// - /// Plan: - /// - Subscriber & Poller Can Be Spawnable Tasks - /// - Oracle becomes a State API - /// - - pub fn spawn_network( - config: Config, - network: Network, - state: Arc, - publisher_permissions_rx: watch::Receiver< - HashMap>, - >, - ) -> Result>> { - let mut jhs = vec![]; - - // Spawn the Exporter - let exporter_jhs = exporter::spawn_exporter( - config.exporter, - network, - &config.rpc_url, - config.rpc_timeout, - publisher_permissions_rx, - KeyStore::new(config.key_store.clone())?, - state, - )?; - - jhs.extend(exporter_jhs); - - Ok(jhs) - } } /// The key_store module is responsible for parsing the pythd key store. diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index f6ed681..036c0ad 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -514,7 +514,6 @@ impl Exporter { /// (n / batch_size) requests in flight. async fn publish_updates(&mut self) -> Result<()> { let permissioned_updates = self.get_permissioned_updates().await?; - let current_timestamp_millis = Utc::now().timestamp_millis(); if permissioned_updates.is_empty() { return Ok(()); diff --git a/src/agent/state.rs b/src/agent/state.rs index 41bdaed..1edcbd1 100644 --- a/src/agent/state.rs +++ b/src/agent/state.rs @@ -16,10 +16,12 @@ use { }; pub mod api; +pub mod exporter; pub mod global; pub mod keypairs; pub mod local; pub mod oracle; +pub mod transactions; pub use api::Prices; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -55,6 +57,12 @@ pub struct State { /// State for the Solana-based Oracle functionality. oracle: oracle::OracleState, + + /// State for the Solana-based Exporter functionality. 
+ exporter: exporter::ExporterState, + + /// State for the Solana transaction monitor + transactions: transactions::TransactionsState, } /// Represents a single Notify Price Sched subscription @@ -74,7 +82,27 @@ struct NotifyPriceSubscription { } impl State { - pub async fn new(config: Config) -> Self { + pub async fn new(config: &crate::agent::config::Config) -> Self { + let registry = &mut *PROMETHEUS_REGISTRY.lock().await; + State { + global_store: global::Store::new(registry), + local_store: local::Store::new(registry), + keypairs: keypairs::KeypairState::default(), + prices: api::PricesState::new(config.state.clone()), + oracle: oracle::OracleState::new(), + exporter: exporter::ExporterState::new(), + transactions: transactions::TransactionsState::new( + config + .primary_network + .exporter + .transaction_monitor + .max_transactions, + ), + } + } + + #[cfg(test)] + pub async fn new_tests(config: Config) -> Self { let registry = &mut *PROMETHEUS_REGISTRY.lock().await; State { global_store: global::Store::new(registry), @@ -82,6 +110,8 @@ impl State { keypairs: keypairs::KeypairState::default(), prices: api::PricesState::new(config), oracle: oracle::OracleState::new(), + exporter: exporter::ExporterState::new(), + transactions: transactions::TransactionsState::new(100), } } } @@ -156,7 +186,7 @@ mod tests { let config = Config { notify_price_sched_interval_duration, }; - let state = Arc::new(State::new(config).await); + let state = Arc::new(State::new_tests(config).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs new file mode 100644 index 0000000..0462abb --- /dev/null +++ b/src/agent/state/exporter.rs @@ -0,0 +1,830 @@ +use { + super::{ + local::PriceInfo, + oracle::PricePublishingMetadata, + transactions::Transactions, + State, + }, + crate::agent::{ + services::exporter::NetworkState, + solana::network::Network, + state::{ + global::GlobalStore, + keypairs::Keypairs, + local::LocalStore, + }, + }, + anyhow::{ + anyhow, + Context, + Result, + }, + bincode::Options, + chrono::Utc, + futures_util::future::join_all, + pyth_sdk::Identifier, + pyth_sdk_solana::state::PriceStatus, + serde::Serialize, + solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_config::RpcSendTransactionConfig, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + instruction::{ + AccountMeta, + Instruction, + }, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + sysvar::clock, + transaction::Transaction, + }, + std::{ + collections::{ + BTreeMap, + HashMap, + }, + sync::Arc, + time::Duration, + }, + tokio::sync::{ + watch, + RwLock, + }, +}; + +const PYTH_ORACLE_VERSION: u32 = 2; +const UPDATE_PRICE_NO_FAIL_ON_ERROR: i32 = 13; + +#[repr(C)] +#[derive(Serialize, PartialEq, Debug, Clone)] +struct UpdPriceCmd { + version: u32, + cmd: i32, + status: PriceStatus, + unused_: u32, + price: i64, + conf: u64, + pub_slot: u64, +} + +#[derive(Default)] +pub struct ExporterState { + /// The last state published for each price identifier. Used to + /// rule out stale data and prevent repetitive publishing of + /// unchanged prices. 
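+    /// Incoming updates are compared against this map with `PriceInfo::cmp_no_timestamp`
+    /// in `get_permissioned_updates`, so a price identical to the last published state is
+    /// suppressed until `unchanged_publish_threshold` has elapsed.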
+    last_published_state: RwLock<HashMap<pyth_sdk::Identifier, PriceInfo>>,
+
+    /// Currently known permissioned prices of this publisher along with their market hours
+    our_prices: RwLock<HashMap<Pubkey, PricePublishingMetadata>>,
+
+    /// Recent compute unit price in micro lamports (set if dynamic compute unit pricing is enabled)
+    recent_compute_unit_price_micro_lamports: RwLock<Option<u64>>,
+}
+
+impl ExporterState {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+#[async_trait::async_trait]
+pub trait Exporter
+where
+    Self: Keypairs,
+    Self: LocalStore,
+    Self: GlobalStore,
+    Self: Transactions,
+{
+    async fn record_publish(&self, batch_state: HashMap<pyth_sdk::Identifier, PriceInfo>);
+    async fn get_permissioned_updates(
+        &self,
+        network: Network,
+        publish_keypair: &Keypair,
+        staleness_threshold: Duration,
+        unchanged_publish_threshold: Duration,
+    ) -> Result<Vec<(pyth_sdk::Identifier, PriceInfo)>>;
+    async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option<u64>;
+    async fn update_recent_compute_unit_price(
+        &self,
+        network: Network,
+        publish_keypair: &Keypair,
+        rpc_client: &RpcClient,
+        staleness_threshold: Duration,
+        unchanged_publish_threshold: Duration,
+    ) -> Result<()>;
+    async fn update_permissions(
+        &self,
+        network: Network,
+        publish_keypair: Option<&Keypair>,
+        publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
+    ) -> Result<()>;
+}
+
+/// Allow downcasting State into ExporterState for functions that depend on the `Exporter` service.
+impl<'a> From<&'a State> for &'a ExporterState {
+    fn from(state: &'a State) -> &'a ExporterState {
+        &state.exporter
+    }
+}
+
+#[async_trait::async_trait]
+impl<T> Exporter for T
+where
+    for<'a> &'a T: Into<&'a ExporterState>,
+    T: Sync + Send + 'static,
+    T: Keypairs,
+    T: LocalStore,
+    T: GlobalStore,
+    T: Transactions,
+{
+    async fn record_publish(&self, batch_state: HashMap<pyth_sdk::Identifier, PriceInfo>) {
+        self.into()
+            .last_published_state
+            .write()
+            .await
+            .extend(batch_state);
+    }
+
+    async fn get_permissioned_updates(
+        &self,
+        network: Network,
+        publish_keypair: &Keypair,
+        staleness_threshold: Duration,
+        unchanged_publish_threshold: Duration,
+    ) -> Result<Vec<(pyth_sdk::Identifier, PriceInfo)>> {
+        let local_store_contents = LocalStore::get_all_price_infos(self).await;
+        let now = Utc::now().naive_utc();
+
+        {
+            let keys = self.into().our_prices.read().await;
+            let keys = keys.keys();
+            tracing::debug!(
+                our_prices = ?keys,
+                publish_pubkey = publish_keypair.pubkey().to_string(),
+                "Exporter: filtering prices permissioned to us",
+            );
+        }
+
+        let last_published_state = self.into().last_published_state.read().await;
+        let our_prices = self.into().our_prices.read().await;
+
+        // Filter the contents to only include information we haven't already sent,
+        // and to ignore stale information.
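+        // Four filters are applied in order: drop stale updates, drop unchanged
+        // updates (unless unchanged_publish_threshold has elapsed), drop accounts we
+        // are not permissioned for (or that are outside market hours), and drop
+        // updates arriving faster than the per-account publish_interval.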
+        Ok(local_store_contents
+            .into_iter()
+            .filter(|(_identifier, info)| {
+                // Filter out timestamps that are old
+                now < info.timestamp + staleness_threshold
+            })
+            .filter(|(identifier, info)| {
+                // Filter out unchanged price data if the max delay wasn't reached
+                if let Some(last_info) = last_published_state.get(identifier) {
+                    if info.timestamp > last_info.timestamp + unchanged_publish_threshold {
+                        true // max delay since last published state reached, we publish anyway
+                    } else {
+                        !last_info.cmp_no_timestamp(info) // Filter out if data is unchanged
+                    }
+                } else {
+                    true // No prior data found, letting the price through
+                }
+            })
+            .filter(|(id, _data)| {
+                let key_from_id = Pubkey::from((*id).clone().to_bytes());
+                if let Some(publisher_permission) = our_prices.get(&key_from_id) {
+                    let now_utc = Utc::now();
+                    let ret = publisher_permission.schedule.can_publish_at(&now_utc);
+
+                    if !ret {
+                        tracing::debug!(
+                            price_account = key_from_id.to_string(),
+                            schedule = ?publisher_permission.schedule,
+                            utc_time = now_utc.format("%c").to_string(),
+                            "Exporter: Attempted to publish price outside market hours",
+                        );
+                    }
+
+                    ret
+                } else {
+                    // Note: This message is not an error. Some
+                    // publishers have different permissions on
+                    // primary/secondary networks
+                    tracing::debug!(
+                        unpermissioned_price_account = key_from_id.to_string(),
+                        permissioned_accounts = ?our_prices,
+                        "Exporter: Attempted to publish a price without permission, skipping",
+                    );
+                    false
+                }
+            })
+            .filter(|(id, info)| {
+                // Filter out prices that are being updated too frequently according to
+                // publisher_permission.publish_interval
+                let last_info = match last_published_state.get(id) {
+                    Some(last_info) => last_info,
+                    None => {
+                        // No prior data found, letting the price through
+                        return true;
+                    }
+                };
+
+                let key_from_id = Pubkey::from((*id).clone().to_bytes());
+                let publisher_metadata = match our_prices.get(&key_from_id) {
+                    Some(metadata) => metadata,
+                    None => {
+                        // Should never happen since we have filtered out the price above
+                        return false;
+                    }
+                };
+
+                if let Some(publish_interval) = publisher_metadata.publish_interval {
+                    if info.timestamp < last_info.timestamp + publish_interval {
+                        // Updating the price too soon after the last update, skipping
+                        return false;
+                    }
+                }
+                true
+            })
+            .collect::<Vec<_>>())
+    }
+
+    async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option<u64> {
+        *self
+            .into()
+            .recent_compute_unit_price_micro_lamports
+            .read()
+            .await
+    }
+
+    async fn update_recent_compute_unit_price(
+        &self,
+        network: Network,
+        publish_keypair: &Keypair,
+        rpc_client: &RpcClient,
+        staleness_threshold: Duration,
+        unchanged_publish_threshold: Duration,
+    ) -> Result<()> {
+        let permissioned_updates = self
+            .get_permissioned_updates(
+                network,
+                publish_keypair,
+                staleness_threshold,
+                unchanged_publish_threshold,
+            )
+            .await?;
+        let price_accounts = permissioned_updates
+            .iter()
+            .map(|(identifier, _)| Pubkey::from(identifier.to_bytes()))
+            .collect::<Vec<_>>();
+
+        *self
+            .into()
+            .recent_compute_unit_price_micro_lamports
+            .write()
+            .await =
+            estimate_compute_unit_price_micro_lamports(rpc_client, &price_accounts).await?;
+
+        Ok(())
+    }
+
+    async fn update_permissions(
+        &self,
+        network: Network,
+        publish_keypair: Option<&Keypair>,
+        publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
+    ) -> Result<()> {
+        let publish_keypair = get_publish_keypair(self, network, publish_keypair).await?;
+        *self.into().our_prices.write().await = publisher_permissions
+            .get(&publish_keypair.pubkey())
+            .cloned()
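+            // No on-chain permissions entry for this publisher yet: fall back to an
+            // empty map (warned below; expected on startup).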
+            .unwrap_or_else(|| {
+                tracing::warn!(
+                    publish_pubkey = &publish_keypair.pubkey().to_string(),
+                    "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup.",
+                );
+                HashMap::new()
+            });
+
+        Ok(())
+    }
+}
+
+pub async fn get_publish_keypair<S>(
+    state: &S,
+    network: Network,
+    publish_keypair: Option<&Keypair>,
+) -> Result<Keypair>
+where
+    S: Exporter,
+    S: Keypairs,
+{
+    if let Some(kp) = publish_keypair.as_ref() {
+        // It's impossible to sanely return a &Keypair in the
+        // other if branch, so we clone the reference.
+        Ok(Keypair::from_bytes(&kp.to_bytes())
+            .context("INTERNAL: Could not convert keypair to bytes and back")?)
+    } else {
+        // Request the keypair from the remote keypair loader. Doing
+        // this here guarantees that the up-to-date loaded keypair
+        // is being used.
+        //
+        // Currently, we're guaranteed not to clog memory or block
+        // the keypair loader under the following assumptions:
+        // - The Exporter publishing loop waits for a publish
+        //   attempt to finish before beginning the next
+        //   one. Currently realized in run()
+        // - The Remote Key Loader does not read channels for
+        //   keypairs it does not have. Currently expressed in
+        //   handle_key_requests() in remote_keypair_loader.rs
+
+        tracing::debug!("Exporter: Publish keypair is None, requesting remote loaded key");
+        let kp = Keypairs::request_keypair(state, network).await?;
+        tracing::debug!("Exporter: Keypair received");
+        Ok(kp)
+    }
+}
+
+async fn estimate_compute_unit_price_micro_lamports(
+    rpc_client: &RpcClient,
+    price_accounts: &[Pubkey],
+) -> Result<Option<u64>> {
+    let mut slot_compute_fee: BTreeMap<u64, u64> = BTreeMap::new();
+
+    // Maximum allowed number of accounts is 128. So we need to chunk the requests
+    let prioritization_fees_batches = futures_util::future::join_all(
+        price_accounts
+            .chunks(128)
+            .map(|price_accounts| rpc_client.get_recent_prioritization_fees(price_accounts)),
+    )
+    .await
+    .into_iter()
+    .collect::<Result<Vec<_>, _>>()
+    .context("Failed to get recent prioritization fees")?;
+
+    prioritization_fees_batches
+        .iter()
+        .for_each(|prioritization_fees| {
+            prioritization_fees.iter().for_each(|fee| {
+                // Get the maximum prioritization fee over all fees retrieved for this slot
+                let prioritization_fee = slot_compute_fee
+                    .get(&fee.slot)
+                    .map_or(fee.prioritization_fee, |other| {
+                        fee.prioritization_fee.max(*other)
+                    });
+                slot_compute_fee.insert(fee.slot, prioritization_fee);
+            })
+        });
+
+    let mut prioritization_fees = slot_compute_fee
+        .iter()
+        .rev()
+        .take(20) // Only take the last 20 slot priority fees
+        .map(|(_, fee)| *fee)
+        .collect::<Vec<_>>();
+
+    prioritization_fees.sort();
+
+    let median_priority_fee = prioritization_fees
+        .get(prioritization_fees.len() / 2)
+        .cloned();
+
+    Ok(median_priority_fee)
+}
+
+/// Publishes any price updates in the local store that we haven't sent to this network.
+///
+/// The strategy used to do this is as follows:
+/// - Fetch all the price updates currently present in the local store
+/// - Filter out price updates we have previously attempted to publish, or which are
+///   too old to publish.
+/// - Collect the price updates into batches.
+/// - Publish all the batches, staggering them evenly over the interval at which this method is called.
+///
+/// This design is intended to:
+/// - Decouple the rate at which the local store is updated and the rate at which we publish transactions.
+///   A user sending an unusually high rate of price updates shouldn't be reflected in the transaction rate.
+/// - Degrade gracefully if the blockchain RPC node exhibits poor performance. If the RPC node takes a long
+///   time to respond, no internal queues grow unboundedly. At any single point in time there are at most
+///   (n / batch_size) requests in flight.
+pub async fn publish_batches<S>(
+    state: &S,
+    client: Arc<RpcClient>,
+    network: Network,
+    network_state_rx: &watch::Receiver<NetworkState>,
+    accumulator_key: Option<Pubkey>,
+    publish_keypair: &Keypair,
+    program_key: Pubkey,
+    max_batch_size: usize,
+    staleness_threshold: Duration,
+    compute_unit_limit: u32,
+    compute_unit_price_micro_lamports: Option<u64>,
+    maximum_compute_unit_price_micro_lamports: u64,
+    maximum_slot_gap_for_dynamic_compute_unit_price: u64,
+    dynamic_compute_unit_pricing_enabled: bool,
+    permissioned_updates: Vec<(pyth_sdk::Identifier, PriceInfo)>,
+) -> Result<()>
+where
+    S: Sync + Send + 'static,
+    S: Keypairs,
+    S: LocalStore,
+    S: GlobalStore,
+    S: Transactions,
+    S: Exporter,
+{
+    if permissioned_updates.is_empty() {
+        return Ok(());
+    }
+
+    // Split the updates up into batches
+    let batches = permissioned_updates.chunks(max_batch_size);
+
+    let mut batch_state = HashMap::new();
+    let mut batch_futures = vec![];
+
+    let network_state = network_state_rx.borrow().clone();
+    for batch in batches {
+        batch_futures.push(publish_batch(
+            state,
+            client.clone(),
+            network,
+            network_state,
+            accumulator_key,
+            publish_keypair,
+            program_key,
+            batch,
+            staleness_threshold,
+            compute_unit_limit,
+            compute_unit_price_micro_lamports,
+            maximum_compute_unit_price_micro_lamports,
+            maximum_slot_gap_for_dynamic_compute_unit_price,
+            dynamic_compute_unit_pricing_enabled,
+        ));
+
+        for (identifier, info) in batch {
+            batch_state.insert(*identifier, (*info).clone());
+        }
+    }
+
+    // Wait for all the update requests to complete. Note that this doesn't wait for the
+    // transactions themselves to be processed or confirmed, just the RPC requests to return.
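+    // RPC send errors are logged inside publish_batch and reported as Ok, so the
+    // collect below only propagates errors hit while building or signing a batch.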
+    join_all(batch_futures)
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>>>()?;
+
+    Exporter::record_publish(state, batch_state).await;
+    Ok(())
+}
+
+async fn publish_batch<S>(
+    state: &S,
+    client: Arc<RpcClient>,
+    network: Network,
+    network_state: NetworkState,
+    accumulator_key: Option<Pubkey>,
+    publish_keypair: &Keypair,
+    program_key: Pubkey,
+    batch: &[(Identifier, PriceInfo)],
+    staleness_threshold: Duration,
+    compute_unit_limit: u32,
+    compute_unit_price_micro_lamports_opt: Option<u64>,
+    maximum_compute_unit_price_micro_lamports: u64,
+    maximum_slot_gap_for_dynamic_compute_unit_price: u64,
+    dynamic_compute_unit_pricing_enabled: bool,
+) -> Result<()>
+where
+    S: Sync + Send + 'static,
+    S: Keypairs,
+    S: LocalStore,
+    S: GlobalStore,
+    S: Transactions,
+    S: Exporter,
+{
+    let mut instructions = Vec::new();
+
+    // Refresh the data in the batch
+    let local_store_contents = LocalStore::get_all_price_infos(&*state).await;
+    let refreshed_batch = batch.iter().map(|(identifier, _)| {
+        (
+            identifier,
+            local_store_contents
+                .get(identifier)
+                .ok_or_else(|| anyhow!("price identifier not found in local store"))
+                .with_context(|| identifier.to_string()),
+        )
+    });
+    let price_accounts = refreshed_batch
+        .clone()
+        .map(|(identifier, _)| Pubkey::from(identifier.to_bytes()))
+        .collect::<Vec<_>>();
+
+    for (identifier, price_info_result) in refreshed_batch {
+        let price_info = price_info_result?;
+        let now = Utc::now().naive_utc();
+
+        let stale_price = now > price_info.timestamp + staleness_threshold;
+        if stale_price {
+            continue;
+        }
+
+        let instruction = if let Some(accumulator_program_key) = accumulator_key {
+            create_instruction_with_accumulator(
+                publish_keypair.pubkey(),
+                program_key,
+                Pubkey::from(identifier.to_bytes()),
+                price_info,
+                network_state.current_slot,
+                accumulator_program_key,
+            )?
+        } else {
+            create_instruction_without_accumulator(
+                publish_keypair.pubkey(),
+                program_key,
+                Pubkey::from(identifier.to_bytes()),
+                price_info,
+                network_state.current_slot,
+            )?
+        };
+
+        instructions.push(instruction);
+    }
+
+    // Pay priority fees, if configured
+    let total_compute_limit: u32 = compute_unit_limit * instructions.len() as u32;
+
+    instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(
+        total_compute_limit,
+    ));
+
+    // Calculate the compute unit price in micro lamports
+    let mut compute_unit_price_micro_lamports = None;
+
+    // If the unit price value is set, use it as the minimum price
+    if let Some(price) = compute_unit_price_micro_lamports_opt {
+        compute_unit_price_micro_lamports = Some(price);
+    }
+
+    // If dynamic compute unit pricing is enabled, we use the following two methods to calculate an
+    // estimate of the price:
+    // - We exponentially increase price based on the price staleness (slot gap between the
+    //   current slot and the oldest slot amongst all the accounts in this batch).
+    // - We use the network recent prioritization fees to get the minimum unit price
+    //   that landed a transaction using Pyth price accounts (permissioned to this publisher)
+    //   as writable. We take the median over the last 20 slots and divide it by two to make
+    //   sure that it decays over time. The API doesn't return the priority fees for the Pyth
+    //   price reads and so, this reflects the unit price that publishers have paid in the
+    //   previous slots.
+    //
+    // The two methods above combined effectively raise the price sharply when transactions
+    // are failing to land on-chain, with the increase decaying over time.
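+    // Illustrative numbers: if the 20-slot median (computed in
+    // estimate_compute_unit_price_micro_lamports) is 1_000_000 micro-lamports, the
+    // halving below offers 500_000; as congested slots age out of that window the
+    // median itself drops, so the offered price keeps decaying instead of staying
+    // pinned at the spike.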
+    // The decaying behaviour is important to keep the uptime high during congestion;
+    // without it we would publish a price only after a large slot gap, and then again
+    // only after the next large gap.
+    if dynamic_compute_unit_pricing_enabled {
+        // Use the estimated previous price if it is higher
+        // than the current price.
+        let recent_compute_unit_price_micro_lamports =
+            Exporter::get_recent_compute_unit_price_micro_lamports(state).await;
+
+        if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports {
+            // Get the estimated compute unit price and wrap it so it stays below the maximum
+            // total compute unit fee. We additionally divide such price by 2 to create an
+            // exponential decay. This will make sure that a spike doesn't get propagated
+            // forever.
+            let estimated_price = estimated_recent_price >> 1;
+
+            compute_unit_price_micro_lamports = compute_unit_price_micro_lamports
+                .map(|price| price.max(estimated_price))
+                .or(Some(estimated_price));
+        }
+
+        // Use an exponentially higher price if this publisher hasn't published in a while for the
+        // accounts in this batch. This will use the maximum total compute unit fee if the publisher
+        // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots.
+        let result = GlobalStore::price_accounts(
+            &*state,
+            network,
+            price_accounts.clone().into_iter().collect(),
+        )
+        .await?;
+
+        // Calculate the maximum slot difference between the publisher's latest slot and
+        // the current slot amongst all the accounts. Here, the aggregate slot is
+        // used instead of the publisher's latest update to avoid overpaying.
+        let oldest_slot = result
+            .values()
+            .filter(|account| account.min_pub != 255) // Only consider live price accounts
+            .flat_map(|account| {
+                account
+                    .comp
+                    .iter()
+                    .find(|c| c.publisher == publish_keypair.pubkey())
+                    .map(|c| c.latest.pub_slot.max(account.agg.pub_slot))
+            })
+            .min();
+
+        if let Some(oldest_slot) = oldest_slot {
+            let slot_gap = network_state.current_slot.saturating_sub(oldest_slot);
+
+            // Set the dynamic price exponentially based on the slot gap. If the max slot gap is
+            // 25, on this number (or more) the maximum unit price is paid, and then on slot 24 it
+            // is half of that and gets halved each lower slot.
Given that we have max total + // compute price of 10**12 and 250k compute units in one tx (12 updates) these are the + // estimated prices based on slot gaps: + // 25 (or more): 4_000_000 + // 20 : 125_000 + // 18 : 31_250 + // 15 : 3_906 + // 13 : 976 + // 10 : 122 + let exponential_price = maximum_compute_unit_price_micro_lamports + >> maximum_slot_gap_for_dynamic_compute_unit_price.saturating_sub(slot_gap); + + compute_unit_price_micro_lamports = compute_unit_price_micro_lamports + .map(|price| price.max(exponential_price)) + .or(Some(exponential_price)); + } + } + + if let Some(mut compute_unit_price_micro_lamports) = compute_unit_price_micro_lamports { + compute_unit_price_micro_lamports = + compute_unit_price_micro_lamports.min(maximum_compute_unit_price_micro_lamports); + + tracing::debug!( + unit_price = compute_unit_price_micro_lamports, + "setting compute unit price", + ); + instructions.push(ComputeBudgetInstruction::set_compute_unit_price( + compute_unit_price_micro_lamports, + )); + } + + let transaction = Transaction::new_signed_with_payer( + &instructions, + Some(&publish_keypair.pubkey()), + &vec![&publish_keypair], + network_state.blockhash, + ); + + let signature = match client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + tracing::error!(err = ?err, "Exporter: failed to send transaction."); + return Ok(()); + } + }; + + tracing::debug!( + signature = signature.to_string(), + instructions = instructions.len(), + price_accounts = ?price_accounts, + "Sent upd_price transaction.", + ); + + Transactions::add_transaction(&*state, signature).await; + + Ok(()) +} + +fn create_instruction_without_accumulator( + publish_pubkey: Pubkey, + program_key: Pubkey, + price_id: Pubkey, + price_info: &PriceInfo, + current_slot: u64, +) -> Result { + Ok(Instruction { + program_id: program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: price_id, + is_signer: false, + is_writable: true, + }, + AccountMeta { + pubkey: clock::id(), + is_signer: false, + is_writable: false, + }, + ], + data: bincode::DefaultOptions::new() + .with_little_endian() + .with_fixint_encoding() + .serialize( + &(UpdPriceCmd { + version: PYTH_ORACLE_VERSION, + cmd: UPDATE_PRICE_NO_FAIL_ON_ERROR, + status: price_info.status, + unused_: 0, + price: price_info.price, + conf: price_info.conf, + pub_slot: current_slot, + }), + )?, + }) +} + +fn create_instruction_with_accumulator( + publish_pubkey: Pubkey, + program_key: Pubkey, + price_id: Pubkey, + price_info: &PriceInfo, + current_slot: u64, + accumulator_program_key: Pubkey, +) -> Result { + let (whitelist_pubkey, _whitelist_bump) = Pubkey::find_program_address( + &["message".as_bytes(), "whitelist".as_bytes()], + &accumulator_program_key, + ); + + let (oracle_auth_pda, _) = Pubkey::find_program_address( + &[b"upd_price_write", &accumulator_program_key.to_bytes()], + &program_key, + ); + + let (accumulator_data_pubkey, _accumulator_data_pubkey) = Pubkey::find_program_address( + &[ + &oracle_auth_pda.to_bytes(), + "message".as_bytes(), + &price_id.to_bytes(), + ], + &accumulator_program_key, + ); + + Ok(Instruction { + program_id: program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: price_id, + is_signer: false, + is_writable: 
true, + }, + AccountMeta { + pubkey: clock::id(), + is_signer: false, + is_writable: false, + }, + // accumulator program key + AccountMeta { + pubkey: accumulator_program_key, + is_signer: false, + is_writable: false, + }, + // whitelist + AccountMeta { + pubkey: whitelist_pubkey, + is_signer: false, + is_writable: false, + }, + // oracle_auth_pda + AccountMeta { + pubkey: oracle_auth_pda, + is_signer: false, + is_writable: false, + }, + // accumulator_data + AccountMeta { + pubkey: accumulator_data_pubkey, + is_signer: false, + is_writable: true, + }, + ], + data: bincode::DefaultOptions::new() + .with_little_endian() + .with_fixint_encoding() + .serialize( + &(UpdPriceCmd { + version: PYTH_ORACLE_VERSION, + cmd: UPDATE_PRICE_NO_FAIL_ON_ERROR, + status: price_info.status, + unused_: 0, + price: price_info.price, + conf: price_info.conf, + pub_slot: current_slot, + }), + )?, + }) +} diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 9063c88..c73cedd 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -1,5 +1,8 @@ use { - super::super::solana::network::Network, + super::{ + super::solana::network::Network, + exporter::Exporter, + }, crate::agent::{ legacy_schedule::LegacySchedule, market_schedule::MarketSchedule, @@ -32,6 +35,7 @@ use { account::Account, commitment_config::CommitmentLevel, pubkey::Pubkey, + signature::Keypair, }, std::{ collections::{ @@ -40,10 +44,7 @@ use { }, time::Duration, }, - tokio::sync::{ - watch::Sender, - RwLock, - }, + tokio::sync::RwLock, }; #[derive(Debug, Clone)] @@ -185,10 +186,11 @@ pub trait Oracle { async fn sync_global_store(&self, network: Network) -> Result<()>; async fn poll_updates( &self, + network: Network, mapping_key: Pubkey, + publish_keypair: Option<&Keypair>, rpc_client: &RpcClient, max_lookup_batch_size: usize, - publisher_permissions_tx: Sender>>, ) -> Result<()>; async fn handle_price_account_update( &self, @@ -209,8 +211,9 @@ impl<'a> From<&'a State> for &'a OracleState { impl Oracle for T where for<'a> &'a T: Into<&'a OracleState>, + T: Send + Sync + 'static, T: Prices, - T: Sync, + T: Exporter, { async fn handle_price_account_update( &self, @@ -258,10 +261,11 @@ where /// Poll target Solana based chain for Pyth related accounts. 
     async fn poll_updates(
         &self,
+        network: Network,
         mapping_key: Pubkey,
+        publish_keypair: Option<&Keypair>,
         rpc_client: &RpcClient,
         max_lookup_batch_size: usize,
-        publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
     ) -> Result<()> {
         let mut publisher_permissions = HashMap::new();
         let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?;
@@ -313,7 +317,13 @@ where
         log_data_diff(&data, &new_data);
         *data = new_data;
 
-        publisher_permissions_tx.send_replace(data.publisher_permissions.clone());
+        Exporter::update_permissions(
+            self,
+            network,
+            publish_keypair,
+            data.publisher_permissions.clone(),
+        )
+        .await?;
 
         Ok(())
     }
diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs
new file mode 100644
index 0000000..22505e7
--- /dev/null
+++ b/src/agent/state/transactions.rs
@@ -0,0 +1,111 @@
+use {
+    super::State,
+    anyhow::Result,
+    solana_client::nonblocking::rpc_client::RpcClient,
+    solana_sdk::{
+        commitment_config::CommitmentConfig,
+        signature::Signature,
+    },
+    std::collections::VecDeque,
+    tokio::sync::RwLock,
+};
+
+#[derive(Default)]
+pub struct TransactionsState {
+    sent_transactions: RwLock<VecDeque<Signature>>,
+    max_transactions: usize,
+}
+
+impl TransactionsState {
+    pub fn new(max_transactions: usize) -> Self {
+        Self {
+            sent_transactions: Default::default(),
+            max_transactions,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+pub trait Transactions {
+    async fn add_transaction(&self, signature: Signature);
+    async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()>;
+}
+
+/// Allow downcasting State into TransactionsState for functions that depend on the `Transactions` service.
+impl<'a> From<&'a State> for &'a TransactionsState {
+    fn from(state: &'a State) -> &'a TransactionsState {
+        &state.transactions
+    }
+}
+
+#[async_trait::async_trait]
+impl<T> Transactions for T
+where
+    for<'a> &'a T: Into<&'a TransactionsState>,
+    T: Sync + Send + 'static,
+{
+    async fn add_transaction(&self, signature: Signature) {
+        tracing::debug!(
+            signature = signature.to_string(),
+            "Monitoring new transaction.",
+        );
+
+        // Add the new transaction to the list
+        let mut txs = self.into().sent_transactions.write().await;
+        txs.push_back(signature);
+
+        // Pop off the oldest transaction if necessary
+        if txs.len() > self.into().max_transactions {
+            txs.pop_front();
+        }
+    }
+
+    async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> {
+        let mut txs = self.into().sent_transactions.write().await;
+        if txs.is_empty() {
+            return Ok(());
+        }
+
+        let signatures_contiguous = txs.make_contiguous();
+
+        // Poll the status of each transaction, in a single RPC request
+        let statuses = rpc
+            .get_signature_statuses(signatures_contiguous)
+            .await?
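+            // get_signature_statuses returns one Option per signature; None (e.g. a
+            // signature that has aged out of the node's recent status cache) is
+            // dropped by the flatten() below.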
+            .value;
+
+        tracing::debug!(
+            statuses = ?statuses,
+            "Processing Signature Statuses",
+        );
+
+        // Determine the percentage of the recently sent transactions that have successfully been committed
+        // TODO: expose as metric
+        let confirmed = statuses
+            .into_iter()
+            .zip(signatures_contiguous)
+            .map(|(status, sig)| status.map(|some_status| (some_status, sig))) // Collate Some() statuses with their tx signatures before flatten()
+            .flatten()
+            .filter(|(status, sig)| {
+                if let Some(err) = status.err.as_ref() {
+                    tracing::warn!(
+                        error = err.to_string(),
+                        tx_signature = sig.to_string(),
+                        "TX status has err value",
+                    );
+                }
+
+                status.satisfies_commitment(CommitmentConfig::confirmed())
+            })
+            .count();
+
+        let percentage_confirmed = ((confirmed as f64) / (txs.len() as f64)) * 100.0;
+
+        tracing::info!(
+            percentage_confirmed = format!("{:.2}", percentage_confirmed),
+            "monitoring transaction hit rate",
+        );
+
+        Ok(())
+    }
+}
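A minimal usage sketch of the new `Transactions` trait (hypothetical test-style code, not part of this patch; the RPC URL is a placeholder):

```rust
use {
    anyhow::Result,
    solana_client::nonblocking::rpc_client::RpcClient,
    solana_sdk::signature::Signature,
};

// `state` can be any type implementing `Transactions`, e.g. `Arc<State>`.
async fn monitor_one<S: Transactions>(state: &S, sig: Signature) -> Result<()> {
    // Register a just-sent transaction for monitoring...
    state.add_transaction(sig).await;
    // ...then check all monitored signatures in a single RPC call.
    let rpc = RpcClient::new("http://127.0.0.1:8899".to_string());
    state.poll_transactions_status(&rpc).await
}
```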