Skip to content

Commit

Permalink
refactor: modularized handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen committed Apr 17, 2024
1 parent 4caa320 commit 0d6b36a
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 114 deletions.
128 changes: 14 additions & 114 deletions src/agent/pythd/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use {
tokio::sync::{
broadcast,
mpsc,
oneshot,
},
warp::{
ws::{
Expand Down Expand Up @@ -327,119 +326,20 @@ async fn dispatch_and_catch_error(
}
}

async fn get_product_list(
adapter_tx: &mpsc::Sender<adapter::Message>,
) -> Result<serde_json::Value> {
let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::GetProductList { result_tx })
.await?;
Ok(serde_json::to_value(result_rx.await??)?)
}

async fn get_product(
adapter_tx: &mpsc::Sender<adapter::Message>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: GetProductParams = {
let value = request.params.clone();
serde_json::from_value(value.ok_or_else(|| anyhow!("Missing request parameters"))?)
}?;

let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::GetProduct {
account: params.account,
result_tx,
})
.await?;
Ok(serde_json::to_value(result_rx.await??)?)
}

async fn get_all_products(
adapter_tx: &mpsc::Sender<adapter::Message>,
) -> Result<serde_json::Value> {
let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::GetAllProducts { result_tx })
.await?;
Ok(serde_json::to_value(result_rx.await??)?)
}

async fn subscribe_price(
adapter_tx: &mpsc::Sender<adapter::Message>,
notify_price_tx: &mpsc::Sender<NotifyPrice>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: SubscribePriceParams = serde_json::from_value(
request
.params
.clone()
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::SubscribePrice {
result_tx,
account: params.account,
notify_price_tx: notify_price_tx.clone(),
})
.await?;

Ok(serde_json::to_value(SubscribeResult {
subscription: result_rx.await??,
})?)
}

async fn subscribe_price_sched(
adapter_tx: &mpsc::Sender<adapter::Message>,
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: SubscribePriceSchedParams = serde_json::from_value(
request
.params
.clone()
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::SubscribePriceSched {
result_tx,
account: params.account,
notify_price_sched_tx: notify_price_sched_tx.clone(),
})
.await?;

Ok(serde_json::to_value(SubscribeResult {
subscription: result_rx.await??,
})?)
}

async fn update_price(
adapter_tx: &mpsc::Sender<adapter::Message>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: UpdatePriceParams = serde_json::from_value(
request
.params
.clone()
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

adapter_tx
.send(adapter::Message::UpdatePrice {
account: params.account,
price: params.price,
conf: params.conf,
status: params.status,
})
.await?;

Ok(serde_json::to_value(0)?)
}
mod get_all_products;
mod get_product;
mod get_product_list;
mod subscribe_price;
mod subscribe_price_sched;
mod update_price;
use {
get_all_products::*,
get_product::*,
get_product_list::*,
subscribe_price::*,
subscribe_price_sched::*,
update_price::*,
};

async fn send_error(
ws_tx: &mut SplitSink<WebSocket, Message>,
Expand Down
18 changes: 18 additions & 0 deletions src/agent/pythd/api/rpc/get_all_products.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use {
crate::agent::pythd::adapter,
anyhow::Result,
tokio::sync::{
mpsc,
oneshot,
},
};

pub async fn get_all_products(
adapter_tx: &mpsc::Sender<adapter::Message>,
) -> Result<serde_json::Value> {
let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::GetAllProducts { result_tx })
.await?;
Ok(serde_json::to_value(result_rx.await??)?)
}
38 changes: 38 additions & 0 deletions src/agent/pythd/api/rpc/get_product.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use {
super::{
GetProductParams,
Method,
},
crate::agent::pythd::adapter,
anyhow::{
anyhow,
Result,
},
jrpc::{
Request,
Value,
},
tokio::sync::{
mpsc,
oneshot,
},
};

pub async fn get_product(
adapter_tx: &mpsc::Sender<adapter::Message>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: GetProductParams = {
let value = request.params.clone();
serde_json::from_value(value.ok_or_else(|| anyhow!("Missing request parameters"))?)
}?;

let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::GetProduct {
account: params.account,
result_tx,
})
.await?;
Ok(serde_json::to_value(result_rx.await??)?)
}
18 changes: 18 additions & 0 deletions src/agent/pythd/api/rpc/get_product_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use {
crate::agent::pythd::adapter,
anyhow::Result,
tokio::sync::{
mpsc,
oneshot,
},
};

pub async fn get_product_list(
adapter_tx: &mpsc::Sender<adapter::Message>,
) -> Result<serde_json::Value> {
let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::GetProductList { result_tx })
.await?;
Ok(serde_json::to_value(result_rx.await??)?)
}
47 changes: 47 additions & 0 deletions src/agent/pythd/api/rpc/subscribe_price.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use {
super::{
Method,
NotifyPrice,
SubscribePriceParams,
SubscribeResult,
},
crate::agent::pythd::adapter,
anyhow::{
anyhow,
Result,
},
jrpc::{
Request,
Value,
},
tokio::sync::{
mpsc,
oneshot,
},
};

pub async fn subscribe_price(
adapter_tx: &mpsc::Sender<adapter::Message>,
notify_price_tx: &mpsc::Sender<NotifyPrice>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: SubscribePriceParams = serde_json::from_value(
request
.params
.clone()
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::SubscribePrice {
result_tx,
account: params.account,
notify_price_tx: notify_price_tx.clone(),
})
.await?;

Ok(serde_json::to_value(SubscribeResult {
subscription: result_rx.await??,
})?)
}
47 changes: 47 additions & 0 deletions src/agent/pythd/api/rpc/subscribe_price_sched.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use {
super::{
Method,
NotifyPriceSched,
SubscribePriceSchedParams,
SubscribeResult,
},
crate::agent::pythd::adapter,
anyhow::{
anyhow,
Result,
},
jrpc::{
Request,
Value,
},
tokio::sync::{
mpsc,
oneshot,
},
};

pub async fn subscribe_price_sched(
adapter_tx: &mpsc::Sender<adapter::Message>,
notify_price_sched_tx: &mpsc::Sender<NotifyPriceSched>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: SubscribePriceSchedParams = serde_json::from_value(
request
.params
.clone()
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

let (result_tx, result_rx) = oneshot::channel();
adapter_tx
.send(adapter::Message::SubscribePriceSched {
result_tx,
account: params.account,
notify_price_sched_tx: notify_price_sched_tx.clone(),
})
.await?;

Ok(serde_json::to_value(SubscribeResult {
subscription: result_rx.await??,
})?)
}
39 changes: 39 additions & 0 deletions src/agent/pythd/api/rpc/update_price.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use {
super::{
Method,
UpdatePriceParams,
},
crate::agent::pythd::adapter,
anyhow::{
anyhow,
Result,
},
jrpc::{
Request,
Value,
},
tokio::sync::mpsc,
};

pub async fn update_price(
adapter_tx: &mpsc::Sender<adapter::Message>,
request: &Request<Method, Value>,
) -> Result<serde_json::Value> {
let params: UpdatePriceParams = serde_json::from_value(
request
.params
.clone()
.ok_or_else(|| anyhow!("Missing request parameters"))?,
)?;

adapter_tx
.send(adapter::Message::UpdatePrice {
account: params.account,
price: params.price,
conf: params.conf,
status: params.status,
})
.await?;

Ok(serde_json::to_value(0)?)
}

0 comments on commit 0d6b36a

Please sign in to comment.