From 6bae41236f2af646f7989b92098bd306d9d99607 Mon Sep 17 00:00:00 2001 From: nardor Date: Sun, 17 Nov 2024 23:52:23 +0100 Subject: [PATCH] refactor(backend::rest): use backon to implement retry on RestBackend --- crates/backend/src/rest.rs | 286 ++++++++++++++++++++----------------- 1 file changed, 153 insertions(+), 133 deletions(-) diff --git a/crates/backend/src/rest.rs b/crates/backend/src/rest.rs index 2293afa6..4774149a 100644 --- a/crates/backend/src/rest.rs +++ b/crates/backend/src/rest.rs @@ -1,11 +1,10 @@ use std::str::FromStr; use std::time::Duration; -use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; use bytes::Bytes; use log::{trace, warn}; use reqwest::{ - blocking::{Client, ClientBuilder, Response}, + blocking::{Client, ClientBuilder}, header::{HeaderMap, HeaderValue}, Url, }; @@ -28,79 +27,97 @@ pub(super) mod constants { pub(super) const DEFAULT_TIMEOUT: Duration = Duration::from_secs(600); } -// trait CheckError to add user-defined method check_error on Response -pub(crate) trait CheckError { - /// Check reqwest Response for error and treat errors as permanent or transient - fn check_error(self) -> Result>; -} +mod backon_extension { + use std::time::Duration; -impl CheckError for Response { - /// Check reqwest Response for error and treat errors as permanent or transient - /// - /// # Errors - /// - /// If the response is an error, it will return an error of type Error - /// - /// # Returns - /// - /// The response if it is not an error - fn check_error(self) -> Result> { - match self.error_for_status() { - Ok(t) => Ok(t), - // Note: status() always give Some(_) as it is called from a Response - Err(err) if err.status().unwrap().is_client_error() => { - Err(backoff::Error::Permanent(err)) + use backon::{BlockingRetryable, ExponentialBuilder}; + + use super::constants; + + /// Trait to implement on error types to combine with [`LimitRetry::retry_notify`]. + pub(super) trait NotifyWhenRetry { + fn when_retry(&self) -> bool { + // by default always retry + true + } + #[allow(unused_variables)] + fn notify(&self, dur: Duration) {} + } + + /// A backon::backoff extension that limits the number of retries + #[derive(Debug)] + pub struct LimitRetry { + max_retries: usize, + } + + impl Default for LimitRetry { + fn default() -> Self { + Self { + max_retries: constants::DEFAULT_RETRY, } - Err(err) => Err(backoff::Error::Transient { - err, - retry_after: None, - }), } } -} -/// A backoff implementation that limits the number of retries -#[derive(Clone, Debug)] -struct LimitRetryBackoff { - /// The maximum number of retries - max_retries: usize, - /// The current number of retries - retries: usize, - /// The exponential backoff - exp: ExponentialBackoff, -} + /// We need to impl [`Clone`] manually because [backon::ExponentialBuilder] doesn't. + impl Clone for LimitRetry { + fn clone(&self) -> Self { + Self { + max_retries: self.max_retries, + } + } + } + + impl LimitRetry { + pub fn new(max_retries: usize) -> Self { + Self { max_retries } + } + + pub fn set_max_retries(&mut self, max_retries: usize) { + self.max_retries = max_retries; + } -impl Default for LimitRetryBackoff { - fn default() -> Self { - Self { - max_retries: constants::DEFAULT_RETRY, - retries: 0, - exp: ExponentialBackoffBuilder::new() - .with_max_elapsed_time(None) // no maximum elapsed time; we count number of retires - .build(), + fn builder(&self) -> ExponentialBuilder { + // backon doesn't allow us to specify `None` for `max_delay` + // see + ExponentialBuilder::default() + .with_max_delay(Duration::MAX) // no maximum elapsed time; we count number of retries + .with_max_times(self.max_retries) + } + + pub fn retry_notify(&self, op: F) -> Result + where + F: FnMut() -> Result, + E: NotifyWhenRetry, + { + let mut retry = op.retry(self.builder()); + retry = retry.notify(E::notify); + retry = retry.when(E::when_retry); + retry.call() } } } -impl Backoff for LimitRetryBackoff { - /// Returns the next backoff duration. +impl backon_extension::NotifyWhenRetry for reqwest::Error { + /// Heuristic to decide if the error could be recovered by retrying or not. /// - /// # Notes + /// If the error could be recovered by a retry: return `true`. /// - /// If the number of retries exceeds the maximum number of retries, it returns None. - fn next_backoff(&mut self) -> Option { - self.retries += 1; - if self.retries > self.max_retries { - None - } else { - self.exp.next_backoff() + /// Else return `false` and the combined backoff will stop early. + fn when_retry(&self) -> bool { + match self.status() { + Some(status_code) => !status_code.is_client_error(), // do no retry if client error + None => true, // else retry } } - /// Resets the backoff to the initial state. - fn reset(&mut self) { - self.retries = 0; - self.exp.reset(); + /// Notify function for backon in case of error + /// + /// # Arguments + /// + /// * `err` - The error that occurred + /// * `duration` - The duration of the backoff + fn notify(&self, duration: Duration) { + warn!("Error {self} at {duration:?}, retrying"); } } @@ -112,20 +129,7 @@ pub struct RestBackend { /// The client to use. client: Client, /// The backoff implementation to use. - backoff: LimitRetryBackoff, -} - -/// Notify function for backoff in case of error -/// -/// # Arguments -/// -/// * `err` - The error that occurred -/// * `duration` - The duration of the backoff -// We need to pass the error by value to satisfy the signature of the notify function -// for handling errors in backoff -#[allow(clippy::needless_pass_by_value)] -fn notify(err: reqwest::Error, duration: Duration) { - warn!("Error {err} at {duration:?}, retrying"); + retry_handler: backon_extension::LimitRetry, } impl RestBackend { @@ -169,7 +173,8 @@ impl RestBackend { .map_err(|err| { RusticError::with_source(ErrorKind::Backend, "Failed to build HTTP client", err) })?; - let mut backoff = LimitRetryBackoff::default(); + + let mut backoff_generator = backon_extension::LimitRetry::default(); // FIXME: If we have multiple times the same option, this could lead to unexpected behavior for (option, value) in options { @@ -187,7 +192,7 @@ impl RestBackend { .attach_context("option", "retry") })?, }; - backoff.max_retries = max_retries; + backoff_generator.set_max_retries(max_retries); } else if option == "timeout" { let timeout = humantime::Duration::from_str(&value).map_err(|err| { RusticError::with_source( @@ -215,7 +220,7 @@ impl RestBackend { Ok(Self { url, client, - backoff, + retry_handler: backoff_generator, }) } @@ -299,9 +304,8 @@ impl ReadBackend for RestBackend { .attach_context("tpe_dir", tpe.dirname().to_string()) })?; - backoff::retry_notify( - self.backoff.clone(), - || { + self.retry_handler + .retry_notify::<_, _, reqwest::Error>(|| { if tpe == FileType::Config { return Ok( if self.client.head(url.clone()).send()?.status().is_success() { @@ -317,9 +321,10 @@ impl ReadBackend for RestBackend { .get(url.clone()) .header("Accept", "application/vnd.x.restic.rest.v2") .send()? - .check_error()? + .error_for_status()? .json::>>()? // use Option to be handle null json value .unwrap_or_default(); + Ok(list .into_iter() .filter_map(|i| match i.name.parse::() { @@ -327,10 +332,14 @@ impl ReadBackend for RestBackend { Err(_) => None, }) .collect()) - }, - notify, - ) - .map_err(construct_backoff_error) + }) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) + }) } /// Returns the content of a file. @@ -351,19 +360,22 @@ impl ReadBackend for RestBackend { .url(tpe, id) .map_err(|err| construct_join_url_error(err, tpe, id, &self.url))?; - backoff::retry_notify( - self.backoff.clone(), - || { + self.retry_handler + .retry_notify::<_, _, reqwest::Error>(|| { Ok(self .client .get(url.clone()) .send()? - .check_error()? + .error_for_status()? .bytes()?) - }, - notify, - ) - .map_err(construct_backoff_error) + }) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) + }) } /// Returns a part of the content of a file. @@ -398,31 +410,26 @@ impl ReadBackend for RestBackend { .attach_context("id", id.to_string()) })?; - backoff::retry_notify( - self.backoff.clone(), - || { + self.retry_handler + .retry_notify::<_, _, reqwest::Error>(|| { Ok(self .client .get(url.clone()) .header("Range", header_value.clone()) .send()? - .check_error()? + .error_for_status()? .bytes()?) - }, - notify, - ) - .map_err(construct_backoff_error) + }) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) + }) } } -fn construct_backoff_error(err: backoff::Error) -> Box { - RusticError::with_source( - ErrorKind::Backend, - "Backoff failed, please check the logs for more information.", - err, - ) -} - fn construct_join_url_error( err: JoiningUrlFailedError, tpe: FileType, @@ -453,15 +460,18 @@ impl WriteBackend for RestBackend { .attach_context("join_input", "?create=true") })?; - backoff::retry_notify( - self.backoff.clone(), - || { - _ = self.client.post(url.clone()).send()?.check_error()?; + self.retry_handler + .retry_notify::<_, _, reqwest::Error>(|| { + _ = self.client.post(url.clone()).send()?.error_for_status()?; Ok(()) - }, - notify, - ) - .map_err(construct_backoff_error) + }) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) + }) } /// Writes bytes to the given file. @@ -492,16 +502,23 @@ impl WriteBackend for RestBackend { ) .body(buf); - backoff::retry_notify( - self.backoff.clone(), - || { + self.retry_handler + .retry_notify::<_, _, reqwest::Error>(|| { // Note: try_clone() always gives Some(_) as the body is Bytes which is cloneable - _ = req_builder.try_clone().unwrap().send()?.check_error()?; + _ = req_builder + .try_clone() + .unwrap() + .send()? + .error_for_status()?; Ok(()) - }, - notify, - ) - .map_err(construct_backoff_error) + }) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) + }) } /// Removes the given file. @@ -521,14 +538,17 @@ impl WriteBackend for RestBackend { .url(tpe, id) .map_err(|err| construct_join_url_error(err, tpe, id, &self.url))?; - backoff::retry_notify( - self.backoff.clone(), - || { - _ = self.client.delete(url.clone()).send()?.check_error()?; + self.retry_handler + .retry_notify::<_, _, reqwest::Error>(|| { + _ = self.client.delete(url.clone()).send()?.error_for_status()?; Ok(()) - }, - notify, - ) - .map_err(construct_backoff_error) + }) + .map_err(|err| { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) + }) } }