diff --git a/src/agent.rs b/src/agent.rs index c3b44c6c..f6bd263f 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -69,15 +69,14 @@ pub mod metrics; pub mod pythd; pub mod remote_keypair_loader; pub mod solana; +pub mod state; pub mod store; use { self::{ config::Config, - pythd::{ - adapter::notifier, - api::rpc, - }, + pythd::api::rpc, solana::network, + state::notifier, }, anyhow::Result, futures_util::future::join_all, @@ -122,9 +121,8 @@ impl Agent { let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10); // Create the Pythd Adapter. - let adapter = Arc::new( - pythd::adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await, - ); + let adapter = + Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await); // Spawn the primary network jhs.extend(network::spawn_network( @@ -197,6 +195,7 @@ pub mod config { pythd, remote_keypair_loader, solana::network, + state, }, anyhow::Result, config as config_rs, @@ -216,7 +215,7 @@ pub mod config { pub primary_network: network::Config, pub secondary_network: Option, #[serde(default)] - pub pythd_adapter: pythd::adapter::Config, + pub pythd_adapter: state::Config, #[serde(default)] pub pythd_api_server: pythd::api::rpc::Config, #[serde(default)] diff --git a/src/agent/dashboard.rs b/src/agent/dashboard.rs index d7af08cc..b6ca7c9d 100644 --- a/src/agent/dashboard.rs +++ b/src/agent/dashboard.rs @@ -1,20 +1,20 @@ use { super::{ - pythd::adapter::{ + solana::{ + network::Network, + oracle::PriceEntry, + }, + state::{ global::GlobalStore, local::{ LocalStore, PriceInfo, }, }, - solana::{ - network::Network, - oracle::PriceEntry, - }, }, crate::agent::{ metrics::MetricsServer, - pythd::adapter::global::{ + state::global::{ AllAccountsData, AllAccountsMetadata, PriceAccountMetadata, diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index 921796c5..0beb932c 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -1,7 +1,7 @@ use { - super::pythd::adapter::{ + super::state::{ local::PriceInfo, - Adapter, + State, }, crate::agent::{ solana::oracle::PriceEntry, @@ -69,16 +69,12 @@ lazy_static! { pub struct MetricsServer { pub start_time: Instant, pub logger: Logger, - pub adapter: Arc, + pub adapter: Arc, } impl MetricsServer { /// Instantiate a metrics API with a dashboard - pub async fn spawn( - addr: impl Into + 'static, - logger: Logger, - adapter: Arc, - ) { + pub async fn spawn(addr: impl Into + 'static, logger: Logger, adapter: Arc) { let server = MetricsServer { start_time: Instant::now(), logger, diff --git a/src/agent/pythd.rs b/src/agent/pythd.rs index f7697d34..e5fdf85e 100644 --- a/src/agent/pythd.rs +++ b/src/agent/pythd.rs @@ -1,2 +1 @@ -pub mod adapter; pub mod api; diff --git a/src/agent/pythd/api/rpc.rs b/src/agent/pythd/api/rpc.rs index 4316ca6f..9b3b38ac 100644 --- a/src/agent/pythd/api/rpc.rs +++ b/src/agent/pythd/api/rpc.rs @@ -6,7 +6,6 @@ use { super::{ - super::adapter, Conf, NotifyPrice, NotifyPriceSched, @@ -14,6 +13,7 @@ use { Pubkey, SubscriptionID, }, + crate::agent::state, anyhow::{ anyhow, Result, @@ -120,7 +120,7 @@ async fn handle_connection( notify_price_sched_tx_buffer: usize, logger: Logger, ) where - S: adapter::AdapterApi, + S: state::StateApi, S: Send, S: Sync, S: 'static, @@ -168,7 +168,7 @@ async fn handle_next( notify_price_sched_rx: &mut mpsc::Receiver, ) -> Result<()> where - S: adapter::AdapterApi, + S: state::StateApi, { tokio::select! { msg = ws_rx.next() => { @@ -210,7 +210,7 @@ async fn handle( msg: Message, ) -> Result<()> where - S: adapter::AdapterApi, + S: state::StateApi, { // Ignore control and binary messages if !msg.is_text() { @@ -296,7 +296,7 @@ async fn dispatch_and_catch_error( request: &Request, ) -> Response where - S: adapter::AdapterApi, + S: state::StateApi, { debug!( logger, @@ -436,7 +436,7 @@ pub async fn run( adapter: Arc, shutdown_rx: broadcast::Receiver<()>, ) where - S: adapter::AdapterApi, + S: state::StateApi, S: Send, S: Sync, S: 'static, @@ -454,7 +454,7 @@ async fn serve( mut shutdown_rx: broadcast::Receiver<()>, ) -> Result<()> where - S: adapter::AdapterApi, + S: state::StateApi, S: Send, S: Sync, S: 'static, diff --git a/src/agent/pythd/api/rpc/get_all_products.rs b/src/agent/pythd/api/rpc/get_all_products.rs index 9806a00b..be7b39bf 100644 --- a/src/agent/pythd/api/rpc/get_all_products.rs +++ b/src/agent/pythd/api/rpc/get_all_products.rs @@ -1,11 +1,11 @@ use { - crate::agent::pythd::adapter, + crate::agent::state, anyhow::Result, }; pub async fn get_all_products(adapter: &S) -> Result where - S: adapter::AdapterApi, + S: state::StateApi, { let products = adapter.get_all_products().await?; Ok(serde_json::to_value(products)?) diff --git a/src/agent/pythd/api/rpc/get_product.rs b/src/agent/pythd/api/rpc/get_product.rs index 2a6d8c0a..8ff49dc8 100644 --- a/src/agent/pythd/api/rpc/get_product.rs +++ b/src/agent/pythd/api/rpc/get_product.rs @@ -3,7 +3,7 @@ use { GetProductParams, Method, }, - crate::agent::pythd::adapter, + crate::agent::state, anyhow::{ anyhow, Result, @@ -19,7 +19,7 @@ pub async fn get_product( request: &Request, ) -> Result where - S: adapter::AdapterApi, + S: state::StateApi, { let params: GetProductParams = { let value = request.params.clone(); diff --git a/src/agent/pythd/api/rpc/get_product_list.rs b/src/agent/pythd/api/rpc/get_product_list.rs index ff3e4982..833b2688 100644 --- a/src/agent/pythd/api/rpc/get_product_list.rs +++ b/src/agent/pythd/api/rpc/get_product_list.rs @@ -1,11 +1,11 @@ use { - crate::agent::pythd::adapter, + crate::agent::state, anyhow::Result, }; pub async fn get_product_list(adapter: &S) -> Result where - S: adapter::AdapterApi, + S: state::StateApi, { let product_list = adapter.get_product_list().await?; Ok(serde_json::to_value(product_list)?) diff --git a/src/agent/pythd/api/rpc/subscribe_price.rs b/src/agent/pythd/api/rpc/subscribe_price.rs index bed8e22e..f2319b1c 100644 --- a/src/agent/pythd/api/rpc/subscribe_price.rs +++ b/src/agent/pythd/api/rpc/subscribe_price.rs @@ -5,7 +5,7 @@ use { SubscribePriceParams, SubscribeResult, }, - crate::agent::pythd::adapter, + crate::agent::state, anyhow::{ anyhow, Result, @@ -23,7 +23,7 @@ pub async fn subscribe_price( request: &Request, ) -> Result where - S: adapter::AdapterApi, + S: state::StateApi, { let params: SubscribePriceParams = serde_json::from_value( request diff --git a/src/agent/pythd/api/rpc/subscribe_price_sched.rs b/src/agent/pythd/api/rpc/subscribe_price_sched.rs index 619f7c14..c11ffa8d 100644 --- a/src/agent/pythd/api/rpc/subscribe_price_sched.rs +++ b/src/agent/pythd/api/rpc/subscribe_price_sched.rs @@ -5,7 +5,7 @@ use { SubscribePriceSchedParams, SubscribeResult, }, - crate::agent::pythd::adapter, + crate::agent::state, anyhow::{ anyhow, Result, @@ -23,7 +23,7 @@ pub async fn subscribe_price_sched( request: &Request, ) -> Result where - S: adapter::AdapterApi, + S: state::StateApi, { let params: SubscribePriceSchedParams = serde_json::from_value( request diff --git a/src/agent/pythd/api/rpc/update_price.rs b/src/agent/pythd/api/rpc/update_price.rs index 8fd122f2..0eb532ea 100644 --- a/src/agent/pythd/api/rpc/update_price.rs +++ b/src/agent/pythd/api/rpc/update_price.rs @@ -3,7 +3,7 @@ use { Method, UpdatePriceParams, }, - crate::agent::pythd::adapter, + crate::agent::state, anyhow::{ anyhow, Result, @@ -19,7 +19,7 @@ pub async fn update_price( request: &Request, ) -> Result where - S: adapter::AdapterApi, + S: state::StateApi, { let params: UpdatePriceParams = serde_json::from_value( request diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 5a822af2..6a58fdc4 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -15,9 +15,8 @@ pub mod network { oracle, }, crate::agent::{ - pythd::adapter::Adapter, remote_keypair_loader::KeypairRequest, - store, + state::State, }, anyhow::Result, serde::{ @@ -83,7 +82,7 @@ pub mod network { network: Network, keypair_request_tx: Sender, logger: Logger, - adapter: Arc, + adapter: Arc, ) -> Result>> { // Publisher permissions updates between oracle and exporter let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default()); diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 28e94750..6818a4e6 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -1,26 +1,23 @@ use { self::transaction_monitor::TransactionMonitor, super::{ - super::store::{ - self, - PriceIdentifier, - }, + super::store::PriceIdentifier, key_store, network::Network, oracle::PricePublishingMetadata, }, crate::agent::{ - pythd::adapter::{ + remote_keypair_loader::{ + KeypairRequest, + RemoteKeypairLoader, + }, + state::{ global::GlobalStore, local::{ LocalStore, PriceInfo, }, - Adapter, - }, - remote_keypair_loader::{ - KeypairRequest, - RemoteKeypairLoader, + State, }, }, anyhow::{ @@ -77,7 +74,6 @@ use { self, Sender, }, - oneshot, watch, }, task::JoinHandle, @@ -186,7 +182,7 @@ pub fn spawn_exporter( key_store: KeyStore, keypair_request_tx: mpsc::Sender, logger: Logger, - adapter: Arc, + adapter: Arc, ) -> Result>> { // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); @@ -278,7 +274,7 @@ pub struct Exporter { logger: Logger, - adapter: Arc, + adapter: Arc, } impl Exporter { @@ -295,7 +291,7 @@ impl Exporter { >, keypair_request_tx: mpsc::Sender, logger: Logger, - adapter: Arc, + adapter: Arc, ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index d2f9fb56..81408079 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -9,12 +9,12 @@ use { crate::agent::{ legacy_schedule::LegacySchedule, market_schedule::MarketSchedule, - pythd::adapter::{ + state::{ global::{ GlobalStore, Update, }, - Adapter, + State, }, }, anyhow::{ @@ -180,7 +180,7 @@ pub struct Oracle { logger: Logger, - adapter: Arc, + adapter: Arc, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -230,7 +230,7 @@ pub fn spawn_oracle( >, key_store: KeyStore, logger: Logger, - adapter: Arc, + adapter: Arc, ) -> Vec> { let mut jhs = vec![]; @@ -275,7 +275,7 @@ impl Oracle { updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, network: Network, logger: Logger, - adapter: Arc, + adapter: Arc, ) -> Self { Oracle { data: Default::default(), diff --git a/src/agent/pythd/adapter.rs b/src/agent/state.rs similarity index 98% rename from src/agent/pythd/adapter.rs rename to src/agent/state.rs index 4a63a510..50c0f2eb 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/state.rs @@ -1,11 +1,11 @@ use { super::{ - super::store::PriceIdentifier, - api::{ + pythd::api::{ NotifyPrice, NotifyPriceSched, SubscriptionID, }, + store::PriceIdentifier, }, crate::agent::metrics::PROMETHEUS_REGISTRY, serde::{ @@ -29,7 +29,7 @@ pub mod global; pub mod local; pub use api::{ notifier, - AdapterApi, + StateApi, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -52,7 +52,7 @@ impl Default for Config { /// Adapter is the adapter between the pythd websocket API, and the stores. /// It is responsible for implementing the business logic for responding to /// the pythd websocket API calls. -pub struct Adapter { +pub struct State { /// Subscription ID sequencer. subscription_id_seq: AtomicI64, @@ -92,10 +92,10 @@ struct NotifyPriceSubscription { notify_price_tx: mpsc::Sender, } -impl Adapter { +impl State { pub async fn new(config: Config, logger: Logger) -> Self { let registry = &mut *PROMETHEUS_REGISTRY.lock().await; - Adapter { + State { global_store: global::Store::new(logger.clone(), registry), local_store: local::Store::new(logger.clone(), registry), subscription_id_seq: 1.into(), @@ -116,25 +116,23 @@ mod tests { AllAccountsData, }, notifier, - Adapter, - AdapterApi, Config, + State, + StateApi, }, crate::agent::{ - pythd::{ - adapter::local::LocalStore, - api::{ - self, - NotifyPrice, - NotifyPriceSched, - PriceAccountMetadata, - PriceUpdate, - ProductAccount, - ProductAccountMetadata, - PublisherAccount, - }, + pythd::api::{ + self, + NotifyPrice, + NotifyPriceSched, + PriceAccountMetadata, + PriceUpdate, + ProductAccount, + ProductAccountMetadata, + PublisherAccount, }, solana, + state::local::LocalStore, }, iobuffer::IoBuffer, pyth_sdk::Identifier, @@ -166,7 +164,7 @@ mod tests { }; struct TestAdapter { - adapter: Arc, + adapter: Arc, shutdown_tx: broadcast::Sender<()>, jh: JoinHandle<()>, } @@ -178,7 +176,7 @@ mod tests { let config = Config { notify_price_sched_interval_duration, }; - let adapter = Arc::new(Adapter::new(config, logger).await); + let adapter = Arc::new(State::new(config, logger).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier diff --git a/src/agent/pythd/adapter/api.rs b/src/agent/state/api.rs similarity index 98% rename from src/agent/pythd/adapter/api.rs rename to src/agent/state/api.rs index a8a061b4..a117b7ab 100644 --- a/src/agent/pythd/adapter/api.rs +++ b/src/agent/state/api.rs @@ -9,9 +9,9 @@ use { self, LocalStore, }, - Adapter, NotifyPriceSchedSubscription, NotifyPriceSubscription, + State, }, crate::agent::{ pythd::api::{ @@ -130,7 +130,7 @@ fn solana_price_account_to_pythd_api_price_account( } #[async_trait::async_trait] -pub trait AdapterApi { +pub trait StateApi { async fn get_product_list(&self) -> Result>; async fn lookup_all_accounts_metadata(&self) -> Result; async fn get_all_products(&self) -> Result>; @@ -172,7 +172,7 @@ pub trait AdapterApi { ) -> Result<()>; } -pub async fn notifier(adapter: Arc, mut shutdown_rx: broadcast::Receiver<()>) { +pub async fn notifier(adapter: Arc, mut shutdown_rx: broadcast::Receiver<()>) { let mut interval = tokio::time::interval(adapter.notify_price_sched_interval_duration); loop { adapter.drop_closed_subscriptions().await; @@ -192,7 +192,7 @@ pub async fn notifier(adapter: Arc, mut shutdown_rx: broadcast::Receive } #[async_trait::async_trait] -impl AdapterApi for Adapter { +impl StateApi for State { async fn get_product_list(&self) -> Result> { let all_accounts_metadata = self.lookup_all_accounts_metadata().await?; @@ -362,7 +362,7 @@ impl AdapterApi for Adapter { self, pyth_sdk::Identifier::new(account.to_bytes()), local::PriceInfo { - status: Adapter::map_status(&status)?, + status: State::map_status(&status)?, price, conf, timestamp: Utc::now().naive_utc(), diff --git a/src/agent/pythd/adapter/global.rs b/src/agent/state/global.rs similarity index 97% rename from src/agent/pythd/adapter/global.rs rename to src/agent/state/global.rs index b0b91319..97c444d8 100644 --- a/src/agent/pythd/adapter/global.rs +++ b/src/agent/state/global.rs @@ -2,13 +2,12 @@ // on-chain aggregation contracts, across both the primary and secondary networks. // This enables this data to be easily queried by other components. use { - super::Adapter, + super::State, crate::agent::{ metrics::{ PriceGlobalMetrics, ProductGlobalMetrics, }, - pythd::adapter::AdapterApi, solana::{ network::Network, oracle::{ @@ -17,6 +16,7 @@ use { ProductEntry, }, }, + state::StateApi, }, anyhow::{ anyhow, @@ -165,8 +165,8 @@ pub trait GlobalStore { } // Allow downcasting Adapter into GlobalStore for functions that depend on the `GlobalStore` service. -impl<'a> From<&'a Adapter> for &'a Store { - fn from(adapter: &'a Adapter) -> &'a Store { +impl<'a> From<&'a State> for &'a Store { + fn from(adapter: &'a State) -> &'a Store { &adapter.global_store } } @@ -175,7 +175,7 @@ impl<'a> From<&'a Adapter> for &'a Store { impl GlobalStore for T where for<'a> &'a T: Into<&'a Store>, - T: AdapterApi, + T: StateApi, T: Sync, { async fn update(&self, network: Network, update: &Update) -> Result<()> { @@ -223,7 +223,7 @@ where async fn update_data(state: &S, network: Network, update: &Update) -> Result<()> where - S: AdapterApi, + S: StateApi, for<'a> &'a S: Into<&'a Store>, { let store: &Store = state.into(); @@ -284,7 +284,7 @@ where // As the account data might differ between the two networks // we only notify the adapter of the primary network updates. if let Network::Primary = network { - AdapterApi::global_store_update( + StateApi::global_store_update( state, Identifier::new(account_key.to_bytes()), account.agg.price, @@ -304,7 +304,7 @@ where async fn update_metadata(state: &S, update: &Update) -> Result<()> where - S: AdapterApi, + S: StateApi, for<'a> &'a S: Into<&'a Store>, { let store: &Store = state.into(); diff --git a/src/agent/pythd/adapter/local.rs b/src/agent/state/local.rs similarity index 95% rename from src/agent/pythd/adapter/local.rs rename to src/agent/state/local.rs index 4679060a..8d05654e 100644 --- a/src/agent/pythd/adapter/local.rs +++ b/src/agent/state/local.rs @@ -3,9 +3,9 @@ // it to the networks. use { super::{ - Adapter, - AdapterApi, PriceIdentifier, + State, + StateApi, }, crate::agent::metrics::PriceLocalMetrics, anyhow::{ @@ -68,8 +68,8 @@ pub trait LocalStore { } // Allow downcasting Adapter into GlobalStore for functions that depend on the `GlobalStore` service. -impl<'a> From<&'a Adapter> for &'a Store { - fn from(adapter: &'a Adapter) -> &'a Store { +impl<'a> From<&'a State> for &'a Store { + fn from(adapter: &'a State) -> &'a Store { &adapter.local_store } } @@ -78,7 +78,7 @@ impl<'a> From<&'a Adapter> for &'a Store { impl LocalStore for T where for<'a> &'a T: Into<&'a Store>, - T: AdapterApi, + T: StateApi, T: Sync, { async fn update(&self, price_identifier: PriceIdentifier, price_info: PriceInfo) -> Result<()> {