diff --git a/Dockerfile b/Dockerfile index 8fdf4aca..d674e641 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,7 +12,7 @@ ENV PATH="${PATH}:/root/.local/bin" RUN poetry config virtualenvs.in-project true # Install Solana Tool Suite -RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.11/install)" +RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.17/install)" ENV PATH="${PATH}:/root/.local/share/solana/install/active_release/bin" ADD . /agent diff --git a/src/agent.rs b/src/agent.rs index bc3a153e..3f2c299f 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -118,26 +118,29 @@ impl Agent { // Create the channels // TODO: make all components listen to shutdown signal let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown); - let (primary_oracle_updates_tx, primary_oracle_updates_rx) = - mpsc::channel(self.config.channel_capacities.primary_oracle_updates); - let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) = - mpsc::channel(self.config.channel_capacities.secondary_oracle_updates); - let (global_store_lookup_tx, global_store_lookup_rx) = - mpsc::channel(self.config.channel_capacities.global_store_lookup); let (local_store_tx, local_store_rx) = mpsc::channel(self.config.channel_capacities.local_store); let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10); let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10); + // Create the Pythd Adapter. + let adapter = Arc::new( + pythd::adapter::Adapter::new( + self.config.pythd_adapter.clone(), + local_store_tx.clone(), + logger.clone(), + ) + .await, + ); + // Spawn the primary network jhs.extend(network::spawn_network( self.config.primary_network.clone(), network::Network::Primary, local_store_tx.clone(), - global_store_lookup_tx.clone(), - primary_oracle_updates_tx, primary_keypair_loader_tx, logger.new(o!("primary" => true)), + adapter.clone(), )?); // Spawn the secondary network, if needed @@ -146,36 +149,18 @@ impl Agent { config.clone(), network::Network::Secondary, local_store_tx.clone(), - global_store_lookup_tx.clone(), - secondary_oracle_updates_tx, secondary_keypair_loader_tx, logger.new(o!("primary" => false)), + adapter.clone(), )?); } - // Create the Pythd Adapter. - let adapter = Arc::new(pythd::adapter::Adapter::new( - self.config.pythd_adapter.clone(), - global_store_lookup_tx.clone(), - local_store_tx.clone(), - logger.clone(), - )); - // Create the Notifier task for the Pythd RPC. jhs.push(tokio::spawn(notifier( adapter.clone(), shutdown_tx.subscribe(), ))); - // Spawn the Global Store - jhs.push(store::global::spawn_store( - global_store_lookup_rx, - primary_oracle_updates_rx, - secondary_oracle_updates_rx, - adapter.clone(), - logger.clone(), - )); - // Spawn the Local Store jhs.push(store::local::spawn_store(local_store_rx, logger.clone())); @@ -183,7 +168,7 @@ impl Agent { jhs.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), logger.clone(), - adapter, + adapter.clone(), shutdown_tx.subscribe(), ))); @@ -191,8 +176,8 @@ impl Agent { jhs.push(tokio::spawn(metrics::MetricsServer::spawn( self.config.metrics_server.bind_address, local_store_tx, - global_store_lookup_tx, logger.clone(), + adapter, ))); // Spawn the remote keypair loader endpoint for both networks diff --git a/src/agent/dashboard.rs b/src/agent/dashboard.rs index d8eedf31..bb4b45f9 100644 --- a/src/agent/dashboard.rs +++ b/src/agent/dashboard.rs @@ -1,20 +1,23 @@ use { super::{ - solana::oracle::PriceEntry, - store::{ - global::{ - AllAccountsData, - AllAccountsMetadata, - Lookup, - PriceAccountMetadata, - }, - local::{ - Message, - PriceInfo, - }, + pythd::adapter::global::GlobalStore, + solana::{ + network::Network, + oracle::PriceEntry, + }, + store::local::{ + Message, + PriceInfo, + }, + }, + crate::agent::{ + metrics::MetricsServer, + pythd::adapter::global::{ + AllAccountsData, + AllAccountsMetadata, + PriceAccountMetadata, }, }, - crate::agent::metrics::MetricsServer, chrono::DateTime, pyth_sdk::{ Identifier, @@ -44,8 +47,6 @@ impl MetricsServer { pub async fn render_dashboard(&self) -> Result> { // Prepare response channel for requests let (local_tx, local_rx) = oneshot::channel(); - let (global_data_tx, global_data_rx) = oneshot::channel(); - let (global_metadata_tx, global_metadata_rx) = oneshot::channel(); // Request price data from local and global store self.local_store_tx @@ -54,23 +55,11 @@ impl MetricsServer { }) .await?; - self.global_store_lookup_tx - .send(Lookup::LookupAllAccountsData { - network: super::solana::network::Network::Primary, - result_tx: global_data_tx, - }) - .await?; - - self.global_store_lookup_tx - .send(Lookup::LookupAllAccountsMetadata { - result_tx: global_metadata_tx, - }) - .await?; + let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?; + let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?; // Await the results let local_data = local_rx.await?; - let global_data = global_data_rx.await??; - let global_metadata = global_metadata_rx.await??; let symbol_view = build_dashboard_data(local_data, global_data, global_metadata, &self.logger); diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index f7742b5e..43dd052f 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -1,7 +1,7 @@ use { - super::store::{ - global::Lookup, - local::Message, + super::{ + pythd::adapter::Adapter, + store::local::Message, }, crate::agent::{ solana::oracle::PriceEntry, @@ -74,10 +74,10 @@ lazy_static! { /// dashboard and metrics. pub struct MetricsServer { /// Used to pull the state of all symbols in local store - pub local_store_tx: mpsc::Sender, - pub global_store_lookup_tx: mpsc::Sender, - pub start_time: Instant, - pub logger: Logger, + pub local_store_tx: mpsc::Sender, + pub start_time: Instant, + pub logger: Logger, + pub adapter: Arc, } impl MetricsServer { @@ -85,14 +85,14 @@ impl MetricsServer { pub async fn spawn( addr: impl Into + 'static, local_store_tx: mpsc::Sender, - global_store_lookup_tx: mpsc::Sender, logger: Logger, + adapter: Arc, ) { let server = MetricsServer { local_store_tx, - global_store_lookup_tx, start_time: Instant::now(), logger, + adapter, }; let shared_state = Arc::new(Mutex::new(server)); @@ -109,7 +109,7 @@ impl MetricsServer { .await .unwrap_or_else(|e| { // Add logging here - error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string()); + error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string()); // Withhold failure details from client "Could not render dashboard! See the logs for details".to_owned() diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index 4eaf0cfe..04cdd328 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -1,7 +1,6 @@ use { super::{ super::store::{ - global, local, PriceIdentifier, }, @@ -11,6 +10,7 @@ use { SubscriptionID, }, }, + crate::agent::metrics::PROMETHEUS_REGISTRY, serde::{ Deserialize, Serialize, @@ -27,7 +27,8 @@ use { }, }; -mod api; +pub mod api; +pub mod global; pub use api::{ notifier, AdapterApi, @@ -67,14 +68,14 @@ pub struct Adapter { /// The fixed interval at which Notify Price Sched notifications are sent notify_price_sched_interval_duration: Duration, - /// Channel on which to communicate with the global store - global_store_lookup_tx: mpsc::Sender, - /// Channel on which to communicate with the local store local_store_tx: mpsc::Sender, /// The logger logger: Logger, + + /// Global store for managing the unified state of Pyth-on-Solana networks. + global_store: global::Store, } /// Represents a single Notify Price Sched subscription @@ -94,18 +95,18 @@ struct NotifyPriceSubscription { } impl Adapter { - pub fn new( + pub async fn new( config: Config, - global_store_lookup_tx: mpsc::Sender, local_store_tx: mpsc::Sender, logger: Logger, ) -> Self { + let registry = &mut *PROMETHEUS_REGISTRY.lock().await; Adapter { + global_store: global::Store::new(logger.clone(), registry), subscription_id_seq: 1.into(), notify_price_sched_subscriptions: RwLock::new(HashMap::new()), notify_price_subscriptions: RwLock::new(HashMap::new()), notify_price_sched_interval_duration: config.notify_price_sched_interval_duration, - global_store_lookup_tx, local_store_tx, logger, } @@ -116,6 +117,10 @@ impl Adapter { mod tests { use { super::{ + global::{ + self, + AllAccountsData, + }, notifier, Adapter, AdapterApi, @@ -134,15 +139,8 @@ mod tests { PublisherAccount, }, }, - solana::{ - self, - network::Network, - }, - store::{ - global, - global::AllAccountsData, - local, - }, + solana, + store::local, }, iobuffer::IoBuffer, pyth_sdk::Identifier, @@ -174,28 +172,21 @@ mod tests { }; struct TestAdapter { - adapter: Arc, - global_store_lookup_rx: mpsc::Receiver, - local_store_rx: mpsc::Receiver, - shutdown_tx: broadcast::Sender<()>, - jh: JoinHandle<()>, + adapter: Arc, + local_store_rx: mpsc::Receiver, + shutdown_tx: broadcast::Sender<()>, + jh: JoinHandle<()>, } async fn setup() -> TestAdapter { // Create and spawn an adapter - let (global_store_lookup_tx, global_store_lookup_rx) = mpsc::channel(1000); let (local_store_tx, local_store_rx) = mpsc::channel(1000); let notify_price_sched_interval_duration = Duration::from_nanos(10); let logger = slog_test::new_test_logger(IoBuffer::new()); let config = Config { notify_price_sched_interval_duration, }; - let adapter = Arc::new(Adapter::new( - config, - global_store_lookup_tx, - local_store_tx, - logger, - )); + let adapter = Arc::new(Adapter::new(config, local_store_tx, logger).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier @@ -203,7 +194,6 @@ mod tests { TestAdapter { adapter, - global_store_lookup_rx, local_store_rx, shutdown_tx, jh, @@ -359,19 +349,12 @@ mod tests { async fn test_get_product_list() { // Start the test adapter let test_adapter = setup().await; - - // Return the product list to the adapter, from the global store - { - let mut global_store_lookup_rx = test_adapter.global_store_lookup_rx; - tokio::spawn(async move { - match global_store_lookup_rx.recv().await.unwrap() { - global::Lookup::LookupAllAccountsMetadata { result_tx } => result_tx - .send(Ok(get_test_all_accounts_metadata())) - .unwrap(), - _ => panic!("Uexpected message received from adapter"), - }; - }); - } + let accounts_metadata = get_test_all_accounts_metadata(); + test_adapter + .adapter + .global_store + ._account_metadata(accounts_metadata) + .await; // Send a Get Product List message let mut product_list = test_adapter.adapter.get_product_list().await.unwrap(); @@ -1073,20 +1056,12 @@ mod tests { async fn test_get_all_products() { // Start the test adapter let test_adapter = setup().await; - - // Return the account data to the adapter, from the global store - { - let mut global_store_lookup_rx = test_adapter.global_store_lookup_rx; - tokio::spawn(async move { - match global_store_lookup_rx.recv().await.unwrap() { - global::Lookup::LookupAllAccountsData { - network: Network::Primary, - result_tx, - } => result_tx.send(Ok(get_all_accounts_data())).unwrap(), - _ => panic!("Uexpected message received from adapter"), - }; - }); - } + let accounts_data = get_all_accounts_data(); + test_adapter + .adapter + .global_store + ._account_data_primary(accounts_data) + .await; // Send a Get All Products message let mut all_products = test_adapter.adapter.get_all_products().await.unwrap(); @@ -1296,20 +1271,12 @@ mod tests { async fn test_get_product() { // Start the test adapter let test_adapter = setup().await; - - // Return the account data to the adapter, from the global store - { - let mut global_store_lookup_rx = test_adapter.global_store_lookup_rx; - tokio::spawn(async move { - match global_store_lookup_rx.recv().await.unwrap() { - global::Lookup::LookupAllAccountsData { - network: Network::Primary, - result_tx, - } => result_tx.send(Ok(get_all_accounts_data())).unwrap(), - _ => panic!("Uexpected message received from adapter"), - }; - }); - } + let accounts_data = get_all_accounts_data(); + test_adapter + .adapter + .global_store + ._account_data_primary(accounts_data) + .await; // Send a Get Product message let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t" diff --git a/src/agent/pythd/adapter/api.rs b/src/agent/pythd/adapter/api.rs index bf221eb7..f01cd2fe 100644 --- a/src/agent/pythd/adapter/api.rs +++ b/src/agent/pythd/adapter/api.rs @@ -1,5 +1,10 @@ use { super::{ + global::{ + AllAccountsData, + AllAccountsMetadata, + GlobalStore, + }, Adapter, NotifyPriceSchedSubscription, NotifyPriceSubscription, @@ -20,14 +25,10 @@ use { }, solana::{ self, + network::Network, oracle::PriceEntry, }, store::{ - global::{ - self, - AllAccountsData, - AllAccountsMetadata, - }, local, PriceIdentifier, }, @@ -46,7 +47,6 @@ use { tokio::sync::{ broadcast, mpsc, - oneshot, }, }; @@ -229,12 +229,8 @@ impl AdapterApi for Adapter { } // Fetches the Solana-specific Oracle data from the global store - async fn lookup_all_accounts_metadata(&self) -> Result { - let (result_tx, result_rx) = oneshot::channel(); - self.global_store_lookup_tx - .send(global::Lookup::LookupAllAccountsMetadata { result_tx }) - .await?; - result_rx.await? + async fn lookup_all_accounts_metadata(&self) -> Result { + GlobalStore::accounts_metadata(self).await } async fn get_all_products(&self) -> Result> { @@ -255,14 +251,7 @@ impl AdapterApi for Adapter { } async fn lookup_all_accounts_data(&self) -> Result { - let (result_tx, result_rx) = oneshot::channel(); - self.global_store_lookup_tx - .send(global::Lookup::LookupAllAccountsData { - network: solana::network::Network::Primary, - result_tx, - }) - .await?; - result_rx.await? + GlobalStore::accounts_data(self, Network::Primary).await } async fn get_product( diff --git a/src/agent/pythd/adapter/global.rs b/src/agent/pythd/adapter/global.rs new file mode 100644 index 00000000..b0b91319 --- /dev/null +++ b/src/agent/pythd/adapter/global.rs @@ -0,0 +1,340 @@ +// The Global Store stores a copy of all the product and price information held in the Pyth +// on-chain aggregation contracts, across both the primary and secondary networks. +// This enables this data to be easily queried by other components. +use { + super::Adapter, + crate::agent::{ + metrics::{ + PriceGlobalMetrics, + ProductGlobalMetrics, + }, + pythd::adapter::AdapterApi, + solana::{ + network::Network, + oracle::{ + self, + PriceEntry, + ProductEntry, + }, + }, + }, + anyhow::{ + anyhow, + Result, + }, + prometheus_client::registry::Registry, + pyth_sdk::Identifier, + slog::Logger, + solana_sdk::pubkey::Pubkey, + std::collections::{ + BTreeMap, + HashMap, + HashSet, + }, + tokio::sync::RwLock, +}; + +/// AllAccountsData contains the full data for the price and product accounts, sourced +/// from the primary network. +#[derive(Debug, Clone, Default)] +pub struct AllAccountsData { + pub product_accounts: HashMap, + pub price_accounts: HashMap, +} + +/// AllAccountsMetadata contains the metadata for all the price and product accounts. +/// +/// Important: this relies on the metadata for all accounts being consistent across both networks. +#[derive(Debug, Clone, Default)] +pub struct AllAccountsMetadata { + pub product_accounts_metadata: HashMap, + pub price_accounts_metadata: HashMap, +} + +/// ProductAccountMetadata contains the metadata for a product account. +#[derive(Debug, Clone, Default)] +pub struct ProductAccountMetadata { + /// Attribute dictionary + pub attr_dict: BTreeMap, + /// Price accounts associated with this product + pub price_accounts: Vec, +} + +impl From for ProductAccountMetadata { + fn from(product_account: oracle::ProductEntry) -> Self { + ProductAccountMetadata { + attr_dict: product_account + .account_data + .iter() + .map(|(key, val)| (key.to_owned(), val.to_owned())) + .collect(), + price_accounts: product_account.price_accounts, + } + } +} + +/// PriceAccountMetadata contains the metadata for a price account. +#[derive(Debug, Clone)] +pub struct PriceAccountMetadata { + /// Exponent + pub expo: i32, +} + +impl From for PriceAccountMetadata { + fn from(price_account: oracle::PriceEntry) -> Self { + PriceAccountMetadata { + expo: price_account.expo, + } + } +} + +#[derive(Debug)] +pub enum Update { + ProductAccountUpdate { + account_key: Pubkey, + account: ProductEntry, + }, + PriceAccountUpdate { + account_key: Pubkey, + account: PriceEntry, + }, +} + +pub struct Store { + /// The actual data on primary network + account_data_primary: RwLock, + + /// The actual data on secondary network + /// This data is not necessarily consistent across both networks, so we need to store it + /// separately. + account_data_secondary: RwLock, + + /// The account metadata for both networks + /// The metadata is consistent across both networks, so we only need to store it once. + account_metadata: RwLock, + + /// Prometheus metrics for products + product_metrics: ProductGlobalMetrics, + + /// Prometheus metrics for prices + price_metrics: PriceGlobalMetrics, + + /// Shared logger configuration. + logger: Logger, +} + +impl Store { + pub fn new(logger: Logger, registry: &mut Registry) -> Self { + Store { + account_data_primary: Default::default(), + account_data_secondary: Default::default(), + account_metadata: Default::default(), + product_metrics: ProductGlobalMetrics::new(registry), + price_metrics: PriceGlobalMetrics::new(registry), + logger, + } + } +} + +#[cfg(test)] +impl Store { + // Allow Setting Fields during Tests. + pub async fn _account_data_primary(&self, data: AllAccountsData) { + *self.account_data_primary.write().await = data; + } + + pub async fn _account_data_secondary(&self, data: AllAccountsData) { + *self.account_data_secondary.write().await = data; + } + + pub async fn _account_metadata(&self, data: AllAccountsMetadata) { + *self.account_metadata.write().await = data; + } +} + +#[async_trait::async_trait] +pub trait GlobalStore { + async fn update(&self, network: Network, update: &Update) -> Result<()>; + async fn accounts_metadata(&self) -> Result; + async fn accounts_data(&self, network: Network) -> Result; + async fn price_accounts( + &self, + network: Network, + price_ids: HashSet, + ) -> Result>; +} + +// Allow downcasting Adapter into GlobalStore for functions that depend on the `GlobalStore` service. +impl<'a> From<&'a Adapter> for &'a Store { + fn from(adapter: &'a Adapter) -> &'a Store { + &adapter.global_store + } +} + +#[async_trait::async_trait] +impl GlobalStore for T +where + for<'a> &'a T: Into<&'a Store>, + T: AdapterApi, + T: Sync, +{ + async fn update(&self, network: Network, update: &Update) -> Result<()> { + update_data(self, network, &update).await?; + update_metadata(self, &update).await?; + Ok(()) + } + + async fn accounts_metadata(&self) -> Result { + Ok(self.into().account_metadata.read().await.clone()) + } + + async fn accounts_data(&self, network: Network) -> Result { + match network { + Network::Primary => Ok(self.into().account_data_primary.read().await.clone()), + Network::Secondary => Ok(self.into().account_data_secondary.read().await.clone()), + } + } + + async fn price_accounts( + &self, + network: Network, + price_ids: HashSet, + ) -> Result> { + let account_data = match network { + Network::Primary => &self.into().account_data_primary, + Network::Secondary => &self.into().account_data_secondary, + } + .read() + .await; + + price_ids + .into_iter() + .map(|id| { + account_data + .price_accounts + .get(&id) + .cloned() + .map(|v| (id, v)) + .ok_or(anyhow!("price id not found")) + }) + .collect() + } +} + +async fn update_data(state: &S, network: Network, update: &Update) -> Result<()> +where + S: AdapterApi, + for<'a> &'a S: Into<&'a Store>, +{ + let store: &Store = state.into(); + + // Choose the right account data to update + let account_data = match network { + Network::Primary => &store.account_data_primary, + Network::Secondary => &store.account_data_secondary, + }; + + match update { + Update::ProductAccountUpdate { + account_key, + account, + } => { + let attr_dict = ProductAccountMetadata::from(account.clone()).attr_dict; + let maybe_symbol = attr_dict.get("symbol").cloned(); + store.product_metrics.update(account_key, maybe_symbol); + + // Update the stored data + account_data + .write() + .await + .product_accounts + .insert(*account_key, account.clone()); + } + Update::PriceAccountUpdate { + account_key, + account, + } => { + // Sanity-check that we are updating with more recent data + if let Some(existing_price) = account_data.read().await.price_accounts.get(account_key) + { + if existing_price.timestamp > account.timestamp { + // This message is not an error. It is common + // for primary and secondary network to have + // slight difference in their timestamps. + debug!(store.logger, "Global store: ignoring stale update of an existing newer price"; + "price_key" => account_key.to_string(), + "existing_timestamp" => existing_price.timestamp, + "new_timestamp" => account.timestamp, + ); + return Ok(()); + } + } + + // Update metrics + store.price_metrics.update(account_key, account); + + // Update the stored data + account_data + .write() + .await + .price_accounts + .insert(*account_key, *account); + + // Notify the Pythd API adapter that this account has changed. + // As the account data might differ between the two networks + // we only notify the adapter of the primary network updates. + if let Network::Primary = network { + AdapterApi::global_store_update( + state, + Identifier::new(account_key.to_bytes()), + account.agg.price, + account.agg.conf, + account.agg.status, + account.valid_slot, + account.agg.pub_slot, + ) + .await + .map_err(|_| anyhow!("failed to notify pythd adapter of account update"))?; + } + } + } + + Ok(()) +} + +async fn update_metadata(state: &S, update: &Update) -> Result<()> +where + S: AdapterApi, + for<'a> &'a S: Into<&'a Store>, +{ + let store: &Store = state.into(); + + match update { + Update::ProductAccountUpdate { + account_key, + account, + } => { + store + .account_metadata + .write() + .await + .product_accounts_metadata + .insert(*account_key, account.clone().into()); + + Ok(()) + } + Update::PriceAccountUpdate { + account_key, + account, + } => { + store + .account_metadata + .write() + .await + .price_accounts_metadata + .insert(*account_key, (*account).into()); + + Ok(()) + } + } +} diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 2b5c65ba..1f810c62 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -5,13 +5,8 @@ pub mod oracle; /// - The Oracle, which reads data from the network /// - The Exporter, which publishes data to the network pub mod network { - use { super::{ - super::store::{ - self, - global, - }, exporter, key_store::{ self, @@ -19,14 +14,21 @@ pub mod network { }, oracle, }, - crate::agent::remote_keypair_loader::KeypairRequest, + crate::agent::{ + pythd::adapter::Adapter, + remote_keypair_loader::KeypairRequest, + store, + }, anyhow::Result, serde::{ Deserialize, Serialize, }, slog::Logger, - std::time::Duration, + std::{ + sync::Arc, + time::Duration, + }, tokio::{ sync::{ mpsc::Sender, @@ -80,10 +82,9 @@ pub mod network { config: Config, network: Network, local_store_tx: Sender, - global_store_lookup_tx: Sender, - global_store_update_tx: Sender, keypair_request_tx: Sender, logger: Logger, + adapter: Arc, ) -> Result>> { // Publisher permissions updates between oracle and exporter let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default()); @@ -91,13 +92,14 @@ pub mod network { // Spawn the Oracle let mut jhs = oracle::spawn_oracle( config.oracle.clone(), + network, &config.rpc_url, &config.wss_url, config.rpc_timeout, - global_store_update_tx.clone(), publisher_permissions_tx, KeyStore::new(config.key_store.clone(), &logger)?, logger.clone(), + adapter.clone(), ); // Spawn the Exporter @@ -109,10 +111,11 @@ pub mod network { publisher_permissions_rx, KeyStore::new(config.key_store.clone(), &logger)?, local_store_tx, - global_store_lookup_tx, keypair_request_tx, logger, + adapter, )?; + jhs.extend(exporter_jhs); Ok(jhs) diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index f0efd46e..ec0d46d1 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -10,9 +10,15 @@ use { network::Network, oracle::PricePublishingMetadata, }, - crate::agent::remote_keypair_loader::{ - KeypairRequest, - RemoteKeypairLoader, + crate::agent::{ + pythd::adapter::{ + global::GlobalStore, + Adapter, + }, + remote_keypair_loader::{ + KeypairRequest, + RemoteKeypairLoader, + }, }, anyhow::{ anyhow, @@ -176,9 +182,9 @@ pub fn spawn_exporter( >, key_store: KeyStore, local_store_tx: Sender, - global_store_tx: Sender, keypair_request_tx: mpsc::Sender, logger: Logger, + adapter: Arc, ) -> Result>> { // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); @@ -211,12 +217,12 @@ pub fn spawn_exporter( rpc_timeout, key_store, local_store_tx, - global_store_tx, network_state_rx, transactions_tx, publisher_permissions_rx, keypair_request_tx, logger, + adapter, ); let exporter_jh = tokio::spawn(async move { exporter.run().await }); @@ -246,9 +252,6 @@ pub struct Exporter { /// Channel on which to communicate with the local store local_store_tx: Sender, - /// Channel on which to communicate with the global store - global_store_tx: Sender, - /// The last state published for each price identifier. Used to /// rule out stale data and prevent repetitive publishing of /// unchanged prices. @@ -276,6 +279,8 @@ pub struct Exporter { keypair_request_tx: Sender, logger: Logger, + + adapter: Arc, } impl Exporter { @@ -286,7 +291,6 @@ impl Exporter { rpc_timeout: Duration, key_store: KeyStore, local_store_tx: Sender, - global_store_tx: Sender, network_state_rx: watch::Receiver, inflight_transactions_tx: Sender, publisher_permissions_rx: watch::Receiver< @@ -294,6 +298,7 @@ impl Exporter { >, keypair_request_tx: mpsc::Sender, logger: Logger, + adapter: Arc, ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { @@ -306,7 +311,6 @@ impl Exporter { publish_interval, key_store, local_store_tx, - global_store_tx, last_published_state: HashMap::new(), network_state_rx, inflight_transactions_tx, @@ -318,6 +322,7 @@ impl Exporter { recent_compute_unit_price_micro_lamports: None, keypair_request_tx, logger, + adapter, } } @@ -720,16 +725,12 @@ impl Exporter { // Use exponentially higher price if this publisher hasn't published in a while for the accounts // in this batch. This will use the maximum total compute unit fee if the publisher // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots. - let (result_tx, result_rx) = oneshot::channel(); - self.global_store_tx - .send(store::global::Lookup::LookupPriceAccounts { - network: self.network, - price_ids: price_accounts.clone().into_iter().collect(), - result_tx, - }) - .await?; - - let result = result_rx.await??; + let result = GlobalStore::price_accounts( + &*self.adapter, + self.network, + price_accounts.clone().into_iter().collect(), + ) + .await?; // Calculate the maximum slot difference between the publisher latest slot and // current slot amongst all the accounts. Here, the aggregate slot is diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 9bcda936..d2f9fb56 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -2,11 +2,20 @@ // on-chain Oracle program accounts from Solana. use { self::subscriber::Subscriber, - super::key_store::KeyStore, + super::{ + key_store::KeyStore, + network::Network, + }, crate::agent::{ legacy_schedule::LegacySchedule, market_schedule::MarketSchedule, - store::global, + pythd::adapter::{ + global::{ + GlobalStore, + Update, + }, + Adapter, + }, }, anyhow::{ anyhow, @@ -40,6 +49,7 @@ use { HashMap, HashSet, }, + sync::Arc, time::Duration, }, tokio::{ @@ -166,10 +176,11 @@ pub struct Oracle { /// Channel on which account updates are received from the Subscriber updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, - /// Channel on which updates are sent to the global store - global_store_tx: mpsc::Sender, + network: Network, logger: Logger, + + adapter: Arc, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -210,15 +221,16 @@ impl Default for Config { pub fn spawn_oracle( config: Config, + network: Network, rpc_url: &str, wss_url: &str, rpc_timeout: Duration, - global_store_update_tx: mpsc::Sender, publisher_permissions_tx: watch::Sender< HashMap>, >, key_store: KeyStore, logger: Logger, + adapter: Arc, ) -> Vec> { let mut jhs = vec![]; @@ -251,7 +263,7 @@ pub fn spawn_oracle( jhs.push(tokio::spawn(async move { poller.run().await })); // Create and spawn the Oracle - let mut oracle = Oracle::new(data_rx, updates_rx, global_store_update_tx, logger); + let mut oracle = Oracle::new(data_rx, updates_rx, network, logger, adapter); jhs.push(tokio::spawn(async move { oracle.run().await })); jhs @@ -261,15 +273,17 @@ impl Oracle { pub fn new( data_rx: mpsc::Receiver, updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, - global_store_tx: mpsc::Sender, + network: Network, logger: Logger, + adapter: Arc, ) -> Self { Oracle { data: Default::default(), data_rx, updates_rx, - global_store_tx, + network, logger, + adapter, } } @@ -402,13 +416,16 @@ impl Oracle { account_key: &Pubkey, account: &ProductEntry, ) -> Result<()> { - self.global_store_tx - .send(global::Update::ProductAccountUpdate { + GlobalStore::update( + &*self.adapter, + self.network, + &Update::ProductAccountUpdate { account_key: *account_key, account: account.clone(), - }) - .await - .map_err(|_| anyhow!("failed to notify product account update")) + }, + ) + .await + .map_err(|_| anyhow!("failed to notify product account update")) } async fn notify_price_account_update( @@ -416,13 +433,16 @@ impl Oracle { account_key: &Pubkey, account: &PriceEntry, ) -> Result<()> { - self.global_store_tx - .send(global::Update::PriceAccountUpdate { + GlobalStore::update( + &*self.adapter, + self.network, + &Update::PriceAccountUpdate { account_key: *account_key, account: account.clone(), - }) - .await - .map_err(|_| anyhow!("failed to notify price account update")) + }, + ) + .await + .map_err(|_| anyhow!("failed to notify price account update")) } } @@ -495,7 +515,6 @@ impl Poller { async fn poll_and_send(&mut self) -> Result<()> { let fresh_data = self.poll().await?; - self.publisher_permissions_tx .send_replace(fresh_data.publisher_permissions.clone()); diff --git a/src/agent/store.rs b/src/agent/store.rs index 99f3fa49..bbbcfec7 100644 --- a/src/agent/store.rs +++ b/src/agent/store.rs @@ -1,4 +1,3 @@ -pub mod global; pub mod local; pub type PriceIdentifier = pyth_sdk::Identifier; diff --git a/src/agent/store/global.rs b/src/agent/store/global.rs deleted file mode 100644 index cb534a2c..00000000 --- a/src/agent/store/global.rs +++ /dev/null @@ -1,376 +0,0 @@ -// The Global Store stores a copy of all the product and price information held in the Pyth -// on-chain aggregation contracts, across both the primary and secondary networks. -// This enables this data to be easily queried by other components. -use { - super::super::solana::oracle::{ - self, - PriceEntry, - ProductEntry, - }, - crate::agent::{ - metrics::{ - PriceGlobalMetrics, - ProductGlobalMetrics, - PROMETHEUS_REGISTRY, - }, - pythd::adapter::AdapterApi, - solana::network::Network, - }, - anyhow::{ - anyhow, - Result, - }, - pyth_sdk::Identifier, - slog::Logger, - solana_sdk::pubkey::Pubkey, - std::{ - collections::{ - BTreeMap, - HashMap, - HashSet, - }, - sync::Arc, - }, - tokio::{ - sync::{ - mpsc, - oneshot, - }, - task::JoinHandle, - }, -}; - -/// AllAccountsData contains the full data for the price and product accounts, sourced -/// from the primary network. -#[derive(Debug, Clone, Default)] -pub struct AllAccountsData { - pub product_accounts: HashMap, - pub price_accounts: HashMap, -} - -/// AllAccountsMetadata contains the metadata for all the price and product accounts. -/// -/// Important: this relies on the metadata for all accounts being consistent across both networks. -#[derive(Debug, Clone, Default)] -pub struct AllAccountsMetadata { - pub product_accounts_metadata: HashMap, - pub price_accounts_metadata: HashMap, -} - -/// ProductAccountMetadata contains the metadata for a product account. -#[derive(Debug, Clone, Default)] -pub struct ProductAccountMetadata { - /// Attribute dictionary - pub attr_dict: BTreeMap, - /// Price accounts associated with this product - pub price_accounts: Vec, -} - -impl From for ProductAccountMetadata { - fn from(product_account: oracle::ProductEntry) -> Self { - ProductAccountMetadata { - attr_dict: product_account - .account_data - .iter() - .map(|(key, val)| (key.to_owned(), val.to_owned())) - .collect(), - price_accounts: product_account.price_accounts, - } - } -} - -/// PriceAccountMetadata contains the metadata for a price account. -#[derive(Debug, Clone)] -pub struct PriceAccountMetadata { - /// Exponent - pub expo: i32, -} - -impl From for PriceAccountMetadata { - fn from(price_account: oracle::PriceEntry) -> Self { - PriceAccountMetadata { - expo: price_account.expo, - } - } -} - -#[derive(Debug)] -pub enum Update { - ProductAccountUpdate { - account_key: Pubkey, - account: ProductEntry, - }, - PriceAccountUpdate { - account_key: Pubkey, - account: PriceEntry, - }, -} - -#[derive(Debug)] -pub enum Lookup { - LookupAllAccountsMetadata { - result_tx: oneshot::Sender>, - }, - LookupAllAccountsData { - network: Network, - result_tx: oneshot::Sender>, - }, - LookupPriceAccounts { - network: Network, - price_ids: HashSet, - result_tx: oneshot::Sender>>, - }, -} - -pub struct Store { - /// The actual data on primary network - account_data_primary: AllAccountsData, - - /// The actual data on secondary network - /// This data is not necessarily consistent across both networks, so we need to store it - /// separately. - account_data_secondary: AllAccountsData, - - /// The account metadata for both networks - /// The metadata is consistent across both networks, so we only need to store it once. - account_metadata: AllAccountsMetadata, - - /// Prometheus metrics for products - product_metrics: ProductGlobalMetrics, - - /// Prometheus metrics for prices - price_metrics: PriceGlobalMetrics, - - /// Channel on which lookup requests are received - lookup_rx: mpsc::Receiver, - - /// Channel on which account updates are received from the primary network - primary_updates_rx: mpsc::Receiver, - - /// Channel on which account updates are received from the secondary network - secondary_updates_rx: mpsc::Receiver, - - /// Reference the Pythd Adapter. - pythd_adapter: Arc, - - logger: Logger, -} - -pub fn spawn_store( - lookup_rx: mpsc::Receiver, - primary_updates_rx: mpsc::Receiver, - secondary_updates_rx: mpsc::Receiver, - pythd_adapter: Arc, - logger: Logger, -) -> JoinHandle<()> -where - S: AdapterApi, - S: Send, - S: Sync, - S: 'static, -{ - tokio::spawn(async move { - Store::new( - lookup_rx, - primary_updates_rx, - secondary_updates_rx, - pythd_adapter, - logger, - ) - .await - .run() - .await - }) -} - -impl Store -where - S: AdapterApi, - S: Send, - S: Sync, -{ - pub async fn new( - lookup_rx: mpsc::Receiver, - primary_updates_rx: mpsc::Receiver, - secondary_updates_rx: mpsc::Receiver, - pythd_adapter: Arc, - logger: Logger, - ) -> Self { - let prom_registry_ref = &mut &mut PROMETHEUS_REGISTRY.lock().await; - - Store { - account_data_primary: Default::default(), - account_data_secondary: Default::default(), - account_metadata: Default::default(), - product_metrics: ProductGlobalMetrics::new(prom_registry_ref), - price_metrics: PriceGlobalMetrics::new(prom_registry_ref), - lookup_rx, - primary_updates_rx, - secondary_updates_rx, - pythd_adapter, - logger, - } - } - - pub async fn run(&mut self) { - loop { - if let Err(err) = self.handle_next().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); - } - } - } - - async fn handle_next(&mut self) -> Result<()> { - tokio::select! { - Some(update) = self.primary_updates_rx.recv() => { - self.update_data(Network::Primary, &update).await?; - self.update_metadata(&update)?; - } - Some(update) = self.secondary_updates_rx.recv() => { - self.update_data(Network::Secondary, &update).await?; - self.update_metadata(&update)?; - } - Some(lookup) = self.lookup_rx.recv() => { - self.handle_lookup(lookup).await? - } - }; - - Ok(()) - } - - async fn update_data(&mut self, network: Network, update: &Update) -> Result<()> { - // Choose the right account data to update - let account_data = match network { - Network::Primary => &mut self.account_data_primary, - Network::Secondary => &mut self.account_data_secondary, - }; - - match update { - Update::ProductAccountUpdate { - account_key, - account, - } => { - let attr_dict = ProductAccountMetadata::from(account.clone()).attr_dict; - - let maybe_symbol = attr_dict.get("symbol").cloned(); - - self.product_metrics.update(account_key, maybe_symbol); - - // Update the stored data - account_data - .product_accounts - .insert(*account_key, account.clone()); - } - Update::PriceAccountUpdate { - account_key, - account, - } => { - // Sanity-check that we are updating with more recent data - if let Some(existing_price) = account_data.price_accounts.get(account_key) { - if existing_price.timestamp > account.timestamp { - // This message is not an error. It is common - // for primary and secondary network to have - // slight difference in their timestamps. - debug!(self.logger, "Global store: ignoring stale update of an existing newer price"; - "price_key" => account_key.to_string(), - "existing_timestamp" => existing_price.timestamp, - "new_timestamp" => account.timestamp, - ); - return Ok(()); - } - } - - // Update metrics - self.price_metrics.update(account_key, account); - - // Update the stored data - account_data.price_accounts.insert(*account_key, *account); - - // Notify the Pythd API adapter that this account has changed. - // As the account data might differ between the two networks - // we only notify the adapter of the primary network updates. - if let Network::Primary = network { - self.pythd_adapter - .global_store_update( - Identifier::new(account_key.to_bytes()), - account.agg.price, - account.agg.conf, - account.agg.status, - account.valid_slot, - account.agg.pub_slot, - ) - .await - .map_err(|_| anyhow!("failed to notify pythd adapter of account update"))?; - } - } - } - - Ok(()) - } - - fn update_metadata(&mut self, update: &Update) -> Result<()> { - match update { - Update::ProductAccountUpdate { - account_key, - account, - } => { - self.account_metadata - .product_accounts_metadata - .insert(*account_key, account.clone().into()); - - Ok(()) - } - Update::PriceAccountUpdate { - account_key, - account, - } => { - self.account_metadata - .price_accounts_metadata - .insert(*account_key, (*account).into()); - - Ok(()) - } - } - } - - async fn handle_lookup(&self, lookup: Lookup) -> Result<()> { - match lookup { - Lookup::LookupAllAccountsMetadata { result_tx } => result_tx - .send(Ok(self.account_metadata.clone())) - .map_err(|_| anyhow!("failed to send metadata to pythd adapter")), - Lookup::LookupAllAccountsData { network, result_tx } => result_tx - .send(Ok(match network { - Network::Primary => self.account_data_primary.clone(), - Network::Secondary => self.account_data_secondary.clone(), - })) - .map_err(|_| anyhow!("failed to send data to pythd adapter")), - Lookup::LookupPriceAccounts { - network, - price_ids, - result_tx, - } => { - let account_data = match network { - Network::Primary => &self.account_data_primary, - Network::Secondary => &self.account_data_secondary, - }; - - result_tx - .send( - price_ids - .into_iter() - .map(|id| { - account_data - .price_accounts - .get(&id) - .cloned() - .map(|v| (id, v)) - .ok_or(anyhow!("price id not found")) - }) - .collect::>>(), - ) - .map_err(|_| anyhow!("failed to send price accounts data")) - } - } - } -}