Skip to content

Commit

Permalink
fix: retry on io broken pipe & additive retry duration
Browse files Browse the repository at this point in the history
  • Loading branch information
rumblefrog committed May 9, 2022
1 parent 5f8be7b commit 587443b
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::Arc;
use std::error::Error as StdError;
use std::time::{Duration, Instant};

use bytes::Bytes;
Expand Down Expand Up @@ -459,30 +460,47 @@ impl ErrorHandler<Error> for Attempter {
type OutError = Error;

fn handle(&mut self, _attempt: usize, e: Error) -> RetryPolicy<Self::OutError> {
if self.attempts == self.max_attempts {
if self.attempts >= self.max_attempts {
debug!(
"Reached max attempts of {}, forwarding error",
&self.max_attempts
);
return RetryPolicy::ForwardError(e);
}

// Check if the error is io::ErrorKind::BrokenPipe
let mut source = e.source();
let mut is_broken_pipe = false;

while let Some(err_source) = source {
if let Some(io_error) = err_source.downcast_ref::<std::io::Error>() {
if io_error.kind() == std::io::ErrorKind::BrokenPipe {
is_broken_pipe = true;
break;
}
}

source = err_source.source();
}

// https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1
if !self.method.is_idempotent() {
if !self.method.is_idempotent() && !e.is_connect() && !is_broken_pipe {
debug!("Request method is non-idempotent, forwarding error");
return RetryPolicy::ForwardError(e);
}

self.attempts += 1;

let retry_duration = RETRY_DURATION * self.attempts as u32;

match e {
_ if e.is_connect() => {
debug!("Request connection error, retrying");
RetryPolicy::WaitRetry(RETRY_DURATION)
RetryPolicy::WaitRetry(retry_duration)
}
_ if e.is_timeout() => {
debug!("Request timeout error, retrying");
RetryPolicy::WaitRetry(RETRY_DURATION)
RetryPolicy::WaitRetry(retry_duration)
}
_ if e.is_status() => {
let status = e.status().unwrap();
Expand All @@ -491,7 +509,7 @@ impl ErrorHandler<Error> for Attempter {
match status.as_u16() {
408 | 413 | 429 | 500 | 502 | 503 | 504 | 521 | 522 | 524 => {
debug!("Request status error: {}, retrying", &status);
RetryPolicy::WaitRetry(RETRY_DURATION)
RetryPolicy::WaitRetry(retry_duration)
}
_ => {
debug!("Request status error: {}, forwarding error", &status);
Expand All @@ -502,7 +520,7 @@ impl ErrorHandler<Error> for Attempter {

_ => {
debug!("Request error: {}, retrying", &e);
RetryPolicy::WaitRetry(RETRY_DURATION)
RetryPolicy::WaitRetry(retry_duration)
}
}
}
Expand Down

0 comments on commit 587443b

Please sign in to comment.