From 587443bf7c99c49e1ffa989ae9630557a168333a Mon Sep 17 00:00:00 2001 From: rumblefrog Date: Mon, 9 May 2022 19:23:05 -0400 Subject: [PATCH] fix: retry on io broken pipe & additive retry duration --- src/client.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index 595a985..9427e83 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::convert::TryInto; use std::str::FromStr; use std::sync::Arc; +use std::error::Error as StdError; use std::time::{Duration, Instant}; use bytes::Bytes; @@ -459,7 +460,7 @@ impl ErrorHandler for Attempter { type OutError = Error; fn handle(&mut self, _attempt: usize, e: Error) -> RetryPolicy { - if self.attempts == self.max_attempts { + if self.attempts >= self.max_attempts { debug!( "Reached max attempts of {}, forwarding error", &self.max_attempts @@ -467,22 +468,39 @@ impl ErrorHandler for Attempter { return RetryPolicy::ForwardError(e); } + // Check if the error is io::ErrorKind::BrokenPipe + let mut source = e.source(); + let mut is_broken_pipe = false; + + while let Some(err_source) = source { + if let Some(io_error) = err_source.downcast_ref::() { + if io_error.kind() == std::io::ErrorKind::BrokenPipe { + is_broken_pipe = true; + break; + } + } + + source = err_source.source(); + } + // https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1 - if !self.method.is_idempotent() { + if !self.method.is_idempotent() && !e.is_connect() && !is_broken_pipe { debug!("Request method is non-idempotent, forwarding error"); return RetryPolicy::ForwardError(e); } self.attempts += 1; + let retry_duration = RETRY_DURATION * self.attempts as u32; + match e { _ if e.is_connect() => { debug!("Request connection error, retrying"); - RetryPolicy::WaitRetry(RETRY_DURATION) + RetryPolicy::WaitRetry(retry_duration) } _ if e.is_timeout() => { debug!("Request timeout error, retrying"); - RetryPolicy::WaitRetry(RETRY_DURATION) + RetryPolicy::WaitRetry(retry_duration) } _ if e.is_status() => { let status = e.status().unwrap(); @@ -491,7 +509,7 @@ impl ErrorHandler for Attempter { match status.as_u16() { 408 | 413 | 429 | 500 | 502 | 503 | 504 | 521 | 522 | 524 => { debug!("Request status error: {}, retrying", &status); - RetryPolicy::WaitRetry(RETRY_DURATION) + RetryPolicy::WaitRetry(retry_duration) } _ => { debug!("Request status error: {}, forwarding error", &status); @@ -502,7 +520,7 @@ impl ErrorHandler for Attempter { _ => { debug!("Request error: {}, retrying", &e); - RetryPolicy::WaitRetry(RETRY_DURATION) + RetryPolicy::WaitRetry(retry_duration) } } }