Skip to content

Commit

Permalink
Retry on 419 error
Browse files Browse the repository at this point in the history
Use `backoff` library to exponentially retry on HTTP 419 error
  • Loading branch information
vrutkovs committed Dec 8, 2021
1 parent 421c28c commit d9546ad
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 16 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ tag-message = "dkregistry v{{version}}"

[dependencies]
base64 = "0.13"
futures = "0.3"
futures = "0.3.8"
http = "0.2"
libflate = "1.0"
log = "0.4"
Expand All @@ -42,6 +42,8 @@ sha2 = "^0.10.0"
async-stream = "0.3"
thiserror = "1.0.19"
url = "2.1.1"
backoff = { version = "0.3.0", features = ["tokio"]}
async-trait = "0.1.51"

[dev-dependencies]
dirs = "4.0"
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ extern crate serde;
extern crate log;
#[macro_use]
extern crate strum_macros;
#[macro_use]
extern crate async_trait;

pub mod errors;
pub mod mediatypes;
Expand Down
9 changes: 6 additions & 3 deletions src/v2/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl BearerAuth {
}
.build_reqwest(Method::GET, url);

let r = auth_req.send().await?;
let r = auth_req.send_retry().await?;
let status = r.status();
trace!("authenticate: got status {}", status);
if status != StatusCode::OK {
Expand Down Expand Up @@ -232,7 +232,10 @@ impl Client {
reqwest::Url::parse(&ep)?
};

let r = self.build_reqwest(Method::GET, url.clone()).send().await?;
let r = self
.build_reqwest(Method::GET, url.clone())
.send_retry()
.await?;

trace!("GET '{}' status: {:?}", r.url(), r.status());
r.headers()
Expand Down Expand Up @@ -297,7 +300,7 @@ impl Client {
let req = self.build_reqwest(Method::GET, url.clone());

trace!("Sending request to '{}'", url);
let resp = req.send().await?;
let resp = req.send_retry().await?;
trace!("GET '{:?}'", resp);

let status = resp.status();
Expand Down
10 changes: 8 additions & 2 deletions src/v2/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ impl Client {
reqwest::Url::parse(&ep)?
};

let res = self.build_reqwest(Method::HEAD, url.clone()).send().await?;
let res = self
.build_reqwest(Method::HEAD, url.clone())
.send_retry()
.await?;

trace!("Blob HEAD status: {:?}", res.status());

Expand All @@ -28,7 +31,10 @@ impl Client {
let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest);
let url = reqwest::Url::parse(&ep)?;

let res = self.build_reqwest(Method::GET, url.clone()).send().await?;
let res = self
.build_reqwest(Method::GET, url.clone())
.send_retry()
.await?;

trace!("GET {} status: {}", res.url(), res.status());
let status = res.status();
Expand Down
6 changes: 3 additions & 3 deletions src/v2/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::errors::Result;
use crate::v2;
use crate::v2::{Client, SendRetry};
use async_stream::try_stream;
use futures::stream::Stream;
use futures::{self};
Expand All @@ -10,7 +10,7 @@ struct Catalog {
pub repositories: Vec<String>,
}

impl v2::Client {
impl Client {
pub fn get_catalog<'a, 'b: 'a>(
&'b self,
paginate: Option<u32>,
Expand Down Expand Up @@ -39,7 +39,7 @@ impl v2::Client {
}

async fn fetch_catalog(req: RequestBuilder) -> Result<Catalog> {
let r = req.send().await?;
let r = req.send_retry().await?;
let status = r.status();
trace!("Got status: {:?}", status);
match status {
Expand Down
3 changes: 2 additions & 1 deletion src/v2/manifest/manifest_schema2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::errors::{Error, Result};
use crate::v2::SendRetry;
use reqwest::Method;

/// Manifest version 2 schema 2.
Expand Down Expand Up @@ -105,7 +106,7 @@ impl ManifestSchema2Spec {

let r = client
.build_reqwest(Method::GET, url.clone())
.send()
.send_retry()
.await?;

let status = r.status();
Expand Down
6 changes: 3 additions & 3 deletions src/v2/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Client {
let res = self
.build_reqwest(Method::GET, url.clone())
.headers(accept_headers)
.send()
.send_retry()
.await?;

let status = res.status();
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Client {
let res = self
.build_reqwest(Method::HEAD, url)
.headers(accept_headers)
.send()
.send_retry()
.await?;

let status = res.status();
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Client {
let r = self
.build_reqwest(Method::HEAD, url.clone())
.headers(accept_headers)
.send()
.send_retry()
.await
.map_err(Error::from)?;

Expand Down
35 changes: 33 additions & 2 deletions src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use crate::errors::*;
use crate::mediatypes::MediaTypes;
use futures::prelude::*;
use reqwest::{Method, StatusCode, Url};
use reqwest::{Method, Response, StatusCode, Url};

mod config;
pub use self::config::Config;
Expand All @@ -50,6 +50,8 @@ mod content_digest;
pub(crate) use self::content_digest::ContentDigest;
pub use self::content_digest::ContentDigestError;

use backoff::{future::retry, ExponentialBackoff};

/// A Client to make outgoing API requests to a registry.
#[derive(Clone, Debug)]
pub struct Client {
Expand Down Expand Up @@ -97,7 +99,7 @@ impl Client {
self.build_reqwest(Method::GET, url)
})?;

let response = request.send().await?;
let response = request.send_retry().await?;

let b = match (response.status(), response.headers().get(api_header)) {
(StatusCode::OK, Some(x)) => Ok((x == api_version, true)),
Expand Down Expand Up @@ -127,6 +129,35 @@ impl Client {
}
}

#[async_trait]
pub trait SendRetry {
const RETRY_CODE: u16 = 429;
async fn send_retry(self) -> Result<Response>;
}

#[async_trait]
impl SendRetry for reqwest::RequestBuilder {
async fn send_retry(self) -> Result<Response> {
let op = || async {
self.try_clone().unwrap().send().await.map_err(|err| {
if Some(StatusCode::from_u16(Self::RETRY_CODE).unwrap()) == err.status() {
backoff::Error::Transient(Error::from(err))
} else {
backoff::Error::Permanent(Error::from(err))
}
})
};

retry(
ExponentialBackoff {
..ExponentialBackoff::default()
},
op,
)
.await
}
}

#[derive(Debug, Default, Deserialize, Serialize)]
struct ApiError {
code: String,
Expand Down
2 changes: 1 addition & 1 deletion src/v2/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Client {
let resp = self
.build_reqwest(Method::GET, url.clone())
.header(header::ACCEPT, "application/json")
.send()
.send_retry()
.await?
.error_for_status()?;

Expand Down

0 comments on commit d9546ad

Please sign in to comment.