Skip to content

Commit

Permalink
suggestion to simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
aawsome committed Nov 18, 2024
1 parent b69bc7b commit 772aac4
Showing 1 changed file with 92 additions and 171 deletions.
263 changes: 92 additions & 171 deletions crates/backend/src/rest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::str::FromStr;
use std::time::Duration;

use backon::{BlockingRetryable, ExponentialBuilder};
use bytes::Bytes;
use log::{trace, warn};
use reqwest::{
Expand All @@ -27,100 +28,6 @@ pub(super) mod constants {
pub(super) const DEFAULT_TIMEOUT: Duration = Duration::from_secs(600);
}

mod backon_extension {
use std::time::Duration;

use backon::{BlockingRetryable, ExponentialBuilder};

use super::constants;

/// Trait to implement on error types to combine with [`LimitRetry::retry_notify`].
pub(super) trait NotifyWhenRetry {
fn when_retry(&self) -> bool {
// by default always retry
true
}
#[allow(unused_variables)]
fn notify(&self, dur: Duration) {}
}

/// A `backon::backoff` extension that limits the number of retries
#[derive(Debug)]
pub struct LimitRetry {
max_retries: usize,
}

impl Default for LimitRetry {
fn default() -> Self {
Self {
max_retries: constants::DEFAULT_RETRY,
}
}
}

/// We need to impl [`Clone`] manually because [`backon::ExponentialBuilder`] doesn't.
impl Clone for LimitRetry {
fn clone(&self) -> Self {
Self {
max_retries: self.max_retries,
}
}
}

impl LimitRetry {
pub fn new(max_retries: usize) -> Self {
Self { max_retries }
}

pub fn set_max_retries(&mut self, max_retries: usize) {
self.max_retries = max_retries;
}

fn builder(&self) -> ExponentialBuilder {
// backon doesn't allow us to specify `None` for `max_delay`
// see <https://github.com/Xuanwo/backon/pull/160>
ExponentialBuilder::default()
.with_max_delay(Duration::MAX) // no maximum elapsed time; we count number of retries
.with_max_times(self.max_retries)
}

pub fn retry_notify<F, T, E>(&self, op: F) -> Result<T, E>
where
F: FnMut() -> Result<T, E>,
E: NotifyWhenRetry,
{
let mut retry = op.retry(self.builder());
retry = retry.notify(E::notify);
retry = retry.when(E::when_retry);
retry.call()
}
}
}

impl backon_extension::NotifyWhenRetry for reqwest::Error {
/// Heuristic to decide if the error could be recovered by retrying or not.
///
/// If the error could be recovered by a retry: return `true`.
///
/// Else return `false` and the combined backoff will stop early.
fn when_retry(&self) -> bool {
self.status().map_or(
true, // retry
|status_code| !status_code.is_client_error(), // do not retry if `is_client_error`
)
}

/// Notify function for backon in case of error
///
/// # Arguments
///
/// * `err` - The error that occurred
/// * `duration` - The duration of the backoff
fn notify(&self, duration: Duration) {
warn!("Error {self} at {duration:?}, retrying");
}
}

fn construct_backoff_error(err: reqwest::Error) -> Box<RusticError> {
RusticError::with_source(
ErrorKind::Backend,
Expand All @@ -136,11 +43,27 @@ pub struct RestBackend {
url: Url,
/// The client to use.
client: Client,
/// The backoff implementation to use.
retry_handler: backon_extension::LimitRetry,
/// The ``BackoffBuilder`` we use
backoff: ExponentialBuilder,
}

impl RestBackend {
// call the given operation retrying non-permanent errors and giving warnings for failed operations
fn retry_notify<F, T>(&self, op: F) -> Result<T, reqwest::Error>
where
F: FnMut() -> Result<T, reqwest::Error>,
{
op.retry(self.backoff)
.notify(|err, duration| warn!("Error {err} at {duration:?}, retrying"))
.when(|err| {
err.status().map_or(
true, // retry
|status_code| !status_code.is_client_error(), // do not retry if `is_client_error`
)
})
.call()
}

/// Create a new [`RestBackend`] from a given url.
///
/// # Arguments
Expand Down Expand Up @@ -182,7 +105,11 @@ impl RestBackend {
RusticError::with_source(ErrorKind::Backend, "Failed to build HTTP client", err)
})?;

let mut backoff_generator = backon_extension::LimitRetry::default();
// backon doesn't allow us to specify `None` for `max_delay`
// see <https://github.com/Xuanwo/backon/pull/160>
let mut backoff = ExponentialBuilder::default()
.with_max_delay(Duration::MAX) // no maximum elapsed time; we count number of retries
.with_max_times(constants::DEFAULT_RETRY);

// FIXME: If we have multiple times the same option, this could lead to unexpected behavior
for (option, value) in options {
Expand All @@ -200,7 +127,7 @@ impl RestBackend {
.attach_context("option", "retry")
})?,
};
backoff_generator.set_max_retries(max_retries);
backoff = backoff.with_max_times(max_retries);
} else if option == "timeout" {
let timeout = humantime::Duration::from_str(&value).map_err(|err| {
RusticError::with_source(
Expand Down Expand Up @@ -228,7 +155,7 @@ impl RestBackend {
Ok(Self {
url,
client,
retry_handler: backoff_generator,
backoff,
})
}

Expand Down Expand Up @@ -312,36 +239,35 @@ impl ReadBackend for RestBackend {
.attach_context("tpe_dir", tpe.dirname().to_string())
})?;

self.retry_handler
.retry_notify::<_, _, reqwest::Error>(|| {
if tpe == FileType::Config {
return Ok(
if self.client.head(url.clone()).send()?.status().is_success() {
vec![(Id::default(), 0)]
} else {
Vec::new()
},
);
}

let list = self
.client
.get(url.clone())
.header("Accept", "application/vnd.x.restic.rest.v2")
.send()?
.error_for_status()?
.json::<Option<Vec<ListEntry>>>()? // use Option to be handle null json value
.unwrap_or_default();

Ok(list
.into_iter()
.filter_map(|i| match i.name.parse::<Id>() {
Ok(id) => Some((id, i.size)),
Err(_) => None,
})
.collect())
})
.map_err(construct_backoff_error)
self.retry_notify(|| {
if tpe == FileType::Config {
return Ok(
if self.client.head(url.clone()).send()?.status().is_success() {
vec![(Id::default(), 0)]
} else {
Vec::new()
},
);
}

let list = self
.client
.get(url.clone())
.header("Accept", "application/vnd.x.restic.rest.v2")
.send()?
.error_for_status()?
.json::<Option<Vec<ListEntry>>>()? // use Option to be handle null json value
.unwrap_or_default();

Ok(list
.into_iter()
.filter_map(|i| match i.name.parse::<Id>() {
Ok(id) => Some((id, i.size)),
Err(_) => None,
})
.collect())
})
.map_err(construct_backoff_error)
}

/// Returns the content of a file.
Expand All @@ -362,15 +288,14 @@ impl ReadBackend for RestBackend {
.url(tpe, id)
.map_err(|err| construct_join_url_error(err, tpe, id, &self.url))?;

self.retry_handler
.retry_notify(|| {
self.client
.get(url.clone())
.send()?
.error_for_status()?
.bytes()
})
.map_err(construct_backoff_error)
self.retry_notify(|| {
self.client
.get(url.clone())
.send()?
.error_for_status()?
.bytes()
})
.map_err(construct_backoff_error)
}

/// Returns a part of the content of a file.
Expand Down Expand Up @@ -405,16 +330,15 @@ impl ReadBackend for RestBackend {
.attach_context("id", id.to_string())
})?;

self.retry_handler
.retry_notify(|| {
self.client
.get(url.clone())
.header("Range", header_value.clone())
.send()?
.error_for_status()?
.bytes()
})
.map_err(construct_backoff_error)
self.retry_notify(|| {
self.client
.get(url.clone())
.header("Range", header_value.clone())
.send()?
.error_for_status()?
.bytes()
})
.map_err(construct_backoff_error)
}
}

