Skip to content

Commit

Permalink
fix error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Jan 31, 2023
1 parent 1026a51 commit bbfe2d4
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 16 deletions.
1 change: 1 addition & 0 deletions set-log.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export RUST_LOG=trace,rustls=warn,soketto=warn,hyper=warn,jsonrpsee_server::transport=debug,mio=warn
108 changes: 98 additions & 10 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum Message {
unsubscribe: String,
response: tokio::sync::oneshot::Sender<Result<Subscription<JsonValue>, Error>>,
},
RotateEndpoint,
}

impl Client {
Expand All @@ -48,7 +49,13 @@ impl Client {

let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(100);

let tx2 = tx.clone();

let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel::<()>(10);

tokio::spawn(async move {
let tx = tx2;

let current_endpoint = AtomicUsize::new(0);

let build_ws = || async {
Expand All @@ -60,15 +67,28 @@ impl Client {
log::debug!("Connecting to endpoint: {}", url);

WsClientBuilder::default()
.request_timeout(std::time::Duration::from_secs(60))
.connection_timeout(std::time::Duration::from_secs(30))
.request_timeout(std::time::Duration::from_secs(30))
.connection_timeout(std::time::Duration::from_secs(10))
.max_notifs_per_subscription(1024)
.build(url)
};

let disconnect_tx = disconnect_tx.clone();

loop {
match build().await {
Ok(ws) => break Arc::new(ws),
Ok(ws) => {
let ws = Arc::new(ws);
let ws2 = ws.clone();

tokio::spawn(async move {
ws2.on_disconnect().await;
if let Err(e) = disconnect_tx.send(()).await {
log::debug!("Unable to send disconnect: {}", e);
}
});
break ws;
}
Err(e) => {
log::debug!("Unable to connect to endpoint: {}", e);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Expand All @@ -80,17 +100,71 @@ impl Client {
let mut ws = build_ws().await;

let handle_message = |message: Message, ws: Arc<WsClient>| {
let tx = tx.clone();

tokio::spawn(async move {
match message {
Message::Request {
method,
params,
response,
} => {
let result = ws.request(&method, params);
if let Err(e) = response.send(result.await) {
// TODO: retry error / timeout
log::warn!("Failed to send response: {:?}", e);
let result = ws.request(&method, params.clone()).await;
match result {
result @ Ok(_) => {
if let Err(e) = response.send(result) {
log::warn!("Failed to send response: {:?}", e);
}
}
Err(err) => {
log::debug!("Request failed: {:?}", err);
match err {
Error::RequestTimeout => {
if let Err(e) = tx.send(Message::RotateEndpoint).await {
log::warn!(
"Failed to send rotate message: {:?}",
e
);
}
if let Err(e) = tx
.send(Message::Request {
method,
params,
response,
})
.await
{
log::warn!(
"Failed to send request message: {:?}",
e
);
}
}
Error::Transport(_)
| Error::RestartNeeded(_)
| Error::Internal(_) => {
if let Err(e) = tx
.send(Message::Request {
method,
params,
response,
})
.await
{
log::warn!(
"Failed to send request message: {:?}",
e
);
}
}
err => {
// not something we can handle, send it back to the caller
if let Err(e) = response.send(Err(err)) {
log::warn!("Failed to send response: {:?}", e);
}
}
}
}
}
}
Message::Subscribe {
Expand All @@ -99,24 +173,31 @@ impl Client {
unsubscribe,
response,
} => {
let result = ws.subscribe(&subscribe, params, &unsubscribe);
if let Err(e) = response.send(result.await) {
let result = ws.subscribe(&subscribe, params, &unsubscribe).await;
if let Err(e) = response.send(result) {
// TODO: retry error / timeout
log::warn!("Failed to send response: {:?}", e);
}
}
Message::RotateEndpoint => {
unreachable!()
}
}
});
};

loop {
tokio::select! {
_ = ws.on_disconnect() => {
_ = disconnect_rx.recv() => {
log::debug!("Disconnected from endpoint");
ws = build_ws().await;
}
message = rx.recv() => {
log::info!("Received message {message:?}");
match message {
Some(Message::RotateEndpoint) => {
ws = build_ws().await;
}
Some(message) => handle_message(message, ws.clone()),
None => {
log::debug!("Client dropped");
Expand Down Expand Up @@ -166,6 +247,13 @@ impl Client {

rx.await.map_err(|e| CallError::Failed(e.into()))?
}

pub async fn rotate_endpoint(&self) -> Result<(), ()> {
self.sender
.send(Message::RotateEndpoint)
.await
.map_err(|_| ())
}
}

pub async fn create_client(config: &Config) -> anyhow::Result<Client> {
Expand Down
70 changes: 67 additions & 3 deletions src/client/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{net::SocketAddr, str::FromStr};
use std::{net::SocketAddr, str::FromStr, time::Duration};

use crate::enable_logger;

use super::*;

Expand All @@ -15,6 +17,8 @@ async fn dummy_server() -> (
mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)>,
mpsc::Receiver<(JsonValue, SubscriptionSink)>,
) {
enable_logger();

let mut module = RpcModule::new(());

let (tx, rx) = mpsc::channel::<(JsonValue, oneshot::Sender<JsonValue>)>(100);
Expand Down Expand Up @@ -77,7 +81,7 @@ async fn basic_request() {
assert_eq!(result.to_string(), "[1]");

handle.stop().unwrap();
tokio::join!(handler).0.unwrap();
let _ = tokio::join!(handler);
}

#[tokio::test]
Expand Down Expand Up @@ -107,5 +111,65 @@ async fn basic_subscription() {
assert_eq!(result, ["10", "11", "12"]);

handle.stop().unwrap();
tokio::join!(handler).0.unwrap();
let _ = tokio::join!(handler);
}

#[tokio::test]
async fn multiple_endpoints() {
// create 3 dummy servers
let (addr1, handle1, rx1, _) = dummy_server().await;
let (addr2, handle2, rx2, _) = dummy_server().await;
let (addr3, handle3, rx3, _) = dummy_server().await;

let client = Client::new(&[
format!("ws://{addr1}"),
format!("ws://{addr2}"),
format!("ws://{addr3}"),
])
.await
.unwrap();

let handle_requests = |mut rx: mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)>, n: u32| {
tokio::spawn(async move {
loop {
if let Some((_, resp_tx)) = rx.recv().await {
resp_tx.send(JsonValue::Number(n.into())).unwrap();
} else {
break;
}
}
})
};

let handler1 = handle_requests(rx1, 1);
let handler2 = handle_requests(rx2, 2);
let handler3 = handle_requests(rx3, 3);

let result = client.request("mock_rpc", Params::new(Some("[11]"))).await.unwrap();

assert_eq!(result.to_string(), "1");

handle1.stop().unwrap();

let result = client.request("mock_rpc", Params::new(Some("[22]"))).await.unwrap();

assert_eq!(result.to_string(), "2");

client.rotate_endpoint().await.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;

let result = client.request("mock_rpc", Params::new(Some("[33]"))).await.unwrap();

assert_eq!(result.to_string(), "3");

handle3.stop().unwrap();

let result = client.request("mock_rpc", Params::new(Some("[44]"))).await.unwrap();

assert_eq!(result.to_string(), "2");

handle2.stop().unwrap();

let _ = tokio::join!(handler1, handler2, handler3);
}
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ mod middleware;
mod server;

fn enable_logger() {
tracing_subscriber::FmtSubscriber::builder()
let _ = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(
tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::Level::INFO.into())
.from_env_lossy(),
)
.try_init()
.expect("setting default subscriber failed");
.try_init();
}

#[tokio::main]
Expand Down

0 comments on commit bbfe2d4

Please sign in to comment.