From 6ce6cbc74956b6848730d108c14451579e9ba389 Mon Sep 17 00:00:00 2001 From: Reisen Date: Wed, 17 Apr 2024 09:26:49 +0000 Subject: [PATCH] refactor: modularized handlers --- src/agent/pythd/api/rpc.rs | 419 +++++------------- src/agent/pythd/api/rpc/get_all_products.rs | 15 + src/agent/pythd/api/rpc/get_product.rs | 26 ++ src/agent/pythd/api/rpc/get_product_list.rs | 15 + src/agent/pythd/api/rpc/subscribe_price.rs | 33 ++ .../pythd/api/rpc/subscribe_price_sched.rs | 33 ++ src/agent/pythd/api/rpc/update_price.rs | 30 ++ 7 files changed, 275 insertions(+), 296 deletions(-) create mode 100644 src/agent/pythd/api/rpc/get_all_products.rs create mode 100644 src/agent/pythd/api/rpc/get_product.rs create mode 100644 src/agent/pythd/api/rpc/get_product_list.rs create mode 100644 src/agent/pythd/api/rpc/subscribe_price.rs create mode 100644 src/agent/pythd/api/rpc/subscribe_price_sched.rs create mode 100644 src/agent/pythd/api/rpc/update_price.rs diff --git a/src/agent/pythd/api/rpc.rs b/src/agent/pythd/api/rpc.rs index 177aa69..20736ff 100644 --- a/src/agent/pythd/api/rpc.rs +++ b/src/agent/pythd/api/rpc.rs @@ -5,61 +5,20 @@ // accepts messages and can return responses in the expected format. use { - super::{ - super::adapter, - Conf, - NotifyPrice, - NotifyPriceSched, - Price, - Pubkey, - SubscriptionID, - }, - anyhow::{ - anyhow, - Result, - }, + super::{super::adapter, Conf, NotifyPrice, NotifyPriceSched, Price, Pubkey, SubscriptionID}, + anyhow::{anyhow, Result}, futures_util::{ - stream::{ - SplitSink, - SplitStream, - StreamExt, - }, + stream::{SplitSink, SplitStream, StreamExt}, SinkExt, }, - jrpc::{ - parse_request, - ErrorCode, - Id, - IdReq, - Request, - Response, - Value, - }, - serde::{ - de::DeserializeOwned, - Deserialize, - Serialize, - }, - serde_this_or_that::{ - as_i64, - as_u64, - }, + jrpc::{parse_request, ErrorCode, Id, IdReq, Request, Response, Value}, + serde::{de::DeserializeOwned, Deserialize, Serialize}, + serde_this_or_that::{as_i64, as_u64}, slog::Logger, - std::{ - fmt::Debug, - net::SocketAddr, - }, - tokio::sync::{ - broadcast, - mpsc, - oneshot, - }, + std::{fmt::Debug, net::SocketAddr}, + tokio::sync::{broadcast, mpsc}, warp::{ - ws::{ - Message, - WebSocket, - Ws, - }, + ws::{Message, WebSocket, Ws}, Filter, }, }; @@ -96,10 +55,10 @@ struct SubscribePriceSchedParams { struct UpdatePriceParams { account: Pubkey, #[serde(deserialize_with = "as_i64")] - price: Price, + price: Price, #[serde(deserialize_with = "as_u64")] - conf: Conf, - status: String, + conf: Conf, + status: String, } #[derive(Serialize, Deserialize, Debug, PartialEq)] @@ -327,119 +286,18 @@ async fn dispatch_and_catch_error( } } -async fn get_product_list( - adapter_tx: &mpsc::Sender, -) -> Result { - let (result_tx, result_rx) = oneshot::channel(); - adapter_tx - .send(adapter::Message::GetProductList { result_tx }) - .await?; - Ok(serde_json::to_value(result_rx.await??)?) -} - -async fn get_product( - adapter_tx: &mpsc::Sender, - request: &Request, -) -> Result { - let params: GetProductParams = { - let value = request.params.clone(); - serde_json::from_value(value.ok_or_else(|| anyhow!("Missing request parameters"))?) - }?; - - let (result_tx, result_rx) = oneshot::channel(); - adapter_tx - .send(adapter::Message::GetProduct { - account: params.account, - result_tx, - }) - .await?; - Ok(serde_json::to_value(result_rx.await??)?) -} - -async fn get_all_products( - adapter_tx: &mpsc::Sender, -) -> Result { - let (result_tx, result_rx) = oneshot::channel(); - adapter_tx - .send(adapter::Message::GetAllProducts { result_tx }) - .await?; - Ok(serde_json::to_value(result_rx.await??)?) -} - -async fn subscribe_price( - adapter_tx: &mpsc::Sender, - notify_price_tx: &mpsc::Sender, - request: &Request, -) -> Result { - let params: SubscribePriceParams = serde_json::from_value( - request - .params - .clone() - .ok_or_else(|| anyhow!("Missing request parameters"))?, - )?; - - let (result_tx, result_rx) = oneshot::channel(); - adapter_tx - .send(adapter::Message::SubscribePrice { - result_tx, - account: params.account, - notify_price_tx: notify_price_tx.clone(), - }) - .await?; - - Ok(serde_json::to_value(SubscribeResult { - subscription: result_rx.await??, - })?) -} - -async fn subscribe_price_sched( - adapter_tx: &mpsc::Sender, - notify_price_sched_tx: &mpsc::Sender, - request: &Request, -) -> Result { - let params: SubscribePriceSchedParams = serde_json::from_value( - request - .params - .clone() - .ok_or_else(|| anyhow!("Missing request parameters"))?, - )?; - - let (result_tx, result_rx) = oneshot::channel(); - adapter_tx - .send(adapter::Message::SubscribePriceSched { - result_tx, - account: params.account, - notify_price_sched_tx: notify_price_sched_tx.clone(), - }) - .await?; - - Ok(serde_json::to_value(SubscribeResult { - subscription: result_rx.await??, - })?) -} - -async fn update_price( - adapter_tx: &mpsc::Sender, - request: &Request, -) -> Result { - let params: UpdatePriceParams = serde_json::from_value( - request - .params - .clone() - .ok_or_else(|| anyhow!("Missing request parameters"))?, - )?; - - adapter_tx - .send(adapter::Message::UpdatePrice { - account: params.account, - price: params.price, - conf: params.conf, - status: params.status, - }) - .await?; - - Ok(serde_json::to_value(0)?) -} +mod get_all_products; +mod get_product; +mod get_product_list; +mod subscribe_price; +mod subscribe_price_sched; +mod update_price; +use get_all_products::*; +use get_product::*; +use get_product_list::*; +use subscribe_price::*; +use subscribe_price_sched::*; +use update_price::*; async fn send_error( ws_tx: &mut SplitSink, @@ -496,10 +354,10 @@ struct WithLogger { #[serde(default)] pub struct Config { /// The address which the websocket API server will listen on. - pub listen_address: String, + pub listen_address: String, /// Size of the buffer of each Server's channel on which `notify_price` events are /// received from the Adapter. - pub notify_price_tx_buffer: usize, + pub notify_price_tx_buffer: usize, /// Size of the buffer of each Server's channel on which `notify_price_sched` events are /// received from the Adapter. pub notify_price_sched_tx_buffer: usize, @@ -508,8 +366,8 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { - listen_address: "127.0.0.1:8910".to_string(), - notify_price_tx_buffer: 10000, + listen_address: "127.0.0.1:8910".to_string(), + notify_price_tx_buffer: 10000, notify_price_sched_tx_buffer: 10000, } } @@ -583,64 +441,33 @@ mod tests { use { super::{ super::{ - rpc::GetProductParams, - Attrs, - PriceAccount, - PriceAccountMetadata, - ProductAccount, - ProductAccountMetadata, - Pubkey, - PublisherAccount, - SubscriptionID, + rpc::GetProductParams, Attrs, PriceAccount, PriceAccountMetadata, ProductAccount, + ProductAccountMetadata, Pubkey, PublisherAccount, SubscriptionID, }, Config, }, crate::agent::pythd::{ adapter, api::{ - rpc::{ - SubscribePriceParams, - SubscribePriceSchedParams, - UpdatePriceParams, - }, - NotifyPrice, - NotifyPriceSched, - PriceUpdate, + rpc::{SubscribePriceParams, SubscribePriceSchedParams, UpdatePriceParams}, + NotifyPrice, NotifyPriceSched, PriceUpdate, }, }, anyhow::anyhow, iobuffer::IoBuffer, - jrpc::{ - Id, - Request, - }, + jrpc::{Id, Request}, rand::Rng, - serde::{ - de::DeserializeOwned, - Serialize, - }, + serde::{de::DeserializeOwned, Serialize}, slog_extlog::slog_test, - soketto::handshake::{ - Client, - ServerResponse, - }, + soketto::handshake::{Client, ServerResponse}, std::str::from_utf8, tokio::{ net::TcpStream, - sync::{ - broadcast, - mpsc, - }, + sync::{broadcast, mpsc}, task::JoinHandle, }, - tokio_retry::{ - strategy::FixedInterval, - Retry, - }, - tokio_util::compat::{ - Compat, - TokioAsyncReadCompatExt, - }, + tokio_retry::{strategy::FixedInterval, Retry}, + tokio_util::compat::{Compat, TokioAsyncReadCompatExt}, }; struct TestAdapter { @@ -655,7 +482,7 @@ mod tests { struct TestServer { shutdown_tx: broadcast::Sender<()>, - jh: JoinHandle<()>, + jh: JoinHandle<()>, } impl Drop for TestServer { @@ -666,7 +493,7 @@ mod tests { } struct TestClient { - sender: soketto::Sender>, + sender: soketto::Sender>, receiver: soketto::Receiver>, } @@ -748,8 +575,8 @@ mod tests { // Define the product account we expect to receive back let product_account_key = "some_product_account".to_string(); let product_account = ProductAccount { - account: product_account_key.clone(), - attr_dict: Attrs::from( + account: product_account_key.clone(), + attr_dict: Attrs::from( [ ("symbol", "BTC/USD"), ("asset_type", "Crypto"), @@ -760,33 +587,33 @@ mod tests { .map(|(k, v)| (k.to_string(), v.to_string())), ), price_accounts: vec![PriceAccount { - account: "some_price_account".to_string(), - price_type: "price".to_string(), - price_exponent: 8, - status: "trading".to_string(), - price: 536, - conf: 67, - twap: 276, - twac: 463, - valid_slot: 4628, - pub_slot: 4736, - prev_slot: 3856, - prev_price: 400, - prev_conf: 45, + account: "some_price_account".to_string(), + price_type: "price".to_string(), + price_exponent: 8, + status: "trading".to_string(), + price: 536, + conf: 67, + twap: 276, + twac: 463, + valid_slot: 4628, + pub_slot: 4736, + prev_slot: 3856, + prev_price: 400, + prev_conf: 45, publisher_accounts: vec![ PublisherAccount { account: "some_publisher_account".to_string(), - status: "trading".to_string(), - price: 500, - conf: 24, - slot: 3563, + status: "trading".to_string(), + price: 500, + conf: 24, + slot: 3563, }, PublisherAccount { account: "another_publisher_account".to_string(), - status: "halted".to_string(), - price: 300, - conf: 683, - slot: 5834, + status: "halted".to_string(), + price: 300, + conf: 683, + slot: 5834, }, ], }], @@ -945,9 +772,9 @@ mod tests { let status = "trading"; let params = UpdatePriceParams { account: Pubkey::from("some_price_account"), - price: 7467, - conf: 892, - status: status.to_string(), + price: 7467, + conf: 892, + status: status.to_string(), }; test_client .send(Request::with_params( @@ -984,7 +811,7 @@ mod tests { // Define the data we are working with let product_account = Pubkey::from("some_product_account"); let data = vec![ProductAccountMetadata { - account: product_account.clone(), + account: product_account.clone(), attr_dict: Attrs::from( [ ("symbol", "BTC/USD"), @@ -995,15 +822,15 @@ mod tests { ] .map(|(k, v)| (k.to_string(), v.to_string())), ), - price: vec![ + price: vec![ PriceAccountMetadata { - account: Pubkey::from("some_price_account"), - price_type: "price".to_string(), + account: Pubkey::from("some_price_account"), + price_type: "price".to_string(), price_exponent: 4, }, PriceAccountMetadata { - account: Pubkey::from("another_price_account"), - price_type: "special".to_string(), + account: Pubkey::from("another_price_account"), + price_type: "special".to_string(), price_exponent: 6, }, ], @@ -1035,8 +862,8 @@ mod tests { // Define the data we are working with let data = vec![ProductAccount { - account: Pubkey::from("some_product_account"), - attr_dict: Attrs::from( + account: Pubkey::from("some_product_account"), + attr_dict: Attrs::from( [ ("symbol", "LTC/USD"), ("asset_type", "Crypto"), @@ -1047,33 +874,33 @@ mod tests { .map(|(k, v)| (k.to_string(), v.to_string())), ), price_accounts: vec![PriceAccount { - account: Pubkey::from("some_price_account"), - price_type: "price".to_string(), - price_exponent: 7463, - status: "trading".to_string(), - price: 6453, - conf: 3434, - twap: 6454, - twac: 365, - valid_slot: 3646, - pub_slot: 2857, - prev_slot: 7463, - prev_price: 3784, - prev_conf: 9879, + account: Pubkey::from("some_price_account"), + price_type: "price".to_string(), + price_exponent: 7463, + status: "trading".to_string(), + price: 6453, + conf: 3434, + twap: 6454, + twac: 365, + valid_slot: 3646, + pub_slot: 2857, + prev_slot: 7463, + prev_price: 3784, + prev_conf: 9879, publisher_accounts: vec![ PublisherAccount { account: Pubkey::from("some_publisher_account"), - status: "trading".to_string(), - price: 756, - conf: 8787, - slot: 2209, + status: "trading".to_string(), + price: 756, + conf: 8787, + slot: 2209, }, PublisherAccount { account: Pubkey::from("another_publisher_account"), - status: "halted".to_string(), - price: 0, - conf: 0, - slot: 6676, + status: "halted".to_string(), + price: 0, + conf: 0, + slot: 6676, }, ], }], @@ -1135,12 +962,12 @@ mod tests { // Send a Notify Price event from the adapter to the server, with the corresponding subscription id let notify_price_update = NotifyPrice { subscription: subscription_id, - result: PriceUpdate { - price: 74, - conf: 24, - status: "trading".to_string(), + result: PriceUpdate { + price: 74, + conf: 24, + status: "trading".to_string(), valid_slot: 6786, - pub_slot: 9897, + pub_slot: 9897, }, }; notify_price_tx.send(notify_price_update).await.unwrap(); @@ -1241,8 +1068,8 @@ mod tests { .await; let product_account = ProductAccount { - account: product_account_key, - attr_dict: Attrs::from( + account: product_account_key, + attr_dict: Attrs::from( [ ("symbol", "BTC/USD"), ("asset_type", "Crypto"), @@ -1253,33 +1080,33 @@ mod tests { .map(|(k, v)| (k.to_string(), v.to_string())), ), price_accounts: vec![PriceAccount { - account: "some_price_account".to_string(), - price_type: "price".to_string(), - price_exponent: 8, - status: "trading".to_string(), - price: 536, - conf: 67, - twap: 276, - twac: 463, - valid_slot: 4628, - pub_slot: 4736, - prev_slot: 3856, - prev_price: 400, - prev_conf: 45, + account: "some_price_account".to_string(), + price_type: "price".to_string(), + price_exponent: 8, + status: "trading".to_string(), + price: 536, + conf: 67, + twap: 276, + twac: 463, + valid_slot: 4628, + pub_slot: 4736, + prev_slot: 3856, + prev_price: 400, + prev_conf: 45, publisher_accounts: vec![ PublisherAccount { account: "some_publisher_account".to_string(), - status: "trading".to_string(), - price: 500, - conf: 24, - slot: 3563, + status: "trading".to_string(), + price: 500, + conf: 24, + slot: 3563, }, PublisherAccount { account: "another_publisher_account".to_string(), - status: "halted".to_string(), - price: 300, - conf: 683, - slot: 5834, + status: "halted".to_string(), + price: 300, + conf: 683, + slot: 5834, }, ], }], diff --git a/src/agent/pythd/api/rpc/get_all_products.rs b/src/agent/pythd/api/rpc/get_all_products.rs new file mode 100644 index 0000000..85b22c9 --- /dev/null +++ b/src/agent/pythd/api/rpc/get_all_products.rs @@ -0,0 +1,15 @@ +use { + crate::agent::pythd::adapter, + anyhow::Result, + tokio::sync::{mpsc, oneshot}, +}; + +pub async fn get_all_products( + adapter_tx: &mpsc::Sender, +) -> Result { + let (result_tx, result_rx) = oneshot::channel(); + adapter_tx + .send(adapter::Message::GetAllProducts { result_tx }) + .await?; + Ok(serde_json::to_value(result_rx.await??)?) +} diff --git a/src/agent/pythd/api/rpc/get_product.rs b/src/agent/pythd/api/rpc/get_product.rs new file mode 100644 index 0000000..8e43f5c --- /dev/null +++ b/src/agent/pythd/api/rpc/get_product.rs @@ -0,0 +1,26 @@ +use { + super::{GetProductParams, Method}, + crate::agent::pythd::adapter, + anyhow::{anyhow, Result}, + jrpc::{Request, Value}, + tokio::sync::{mpsc, oneshot}, +}; + +pub async fn get_product( + adapter_tx: &mpsc::Sender, + request: &Request, +) -> Result { + let params: GetProductParams = { + let value = request.params.clone(); + serde_json::from_value(value.ok_or_else(|| anyhow!("Missing request parameters"))?) + }?; + + let (result_tx, result_rx) = oneshot::channel(); + adapter_tx + .send(adapter::Message::GetProduct { + account: params.account, + result_tx, + }) + .await?; + Ok(serde_json::to_value(result_rx.await??)?) +} diff --git a/src/agent/pythd/api/rpc/get_product_list.rs b/src/agent/pythd/api/rpc/get_product_list.rs new file mode 100644 index 0000000..b01d2e0 --- /dev/null +++ b/src/agent/pythd/api/rpc/get_product_list.rs @@ -0,0 +1,15 @@ +use { + crate::agent::pythd::adapter, + anyhow::Result, + tokio::sync::{mpsc, oneshot}, +}; + +pub async fn get_product_list( + adapter_tx: &mpsc::Sender, +) -> Result { + let (result_tx, result_rx) = oneshot::channel(); + adapter_tx + .send(adapter::Message::GetProductList { result_tx }) + .await?; + Ok(serde_json::to_value(result_rx.await??)?) +} diff --git a/src/agent/pythd/api/rpc/subscribe_price.rs b/src/agent/pythd/api/rpc/subscribe_price.rs new file mode 100644 index 0000000..84db4c6 --- /dev/null +++ b/src/agent/pythd/api/rpc/subscribe_price.rs @@ -0,0 +1,33 @@ +use { + super::{Method, NotifyPrice, SubscribePriceParams, SubscribeResult}, + crate::agent::pythd::adapter, + anyhow::{anyhow, Result}, + jrpc::{Request, Value}, + tokio::sync::{mpsc, oneshot}, +}; + +pub async fn subscribe_price( + adapter_tx: &mpsc::Sender, + notify_price_tx: &mpsc::Sender, + request: &Request, +) -> Result { + let params: SubscribePriceParams = serde_json::from_value( + request + .params + .clone() + .ok_or_else(|| anyhow!("Missing request parameters"))?, + )?; + + let (result_tx, result_rx) = oneshot::channel(); + adapter_tx + .send(adapter::Message::SubscribePrice { + result_tx, + account: params.account, + notify_price_tx: notify_price_tx.clone(), + }) + .await?; + + Ok(serde_json::to_value(SubscribeResult { + subscription: result_rx.await??, + })?) +} diff --git a/src/agent/pythd/api/rpc/subscribe_price_sched.rs b/src/agent/pythd/api/rpc/subscribe_price_sched.rs new file mode 100644 index 0000000..c0c9015 --- /dev/null +++ b/src/agent/pythd/api/rpc/subscribe_price_sched.rs @@ -0,0 +1,33 @@ +use { + super::{Method, NotifyPriceSched, SubscribePriceSchedParams, SubscribeResult}, + crate::agent::pythd::adapter, + anyhow::{anyhow, Result}, + jrpc::{Request, Value}, + tokio::sync::{mpsc, oneshot}, +}; + +pub async fn subscribe_price_sched( + adapter_tx: &mpsc::Sender, + notify_price_sched_tx: &mpsc::Sender, + request: &Request, +) -> Result { + let params: SubscribePriceSchedParams = serde_json::from_value( + request + .params + .clone() + .ok_or_else(|| anyhow!("Missing request parameters"))?, + )?; + + let (result_tx, result_rx) = oneshot::channel(); + adapter_tx + .send(adapter::Message::SubscribePriceSched { + result_tx, + account: params.account, + notify_price_sched_tx: notify_price_sched_tx.clone(), + }) + .await?; + + Ok(serde_json::to_value(SubscribeResult { + subscription: result_rx.await??, + })?) +} diff --git a/src/agent/pythd/api/rpc/update_price.rs b/src/agent/pythd/api/rpc/update_price.rs new file mode 100644 index 0000000..d55dd19 --- /dev/null +++ b/src/agent/pythd/api/rpc/update_price.rs @@ -0,0 +1,30 @@ +use { + super::{Method, UpdatePriceParams}, + crate::agent::pythd::adapter, + anyhow::{anyhow, Result}, + jrpc::{Request, Value}, + tokio::sync::mpsc, +}; + +pub async fn update_price( + adapter_tx: &mpsc::Sender, + request: &Request, +) -> Result { + let params: UpdatePriceParams = serde_json::from_value( + request + .params + .clone() + .ok_or_else(|| anyhow!("Missing request parameters"))?, + )?; + + adapter_tx + .send(adapter::Message::UpdatePrice { + account: params.account, + price: params.price, + conf: params.conf, + status: params.status, + }) + .await?; + + Ok(serde_json::to_value(0)?) +}