Expand Down Expand Up @@ -448,12 +372,11 @@ impl WriteBackend for RestBackend {
.attach_context("join_input", "?create=true")
})?;

self.retry_handler
.retry_notify::<_, _, reqwest::Error>(|| {
_ = self.client.post(url.clone()).send()?.error_for_status()?;
Ok(())
})
.map_err(construct_backoff_error)
self.retry_notify(|| {
_ = self.client.post(url.clone()).send()?.error_for_status()?;
Ok(())
})
.map_err(construct_backoff_error)
}

/// Writes bytes to the given file.
Expand Down Expand Up @@ -484,17 +407,16 @@ impl WriteBackend for RestBackend {
)
.body(buf);

self.retry_handler
.retry_notify::<_, _, reqwest::Error>(|| {
// Note: try_clone() always gives Some(_) as the body is Bytes which is cloneable
_ = req_builder
.try_clone()
.unwrap()
.send()?
.error_for_status()?;
Ok(())
})
.map_err(construct_backoff_error)
self.retry_notify(|| {
// Note: try_clone() always gives Some(_) as the body is Bytes which is cloneable
_ = req_builder
.try_clone()
.unwrap()
.send()?
.error_for_status()?;
Ok(())
})
.map_err(construct_backoff_error)
}

/// Removes the given file.
Expand All @@ -514,11 +436,10 @@ impl WriteBackend for RestBackend {
.url(tpe, id)
.map_err(|err| construct_join_url_error(err, tpe, id, &self.url))?;

self.retry_handler
.retry_notify::<_, _, reqwest::Error>(|| {
_ = self.client.delete(url.clone()).send()?.error_for_status()?;
Ok(())
})
.map_err(construct_backoff_error)
self.retry_notify(|| {
_ = self.client.delete(url.clone()).send()?.error_for_status()?;
Ok(())
})
.map_err(construct_backoff_error)
}
}

0 comments on commit 772aac4

Please sign in to comment.