From 87a38be3d04666d38f2be70f588d697ca8a22d50 Mon Sep 17 00:00:00 2001 From: Reisen Date: Wed, 5 Jun 2024 08:56:11 +0000 Subject: [PATCH] refactor(agent): refactor all references to adapter to state --- src/agent.rs | 20 ++-- src/agent/metrics.rs | 6 +- src/agent/pythd/api/rpc.rs | 42 ++++---- src/agent/pythd/api/rpc/get_all_products.rs | 4 +- src/agent/pythd/api/rpc/get_product.rs | 4 +- src/agent/pythd/api/rpc/get_product_list.rs | 4 +- src/agent/pythd/api/rpc/subscribe_price.rs | 4 +- .../pythd/api/rpc/subscribe_price_sched.rs | 4 +- src/agent/pythd/api/rpc/update_price.rs | 4 +- src/agent/solana.rs | 6 +- src/agent/solana/exporter.rs | 16 +-- src/agent/solana/oracle.rs | 14 +-- src/agent/state.rs | 101 ++++++++---------- src/agent/state/api.rs | 8 +- src/agent/state/global.rs | 6 +- src/agent/state/keypairs.rs | 6 +- src/agent/state/local.rs | 6 +- 17 files changed, 122 insertions(+), 133 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index ed4d18d..2c436d2 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -123,16 +123,15 @@ impl Agent { // job handles let mut jhs = vec![]; - // Create the Pythd Adapter. - let adapter = - Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await); + // Create the Application State. + let state = Arc::new(state::State::new(self.config.state.clone(), logger.clone()).await); // Spawn the primary network jhs.extend(network::spawn_network( self.config.primary_network.clone(), network::Network::Primary, logger.new(o!("primary" => true)), - adapter.clone(), + state.clone(), )?); // Spawn the secondary network, if needed @@ -141,25 +140,25 @@ impl Agent { config.clone(), network::Network::Secondary, logger.new(o!("primary" => false)), - adapter.clone(), + state.clone(), )?); } // Create the Notifier task for the Pythd RPC. - jhs.push(tokio::spawn(notifier(logger.clone(), adapter.clone()))); + jhs.push(tokio::spawn(notifier(logger.clone(), state.clone()))); // Spawn the Pythd API Server jhs.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), logger.clone(), - adapter.clone(), + state.clone(), ))); // Spawn the metrics server jhs.push(tokio::spawn(metrics::MetricsServer::spawn( self.config.metrics_server.bind_address, logger.clone(), - adapter.clone(), + state.clone(), ))); // Spawn the remote keypair loader endpoint for both networks @@ -172,7 +171,7 @@ impl Agent { .map(|c| c.rpc_url.clone()), self.config.remote_keypair_loader.clone(), logger, - adapter, + state, ) .await, ); @@ -210,7 +209,8 @@ pub mod config { pub primary_network: network::Config, pub secondary_network: Option, #[serde(default)] - pub pythd_adapter: state::Config, + #[serde(rename = "pythd_adapter")] + pub state: state::Config, #[serde(default)] pub pythd_api_server: pythd::api::rpc::Config, #[serde(default)] diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index d88c6d6..812acd4 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -69,16 +69,16 @@ lazy_static! { pub struct MetricsServer { pub start_time: Instant, pub logger: Logger, - pub adapter: Arc, + pub state: Arc, } impl MetricsServer { /// Instantiate a metrics API. - pub async fn spawn(addr: impl Into + 'static, logger: Logger, adapter: Arc) { + pub async fn spawn(addr: impl Into + 'static, logger: Logger, state: Arc) { let server = MetricsServer { start_time: Instant::now(), logger, - adapter, + state, }; let shared_state = Arc::new(Mutex::new(server)); diff --git a/src/agent/pythd/api/rpc.rs b/src/agent/pythd/api/rpc.rs index 72fb51e..29473af 100644 --- a/src/agent/pythd/api/rpc.rs +++ b/src/agent/pythd/api/rpc.rs @@ -112,7 +112,7 @@ enum ConnectionError { async fn handle_connection( ws_conn: WebSocket, - adapter: Arc, + state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, logger: Logger, @@ -131,7 +131,7 @@ async fn handle_connection( loop { if let Err(err) = handle_next( &logger, - &*adapter, + &*state, &mut ws_tx, &mut ws_rx, &mut notify_price_tx, @@ -156,7 +156,7 @@ async fn handle_connection( async fn handle_next( logger: &Logger, - adapter: &S, + state: &S, ws_tx: &mut SplitSink, ws_rx: &mut SplitStream, notify_price_tx: &mut mpsc::Sender, @@ -175,7 +175,7 @@ where handle( logger, ws_tx, - adapter, + state, notify_price_tx, notify_price_sched_tx, msg, @@ -201,7 +201,7 @@ where async fn handle( logger: &Logger, ws_tx: &mut SplitSink, - adapter: &S, + state: &S, notify_price_tx: &mpsc::Sender, notify_price_sched_tx: &mpsc::Sender, msg: Message, @@ -224,7 +224,7 @@ where for request in requests { let response = dispatch_and_catch_error( logger, - adapter, + state, notify_price_tx, notify_price_sched_tx, &request, @@ -287,7 +287,7 @@ async fn parse(msg: Message) -> Result<(Vec>, bool)> { async fn dispatch_and_catch_error( logger: &Logger, - adapter: &S, + state: &S, notify_price_tx: &mpsc::Sender, notify_price_sched_tx: &mpsc::Sender, request: &Request, @@ -302,13 +302,13 @@ where ); let result = match request.method { - Method::GetProductList => get_product_list(adapter).await, - Method::GetProduct => get_product(adapter, request).await, - Method::GetAllProducts => get_all_products(adapter).await, - Method::UpdatePrice => update_price(adapter, request).await, - Method::SubscribePrice => subscribe_price(adapter, notify_price_tx, request).await, + Method::GetProductList => get_product_list(state).await, + Method::GetProduct => get_product(state, request).await, + Method::GetAllProducts => get_all_products(state).await, + Method::UpdatePrice => update_price(state, request).await, + Method::SubscribePrice => subscribe_price(state, notify_price_tx, request).await, Method::SubscribePriceSched => { - subscribe_price_sched(adapter, notify_price_sched_tx, request).await + subscribe_price_sched(state, notify_price_sched_tx, request).await } Method::NotifyPrice | Method::NotifyPriceSched => { Err(anyhow!("unsupported method: {:?}", request.method)) @@ -410,10 +410,10 @@ pub struct Config { /// The address which the websocket API server will listen on. pub listen_address: String, /// Size of the buffer of each Server's channel on which `notify_price` events are - /// received from the Adapter. + /// received from the Price state. pub notify_price_tx_buffer: usize, /// Size of the buffer of each Server's channel on which `notify_price_sched` events are - /// received from the Adapter. + /// received from the Price state. pub notify_price_sched_tx_buffer: usize, } @@ -427,20 +427,20 @@ impl Default for Config { } } -pub async fn run(config: Config, logger: Logger, adapter: Arc) +pub async fn run(config: Config, logger: Logger, state: Arc) where S: state::Prices, S: Send, S: Sync, S: 'static, { - if let Err(err) = serve(config, &logger, adapter).await { + if let Err(err) = serve(config, &logger, state).await { error!(logger, "{}", err); debug!(logger, "error context"; "context" => format!("{:?}", err)); } } -async fn serve(config: Config, logger: &Logger, adapter: Arc) -> Result<()> +async fn serve(config: Config, logger: &Logger, state: Arc) -> Result<()> where S: state::Prices, S: Send, @@ -456,16 +456,16 @@ where let config = config.clone(); warp::path::end() .and(warp::ws()) - .and(warp::any().map(move || adapter.clone())) + .and(warp::any().map(move || state.clone())) .and(warp::any().map(move || with_logger.clone())) .and(warp::any().map(move || config.clone())) .map( - |ws: Ws, adapter: Arc, with_logger: WithLogger, config: Config| { + |ws: Ws, state: Arc, with_logger: WithLogger, config: Config| { ws.on_upgrade(move |conn| async move { info!(with_logger.logger, "websocket user connected"); handle_connection( conn, - adapter, + state, config.notify_price_tx_buffer, config.notify_price_sched_tx_buffer, with_logger.logger, diff --git a/src/agent/pythd/api/rpc/get_all_products.rs b/src/agent/pythd/api/rpc/get_all_products.rs index 71e5dc4..7a34278 100644 --- a/src/agent/pythd/api/rpc/get_all_products.rs +++ b/src/agent/pythd/api/rpc/get_all_products.rs @@ -3,10 +3,10 @@ use { anyhow::Result, }; -pub async fn get_all_products(adapter: &S) -> Result +pub async fn get_all_products(state: &S) -> Result where S: state::Prices, { - let products = adapter.get_all_products().await?; + let products = state.get_all_products().await?; Ok(serde_json::to_value(products)?) } diff --git a/src/agent/pythd/api/rpc/get_product.rs b/src/agent/pythd/api/rpc/get_product.rs index 8eb11bc..19c69a5 100644 --- a/src/agent/pythd/api/rpc/get_product.rs +++ b/src/agent/pythd/api/rpc/get_product.rs @@ -15,7 +15,7 @@ use { }; pub async fn get_product( - adapter: &S, + state: &S, request: &Request, ) -> Result where @@ -27,6 +27,6 @@ where }?; let account = params.account.parse::()?; - let product = adapter.get_product(&account).await?; + let product = state.get_product(&account).await?; Ok(serde_json::to_value(product)?) } diff --git a/src/agent/pythd/api/rpc/get_product_list.rs b/src/agent/pythd/api/rpc/get_product_list.rs index e8efa1e..30cde6e 100644 --- a/src/agent/pythd/api/rpc/get_product_list.rs +++ b/src/agent/pythd/api/rpc/get_product_list.rs @@ -3,10 +3,10 @@ use { anyhow::Result, }; -pub async fn get_product_list(adapter: &S) -> Result +pub async fn get_product_list(state: &S) -> Result where S: state::Prices, { - let product_list = adapter.get_product_list().await?; + let product_list = state.get_product_list().await?; Ok(serde_json::to_value(product_list)?) } diff --git a/src/agent/pythd/api/rpc/subscribe_price.rs b/src/agent/pythd/api/rpc/subscribe_price.rs index 253bba6..5936505 100644 --- a/src/agent/pythd/api/rpc/subscribe_price.rs +++ b/src/agent/pythd/api/rpc/subscribe_price.rs @@ -18,7 +18,7 @@ use { }; pub async fn subscribe_price( - adapter: &S, + state: &S, notify_price_tx: &mpsc::Sender, request: &Request, ) -> Result @@ -33,7 +33,7 @@ where )?; let account = params.account.parse::()?; - let subscription = adapter + let subscription = state .subscribe_price(&account, notify_price_tx.clone()) .await; diff --git a/src/agent/pythd/api/rpc/subscribe_price_sched.rs b/src/agent/pythd/api/rpc/subscribe_price_sched.rs index 999f2a4..608a489 100644 --- a/src/agent/pythd/api/rpc/subscribe_price_sched.rs +++ b/src/agent/pythd/api/rpc/subscribe_price_sched.rs @@ -18,7 +18,7 @@ use { }; pub async fn subscribe_price_sched( - adapter: &S, + state: &S, notify_price_sched_tx: &mpsc::Sender, request: &Request, ) -> Result @@ -33,7 +33,7 @@ where )?; let account = params.account.parse::()?; - let subscription = adapter + let subscription = state .subscribe_price_sched(&account, notify_price_sched_tx.clone()) .await; diff --git a/src/agent/pythd/api/rpc/update_price.rs b/src/agent/pythd/api/rpc/update_price.rs index 4af0c29..c5748af 100644 --- a/src/agent/pythd/api/rpc/update_price.rs +++ b/src/agent/pythd/api/rpc/update_price.rs @@ -15,7 +15,7 @@ use { }; pub async fn update_price( - adapter: &S, + state: &S, request: &Request, ) -> Result where @@ -28,7 +28,7 @@ where .ok_or_else(|| anyhow!("Missing request parameters"))?, )?; - adapter + state .update_local_price( ¶ms.account.parse::()?, params.price, diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 4c2d9b3..1b4456c 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -75,7 +75,7 @@ pub mod network { config: Config, network: Network, logger: Logger, - adapter: Arc, + state: Arc, ) -> Result>> { // Publisher permissions updates between oracle and exporter let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default()); @@ -90,7 +90,7 @@ pub mod network { publisher_permissions_tx, KeyStore::new(config.key_store.clone(), &logger)?, logger.clone(), - adapter.clone(), + state.clone(), ); // Spawn the Exporter @@ -102,7 +102,7 @@ pub mod network { publisher_permissions_rx, KeyStore::new(config.key_store.clone(), &logger)?, logger, - adapter, + state, )?; jhs.extend(exporter_jhs); diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 10a52cb..ed412d1 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -176,7 +176,7 @@ pub fn spawn_exporter( >, key_store: KeyStore, logger: Logger, - adapter: Arc, + state: Arc, ) -> Result>> { // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); @@ -212,7 +212,7 @@ pub fn spawn_exporter( transactions_tx, publisher_permissions_rx, logger, - adapter, + state, ); let exporter_jh = tokio::spawn(async move { exporter.run().await }); @@ -265,7 +265,7 @@ pub struct Exporter { logger: Logger, - adapter: Arc, + state: Arc, } impl Exporter { @@ -281,7 +281,7 @@ impl Exporter { HashMap>, >, logger: Logger, - adapter: Arc, + state: Arc, ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { @@ -303,7 +303,7 @@ impl Exporter { ), recent_compute_unit_price_micro_lamports: None, logger, - adapter, + state, } } @@ -411,7 +411,7 @@ impl Exporter { self.logger, "Exporter: Publish keypair is None, requesting remote loaded key" ); - let kp = Keypairs::request_keypair(&*self.adapter, self.network).await?; + let kp = Keypairs::request_keypair(&*self.state, self.network).await?; debug!(self.logger, "Exporter: Keypair received"); Ok(kp) } @@ -596,7 +596,7 @@ impl Exporter { } async fn fetch_local_store_contents(&self) -> Result> { - Ok(LocalStore::get_all_price_infos(&*self.adapter).await) + Ok(LocalStore::get_all_price_infos(&*self.state).await) } async fn publish_batch(&self, batch: &[(Identifier, PriceInfo)]) -> Result<()> { @@ -700,7 +700,7 @@ impl Exporter { // in this batch. This will use the maximum total compute unit fee if the publisher // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots. let result = GlobalStore::price_accounts( - &*self.adapter, + &*self.state, self.network, price_accounts.clone().into_iter().collect(), ) diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index b9b7408..e8cba19 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -178,7 +178,7 @@ pub struct Oracle { logger: Logger, - adapter: Arc, + state: Arc, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -228,7 +228,7 @@ pub fn spawn_oracle( >, key_store: KeyStore, logger: Logger, - adapter: Arc, + state: Arc, ) -> Vec> { let mut jhs = vec![]; @@ -261,7 +261,7 @@ pub fn spawn_oracle( jhs.push(tokio::spawn(async move { poller.run().await })); // Create and spawn the Oracle - let mut oracle = Oracle::new(data_rx, updates_rx, network, logger, adapter); + let mut oracle = Oracle::new(data_rx, updates_rx, network, logger, state); jhs.push(tokio::spawn(async move { oracle.run().await })); jhs @@ -273,7 +273,7 @@ impl Oracle { updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, network: Network, logger: Logger, - adapter: Arc, + state: Arc, ) -> Self { Oracle { data: Default::default(), @@ -281,7 +281,7 @@ impl Oracle { updates_rx, network, logger, - adapter, + state, } } @@ -415,7 +415,7 @@ impl Oracle { account: &ProductEntry, ) -> Result<()> { Prices::update_global_price( - &*self.adapter, + &*self.state, self.network, &Update::ProductAccountUpdate { account_key: *account_key, @@ -432,7 +432,7 @@ impl Oracle { account: &PriceEntry, ) -> Result<()> { Prices::update_global_price( - &*self.adapter, + &*self.state, self.network, &Update::PriceAccountUpdate { account_key: *account_key, diff --git a/src/agent/state.rs b/src/agent/state.rs index ace805a..32d6752 100644 --- a/src/agent/state.rs +++ b/src/agent/state.rs @@ -43,9 +43,7 @@ impl Default for Config { } } -/// Adapter is the adapter between the pythd websocket API, and the stores. -/// It is responsible for implementing the business logic for responding to -/// the pythd websocket API calls. +/// State contains all relevant shared application state. pub struct State { /// Global store for managing the unified state of Pyth-on-Solana networks. global_store: global::Store, @@ -151,27 +149,26 @@ mod tests { }, }; - struct TestAdapter { - adapter: Arc, + struct TestState { + state: Arc, shutdown_tx: broadcast::Sender<()>, jh: JoinHandle<()>, } - async fn setup() -> TestAdapter { - // Create and spawn an adapter + async fn setup() -> TestState { let notify_price_sched_interval_duration = Duration::from_nanos(10); let logger = slog_test::new_test_logger(IoBuffer::new()); let config = Config { notify_price_sched_interval_duration, }; - let adapter = Arc::new(State::new(config, logger.clone()).await); + let state = Arc::new(State::new(config, logger.clone()).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier - let jh = tokio::spawn(notifier(logger, adapter.clone())); + let jh = tokio::spawn(notifier(logger, state.clone())); - TestAdapter { - adapter, + TestState { + state, shutdown_tx, jh, } @@ -179,15 +176,15 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_subscribe_price_sched() { - let test_adapter = setup().await; + let state = setup().await; // Send a Subscribe Price Sched message let account = "2wrWGm63xWubz7ue4iYR3qvBbaUJhZVi4eSpNuU8k8iF" .parse::() .unwrap(); let (notify_price_sched_tx, mut notify_price_sched_rx) = mpsc::channel(1000); - let subscription_id = test_adapter - .adapter + let subscription_id = state + .state .subscribe_price_sched(&account, notify_price_sched_tx) .await; @@ -201,8 +198,8 @@ mod tests { ) } - let _ = test_adapter.shutdown_tx.send(()); - test_adapter.jh.abort(); + let _ = state.shutdown_tx.send(()); + state.jh.abort(); } fn get_test_all_accounts_metadata() -> global::AllAccountsMetadata { @@ -324,17 +321,16 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_product_list() { - // Start the test adapter - let test_adapter = setup().await; + let state = setup().await; let accounts_metadata = get_test_all_accounts_metadata(); - test_adapter - .adapter + state + .state .global_store ._account_metadata(accounts_metadata) .await; // Send a Get Product List message - let mut product_list = test_adapter.adapter.get_product_list().await.unwrap(); + let mut product_list = state.state.get_product_list().await.unwrap(); // Check that the result is what we expected let expected = vec![ @@ -404,8 +400,8 @@ mod tests { product_list.sort(); assert_eq!(product_list, expected); - let _ = test_adapter.shutdown_tx.send(()); - test_adapter.jh.abort(); + let _ = state.shutdown_tx.send(()); + state.jh.abort(); } fn pad_price_comps(mut inputs: Vec) -> [PriceComp; 32] { @@ -1031,17 +1027,16 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_all_products() { - // Start the test adapter - let test_adapter = setup().await; + let state = setup().await; let accounts_data = get_all_accounts_data(); - test_adapter - .adapter + state + .state .global_store ._account_data_primary(accounts_data) .await; // Send a Get All Products message - let mut all_products = test_adapter.adapter.get_all_products().await.unwrap(); + let mut all_products = state.state.get_all_products().await.unwrap(); // Check that the result of the conversion to the Pythd API format is what we expected let expected = vec![ @@ -1240,17 +1235,16 @@ mod tests { all_products.sort(); assert_eq!(all_products, expected); - let _ = test_adapter.shutdown_tx.send(()); - test_adapter.jh.abort(); + let _ = state.shutdown_tx.send(()); + state.jh.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_product() { - // Start the test adapter - let test_adapter = setup().await; + let state = setup().await; let accounts_data = get_all_accounts_data(); - test_adapter - .adapter + state + .state .global_store ._account_data_primary(accounts_data) .await; @@ -1259,7 +1253,7 @@ mod tests { let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t" .parse::() .unwrap(); - let product = test_adapter.adapter.get_product(&account).await.unwrap(); + let product = state.state.get_product(&account).await.unwrap(); // Check that the result of the conversion to the Pythd API format is what we expected let expected = ProductAccount { @@ -1349,14 +1343,13 @@ mod tests { }; assert_eq!(product, expected); - let _ = test_adapter.shutdown_tx.send(()); - test_adapter.jh.abort(); + let _ = state.shutdown_tx.send(()); + state.jh.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_update_price() { - // Start the test adapter - let test_adapter = setup().await; + let state = setup().await; // Send an Update Price message let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t" @@ -1364,14 +1357,14 @@ mod tests { .unwrap(); let price = 2365; let conf = 98754; - let _ = test_adapter - .adapter + let _ = state + .state .update_local_price(&account, price, conf, "trading".to_string()) .await .unwrap(); // Check that the local store indeed received the correct update - let price_infos = LocalStore::get_all_price_infos(&*test_adapter.adapter).await; + let price_infos = LocalStore::get_all_price_infos(&*state.state).await; let price_info = price_infos .get(&Identifier::new(account.to_bytes())) .unwrap(); @@ -1380,26 +1373,22 @@ mod tests { assert_eq!(price_info.conf, conf); assert_eq!(price_info.status, PriceStatus::Trading); - let _ = test_adapter.shutdown_tx.send(()); - test_adapter.jh.abort(); + let _ = state.shutdown_tx.send(()); + state.jh.abort(); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_subscribe_notify_price() { - // Start the test adapter - let test_adapter = setup().await; + let state = setup().await; // Send a Subscribe Price message let account = "2wrWGm63xWubz7ue4iYR3qvBbaUJhZVi4eSpNuU8k8iF" .parse::() .unwrap(); let (notify_price_tx, mut notify_price_rx) = mpsc::channel(1000); - let subscription_id = test_adapter - .adapter - .subscribe_price(&account, notify_price_tx) - .await; + let subscription_id = state.state.subscribe_price(&account, notify_price_tx).await; - // Send an update from the global store to the adapter + // Send an update via the global store. let test_price: PriceEntry = SolanaPriceAccount { magic: 0xa1b2c3d4, ver: 7, @@ -1450,8 +1439,8 @@ mod tests { } .into(); - let _ = test_adapter - .adapter + let _ = state + .state .update_global_price( Network::Primary, &Update::PriceAccountUpdate { @@ -1467,7 +1456,7 @@ mod tests { .await .unwrap(); - // Check that the adapter sends a notify price message with the corresponding subscription id + // Check that the application sends a notify price message with the corresponding subscription id // to the expected channel. assert_eq!( notify_price_rx.recv().await.unwrap(), @@ -1483,7 +1472,7 @@ mod tests { } ); - let _ = test_adapter.shutdown_tx.send(()); - test_adapter.jh.abort(); + let _ = state.shutdown_tx.send(()); + state.jh.abort(); } } diff --git a/src/agent/state/api.rs b/src/agent/state/api.rs index d5bb1d8..5501a24 100644 --- a/src/agent/state/api.rs +++ b/src/agent/state/api.rs @@ -193,10 +193,10 @@ pub trait Prices { fn map_status(status: &str) -> Result; } -// Allow downcasting Adapter into Keypairs for functions that depend on the `Keypairs` service. +// Allow downcasting State into Keypairs for functions that depend on the `Keypairs` service. impl<'a> From<&'a State> for &'a PricesState { - fn from(adapter: &'a State) -> &'a PricesState { - &adapter.prices + fn from(state: &'a State) -> &'a PricesState { + &state.prices } } @@ -333,7 +333,7 @@ where { // Send the notify price sched update without awaiting. This results in raising errors // if the channel is full which normally should not happen. This is because we do not - // want to block the adapter if the channel is full. + // want to block the API if the channel is full. subscription .notify_price_sched_tx .try_send(NotifyPriceSched { diff --git a/src/agent/state/global.rs b/src/agent/state/global.rs index b6f408b..cdfad98 100644 --- a/src/agent/state/global.rs +++ b/src/agent/state/global.rs @@ -162,10 +162,10 @@ pub trait GlobalStore { ) -> Result>; } -// Allow downcasting Adapter into GlobalStore for functions that depend on the `GlobalStore` service. +// Allow downcasting State into GlobalStore for functions that depend on the `GlobalStore` service. impl<'a> From<&'a State> for &'a Store { - fn from(adapter: &'a State) -> &'a Store { - &adapter.global_store + fn from(state: &'a State) -> &'a Store { + &state.global_store } } diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index f8f91da..73d4da8 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -76,10 +76,10 @@ pub trait Keypairs { async fn update_keypair(&self, network: Network, new_keypair: Keypair); } -// Allow downcasting Adapter into Keypairs for functions that depend on the `Keypairs` service. +// Allow downcasting State into Keypairs for functions that depend on the `Keypairs` service. impl<'a> From<&'a State> for &'a KeypairState { - fn from(adapter: &'a State) -> &'a KeypairState { - &adapter.keypairs + fn from(state: &'a State) -> &'a KeypairState { + &state.keypairs } } diff --git a/src/agent/state/local.rs b/src/agent/state/local.rs index 0fec3b2..60e3f7b 100644 --- a/src/agent/state/local.rs +++ b/src/agent/state/local.rs @@ -66,10 +66,10 @@ pub trait LocalStore { async fn get_all_price_infos(&self) -> HashMap; } -// Allow downcasting Adapter into GlobalStore for functions that depend on the `GlobalStore` service. +// Allow downcasting State into GlobalStore for functions that depend on the `GlobalStore` service. impl<'a> From<&'a State> for &'a Store { - fn from(adapter: &'a State) -> &'a Store { - &adapter.local_store + fn from(state: &'a State) -> &'a Store { + &state.local_store } }