diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs
index f30c4d1..cb58b6f 100644
--- a/src/agent/state/oracle.rs
+++ b/src/agent/state/oracle.rs
@@ -1,48 +1,23 @@
 #[allow(deprecated)]
 use crate::agent::legacy_schedule::LegacySchedule;
 use {
-    super::{
-        super::solana::network::Network,
-        exporter::Exporter,
-    },
+    super::{super::solana::network::Network, exporter::Exporter},
     crate::agent::{
         market_schedule::MarketSchedule,
-        state::{
-            global::Update,
-            Prices,
-            State,
-        },
-    },
-    anyhow::{
-        anyhow,
-        Context,
-        Result,
+        state::{global::Update, Prices, State},
     },
+    anyhow::{anyhow, Context, Result},
     pyth_sdk_solana::state::{
-        load_mapping_account,
-        load_product_account,
-        GenericPriceAccount,
-        MappingAccount,
-        PriceComp,
-        PythnetPriceAccount,
-        SolanaPriceAccount,
-    },
-    serde::{
-        Deserialize,
-        Serialize,
+        load_mapping_account, load_product_account, GenericPriceAccount, MappingAccount, PriceComp,
+        PythnetPriceAccount, SolanaPriceAccount,
     },
+    serde::{Deserialize, Serialize},
     solana_client::nonblocking::rpc_client::RpcClient,
     solana_sdk::{
-        account::Account,
-        commitment_config::CommitmentLevel,
-        pubkey::Pubkey,
-        signature::Keypair,
+        account::Account, commitment_config::CommitmentLevel, pubkey::Pubkey, signature::Keypair,
     },
     std::{
-        collections::{
-            HashMap,
-            HashSet,
-        },
+        collections::{HashMap, HashSet},
         time::Duration,
     },
     tokio::sync::RwLock,
@@ -51,15 +26,15 @@ use {
 
 #[derive(Debug, Clone)]
 pub struct ProductEntry {
-    pub account_data:     pyth_sdk_solana::state::ProductAccount,
-    pub schedule:         MarketSchedule,
-    pub price_accounts:   Vec<Pubkey>,
+    pub account_data: pyth_sdk_solana::state::ProductAccount,
+    pub schedule: MarketSchedule,
+    pub price_accounts: Vec<Pubkey>,
     pub publish_interval: Option<Duration>,
 }
 
 #[derive(Default, Debug, Clone)]
 pub struct PricePublishingMetadata {
-    pub schedule:         MarketSchedule,
+    pub schedule: MarketSchedule,
     pub publish_interval: Option<Duration>,
 }
 
@@ -77,7 +52,7 @@ pub struct PricePublishingMetadata {
 #[derive(Copy, Clone, Debug)]
 pub struct PriceEntry {
     // We intentionally act as if we have a truncated account where the underlying memory is unavailable.
-    account:  GenericPriceAccount<0, ()>,
+    account: GenericPriceAccount<0, ()>,
     pub comp: [PriceComp; 64],
 }
 
@@ -129,9 +104,9 @@ impl std::ops::Deref for PriceEntry {
 
 #[derive(Default, Debug, Clone)]
 pub struct Data {
-    pub mapping_accounts:      HashMap<Pubkey, MappingAccount>,
-    pub product_accounts:      HashMap<Pubkey, ProductEntry>,
-    pub price_accounts:        HashMap<Pubkey, PriceEntry>,
+    pub mapping_accounts: HashMap<Pubkey, MappingAccount>,
+    pub product_accounts: HashMap<Pubkey, ProductEntry>,
+    pub price_accounts: HashMap<Pubkey, PriceEntry>,
     /// publisher => {their permissioned price accounts => price publishing metadata}
     pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
 }
@@ -140,16 +115,16 @@ pub struct Data {
 #[serde(default)]
 pub struct Config {
     /// The commitment level to use when reading data from the RPC node.
-    pub commitment:                CommitmentLevel,
+    pub commitment: CommitmentLevel,
     /// The interval with which to poll account information.
     #[serde(with = "humantime_serde")]
-    pub poll_interval_duration:    Duration,
+    pub poll_interval_duration: Duration,
     /// Whether subscribing to account updates over websocket is enabled
-    pub subscriber_enabled:        bool,
+    pub subscriber_enabled: bool,
     /// Capacity of the channel over which the Subscriber sends updates to the Oracle
     pub updates_channel_capacity: usize,
     /// Capacity of the channel over which the Poller sends data to the Oracle
-    pub data_channel_capacity:     usize,
+    pub data_channel_capacity: usize,
     /// Ask the RPC for up to this many product/price accounts in a
     /// single request. Tune this setting if you're experiencing
@@ -162,12 +137,12 @@ pub struct Config {
 impl Default for Config {
     fn default() -> Self {
         Self {
-            commitment:               CommitmentLevel::Confirmed,
-            poll_interval_duration:   Duration::from_secs(5),
-            subscriber_enabled:       true,
+            commitment: CommitmentLevel::Confirmed,
+            poll_interval_duration: Duration::from_secs(5),
+            subscriber_enabled: true,
             updates_channel_capacity: 10000,
-            data_channel_capacity:    10000,
-            max_lookup_batch_size:    100,
+            data_channel_capacity: 10000,
+            max_lookup_batch_size: 100,
         }
     }
 }
@@ -178,6 +153,7 @@ pub struct OracleState {
 impl OracleState {
     pub fn new() -> Self {
+        tracing::info!("Initializing OracleState");
         Self {
             data: Default::default(),
         }
     }
@@ -232,6 +208,7 @@ where
         // We are only interested in price account updates, all other types of updates
         // will be fetched using polling.
         if !data.price_accounts.contains_key(account_key) {
+            tracing::info!("Account key not found in price accounts, skipping update.");
             return Ok(());
         }
 
@@ -247,13 +224,14 @@ where
         );
 
         data.price_accounts.insert(*account_key, price_entry);
+        tracing::info!("Updated price account for key: {}", account_key);
 
         Prices::update_global_price(
            self,
            network,
            &Update::PriceAccountUpdate {
                account_key: *account_key,
-                account:     price_entry,
+                account: price_entry,
            },
        )
        .await?;
@@ -271,14 +249,17 @@ where
         rpc_client: &RpcClient,
         max_lookup_batch_size: usize,
     ) -> Result<()> {
+        tracing::info!("Polling updates for network: {:?}", network);
         let mut publisher_permissions = HashMap::new();
         let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?;
+        tracing::info!("Fetched mapping accounts.");
         let (product_accounts, price_accounts) = fetch_product_and_price_accounts(
             rpc_client,
             max_lookup_batch_size,
             mapping_accounts.values(),
         )
         .await?;
+        tracing::info!("Fetched product and price accounts.");
 
         for (price_key, price_entry) in price_accounts.iter() {
             for component in price_entry.comp {
@@ -294,7 +275,7 @@ where
                     product_accounts.get(&price_entry.prod)
                 {
                     PricePublishingMetadata {
-                        schedule:         prod_entry.schedule.clone(),
+                        schedule: prod_entry.schedule.clone(),
                         publish_interval: prod_entry.publish_interval,
                     }
                 } else {
@@ -320,6 +301,7 @@ where
         let mut data = self.into().data.write().await;
         log_data_diff(&data, &new_data);
         *data = new_data;
+        tracing::info!("Updated OracleState data.");
 
         Exporter::update_permissions(
             self,
@@ -335,6 +317,7 @@ where
     /// Sync Product/Price Accounts found by polling to the Global Store.
     #[instrument(skip(self))]
     async fn sync_global_store(&self, network: Network) -> Result<()> {
+        tracing::info!("Syncing global store for network: {:?}", network);
         for (product_account_key, product_account) in
             &self.into().data.read().await.product_accounts
         {
@@ -343,7 +326,7 @@ where
                 network,
                 &Update::ProductAccountUpdate {
                     account_key: *product_account_key,
-                    account:     product_account.clone(),
+                    account: product_account.clone(),
                 },
             )
             .await
@@ -356,13 +339,14 @@ where
                 network,
                 &Update::PriceAccountUpdate {
                     account_key: *price_account_key,
-                    account:     *price_account,
+                    account: *price_account,
                 },
             )
             .await
             .map_err(|_| anyhow!("failed to notify price account update"))?;
         }
 
+        tracing::info!("Global store sync completed.");
         Ok(())
     }
 }
@@ -372,6 +356,10 @@ async fn fetch_mapping_accounts(
     rpc_client: &RpcClient,
     mapping_account_key: Pubkey,
 ) -> Result<HashMap<Pubkey, MappingAccount>> {
+    tracing::info!(
+        "Fetching mapping accounts starting from key: {}",
+        mapping_account_key
+    );
     let mut accounts = HashMap::new();
     let mut account_key = mapping_account_key;
     while account_key != Pubkey::default() {
@@ -384,6 +372,7 @@ async fn fetch_mapping_accounts(
         accounts.insert(account_key, account);
         account_key = account.next;
     }
+    tracing::info!("Fetched {} mapping accounts.", accounts.len());
 
     Ok(accounts)
 }
@@ -396,6 +385,7 @@ async fn fetch_product_and_price_accounts<'a, A>(
 where
     A: IntoIterator<Item = &'a MappingAccount>,
 {
+    tracing::info!("Fetching product and price accounts.");
     let mut product_keys = vec![];
 
     // Get all product keys
@@ -421,6 +411,11 @@ where
         price_entries.extend(batch_prices.drain());
     }
 
+    tracing::info!(
+        "Fetched {} product entries and {} price entries.",
+        product_entries.len(),
+        price_entries.len()
+    );
     Ok((product_entries, price_entries))
 }
 
@@ -428,6 +423,7 @@ async fn fetch_batch_of_product_and_price_accounts(
     rpc_client: &RpcClient,
     product_key_batch: &[Pubkey],
 ) -> Result<(HashMap<Pubkey, ProductEntry>, HashMap<Pubkey, PriceEntry>)> {
+    tracing::info!("Fetching batch of product and price accounts.");
     let mut product_entries = HashMap::new();
 
     let product_keys = product_key_batch;
@@ -568,6 +564,7 @@ async fn fetch_batch_of_product_and_price_accounts(
         todo = next_todo;
     }
 
+    tracing::info!("Fetched batch of product and price accounts.");
     Ok((product_entries, price_entries))
 }