Skip to content

Commit

Permalink
task processing timeout (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci authored Oct 26, 2023
1 parent 97026e5 commit f42d4ac
Showing 1 changed file with 128 additions and 99 deletions.
227 changes: 128 additions & 99 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ impl Client {
let tx = message_tx_bg.clone();
let request_backoff_counter = request_backoff_counter.clone();

// total timeout for a request
let task_timeout = request_timeout
.unwrap_or(Duration::from_secs(30))
// buffer 5 seconds for the request to be processed
.saturating_add(Duration::from_secs(5));

tokio::spawn(async move {
match message {
Message::Request {
Expand All @@ -176,61 +182,71 @@ impl Client {
return;
}

let result = ws.request(&method, params.clone()).await;
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
if let Ok(result) =
tokio::time::timeout(task_timeout, ws.request(&method, params.clone())).await
{
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(result);
}
let _ = response.send(result);
}
Err(err) => {
tracing::debug!("Request failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
Err(err) => {
tracing::debug!("Request failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
}

tx.send(Message::Request {
method,
params,
response,
retries,
})
.await
.expect("Failed to send request message");
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}

tx.send(Message::Request {
method,
params,
response,
retries,
})
.await
.expect("Failed to send request message");
}
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}
}
}
} else {
tracing::error!("request timed out method: {} params: {:?}", method, params);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(Err(Error::RequestTimeout));
}
}
Message::Subscribe {
Expand All @@ -242,62 +258,75 @@ impl Client {
} => {
retries = retries.saturating_sub(1);

let result = ws.subscribe(&subscribe, params.clone(), &unsubscribe).await;
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
if let Ok(result) = tokio::time::timeout(
task_timeout,
ws.subscribe(&subscribe, params.clone(), &unsubscribe),
)
.await
{
match result {
result @ Ok(_) => {
request_backoff_counter.store(0, std::sync::atomic::Ordering::Relaxed);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(result);
}
let _ = response.send(result);
}
Err(err) => {
tracing::debug!("Subscribe failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
Err(err) => {
tracing::debug!("Subscribe failed: {:?}", err);
match err {
Error::RequestTimeout
| Error::Transport(_)
| Error::RestartNeeded(_)
| Error::MaxSlotsExceeded => {
tokio::time::sleep(get_backoff_time(&request_backoff_counter)).await;

// make sure it's still connected
if response.is_closed() {
return;
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
}

tx.send(Message::Subscribe {
subscribe,
params,
unsubscribe,
response,
retries,
})
.await
.expect("Failed to send subscribe message")
}

// make sure we still have retries left
if retries == 0 {
let _ = response.send(Err(Error::RequestTimeout));
return;
}

if matches!(err, Error::RequestTimeout) {
tx.send(Message::RotateEndpoint)
.await
.expect("Failed to send rotate message");
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}

tx.send(Message::Subscribe {
subscribe,
params,
unsubscribe,
response,
retries,
})
.await
.expect("Failed to send subscribe message")
}
err => {
// make sure it's still connected
if response.is_closed() {
return;
}
// not something we can handle, send it back to the caller
let _ = response.send(Err(err));
}
}
}
} else {
tracing::error!("subscribe timed out subscribe: {} params: {:?}", subscribe, params);
// make sure it's still connected
if response.is_closed() {
return;
}
let _ = response.send(Err(Error::RequestTimeout));
}
}
Message::RotateEndpoint => {
Expand Down

0 comments on commit f42d4ac

Please sign in to comment.