Skip to content

Commit

Permalink
Simplify retry strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Jun 26, 2024
1 parent 4a2a9db commit 6a79bc2
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 53 deletions.
4 changes: 1 addition & 3 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1858,7 +1858,7 @@ mod tests {
.stream(Box::new(group_callbacks.clone()))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

let stream_messages = bo
.conversations()
.stream_all_messages(Box::new(message_callbacks.clone()))
Expand All @@ -1877,8 +1877,6 @@ mod tests {
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

alix_group.send("hello1".as_bytes().to_vec()).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

Expand Down
5 changes: 2 additions & 3 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
},
identity::{parse_credential, Identity, IdentityError},
identity_updates::IdentityUpdateError,
retry::BackoffRetry,
retry::Retry,
retry_async, retryable,
storage::{
db_connection::DbConnection,
Expand Down Expand Up @@ -530,9 +530,8 @@ where
return None;
}
};
let mut retrier = BackoffRetry::default();
retry_async!(
retrier,
Retry::default(),
(async {
let welcome_v1 = welcome_v1.clone();
self.process_for_id(
Expand Down
5 changes: 2 additions & 3 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use futures::Stream;

use super::{extract_message_v1, GroupError, MlsGroup};
use crate::retry::BackoffRetry;
use crate::retry::Retry;
use crate::storage::group_message::StoredGroupMessage;
use crate::subscriptions::{MessagesStreamInfo, StreamCloser};
use crate::XmtpApi;
Expand Down Expand Up @@ -33,9 +33,8 @@ impl MlsGroup {
let created_ns = msgv1.created_ns;

let client_pointer = client.clone();

let process_result = retry_async!(
BackoffRetry::default(),
Retry::default(),
(async {
let client_pointer = client_pointer.clone();
let client_id = client_id.clone();
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
hpke::{encrypt_welcome, HpkeError},
identity::parse_credential,
identity_updates::load_identity_updates,
retry::BackoffRetry,
retry::Retry,
retry_async,
storage::{
db_connection::DbConnection,
Expand Down Expand Up @@ -651,7 +651,7 @@ impl MlsGroup {
let mut receive_errors = vec![];
for message in messages.into_iter() {
let result = retry_async!(
BackoffRetry::default(),
Retry::default(),
(async {
self.consume_message(&message, &mut openmls_group, client)
.await
Expand Down Expand Up @@ -752,7 +752,7 @@ impl MlsGroup {

for intent in intents {
let result = retry_async!(
BackoffRetry::default(),
Retry::default(),
(async {
self.get_publish_intent_data(&provider, client, &mut openmls_group, &intent)
.await
Expand Down
62 changes: 24 additions & 38 deletions xmtp_mls/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! }
//! ```
use std::{ops::Add, time::Duration};
use std::time::Duration;

use rand::Rng;
use smart_default::SmartDefault;
Expand All @@ -32,8 +32,13 @@ pub trait RetryableError: std::error::Error {
pub struct Retry {
#[default = 5]
retries: usize,
#[default(_code = "std::time::Duration::from_millis(200)")]
#[default(_code = "std::time::Duration::from_millis(50)")]
duration: std::time::Duration,
#[default = 3]
// The amount to multiply the duration on each subsequent attempt
multiplier: u32,
#[default = 25]
max_jitter_ms: usize,
}

impl Retry {
Expand All @@ -43,32 +48,16 @@ impl Retry {
}

/// Get the duration to wait between retries.
pub fn duration(&self) -> Duration {
self.duration
}
}

#[derive(SmartDefault, Debug, PartialEq, Eq, Copy, Clone)]
pub struct BackoffRetry {
#[default = 5]
max_retries: usize,
#[default(_code = "std::time::Duration::from_millis(50)")]
duration: std::time::Duration,
#[default = 3]
multiplier: u32,
}

impl BackoffRetry {
pub fn retries(&self) -> usize {
self.max_retries
}

pub fn duration(&mut self) -> Duration {
let jitter = rand::thread_rng().gen_range(0..=25);
let duration = self.duration.clone();
self.duration *= self.multiplier;
/// Multiples the duration by the multiplier for each subsequent attempt
/// and adds a random jitter to avoid repeated collisions
pub fn duration(&self, attempts: usize) -> Duration {
let mut duration = self.duration.clone();

Check warning on line 54 in xmtp_mls/src/retry.rs

View workflow job for this annotation

GitHub Actions / workspace

using `clone` on type `Duration` which implements the `Copy` trait

warning: using `clone` on type `Duration` which implements the `Copy` trait --> xmtp_mls/src/retry.rs:54:28 | 54 | let mut duration = self.duration.clone(); | ^^^^^^^^^^^^^^^^^^^^^ help: try removing the `clone` call: `self.duration` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy = note: `#[warn(clippy::clone_on_copy)]` on by default

Check warning on line 54 in xmtp_mls/src/retry.rs

View workflow job for this annotation

GitHub Actions / workspace

using `clone` on type `Duration` which implements the `Copy` trait

warning: using `clone` on type `Duration` which implements the `Copy` trait --> xmtp_mls/src/retry.rs:54:28 | 54 | let mut duration = self.duration.clone(); | ^^^^^^^^^^^^^^^^^^^^^ help: try removing the `clone` call: `self.duration` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy = note: `#[warn(clippy::clone_on_copy)]` on by default
for _ in 0..attempts - 1 {
duration *= self.multiplier;
}

duration + Duration::from_millis(jitter)
let jitter = rand::thread_rng().gen_range(0..=self.max_jitter_ms);
duration + Duration::from_millis(jitter as u64)
}
}

Expand Down Expand Up @@ -180,10 +169,12 @@ macro_rules! retry_sync {
Ok(v) => break Ok(v),
Err(e) => {
if (&e).is_retryable() && attempts < $retry.retries() {
log::info!(
"retrying function that failed with error=`{}`",
e.to_string()
);
attempts += 1;
std::thread::sleep($retry.duration(attempts));
} else {
break Err(e);
}
Expand Down Expand Up @@ -252,7 +243,9 @@ macro_rules! retry_async {
Ok(v) => break Ok(v),
Err(e) => {
if (&e).is_retryable() && attempts < $retry.retries() {
log::warn!("retrying function that failed with error={}", e.to_string());
attempts += 1;
tokio::time::sleep($retry.duration(attempts)).await;
} else {
log::info!("error is not retryable. {:?}", e);
break Err(e);
Expand Down Expand Up @@ -412,17 +405,10 @@ mod tests {

#[test]
fn backoff_retry() {
let mut backoff_retry = BackoffRetry::default();

let duration = backoff_retry.duration();

assert!(duration.as_millis() - 50 <= 25);

let duration = backoff_retry.duration();

assert!(duration.as_millis() - 150 <= 25);
let backoff_retry = Retry::default();

let duration = backoff_retry.duration();
assert!(duration.as_millis() - 450 <= 25);
assert!(backoff_retry.duration(1).as_millis() - 50 <= 25);
assert!(backoff_retry.duration(2).as_millis() - 150 <= 25);
assert!(backoff_retry.duration(3).as_millis() - 450 <= 25);
}
}
5 changes: 2 additions & 3 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
api::GroupFilter,
client::{extract_welcome_message, ClientError},
groups::{extract_group_id, GroupError, MlsGroup},
retry::BackoffRetry,
retry::Retry,
retry_async,
storage::group_message::StoredGroupMessage,
Client, XmtpApi,
Expand Down Expand Up @@ -60,9 +60,8 @@ where
welcome: WelcomeMessage,
) -> Result<MlsGroup, ClientError> {
let welcome_v1 = extract_welcome_message(welcome)?;
let mut retrier = BackoffRetry::default();
let creation_result = retry_async!(
retrier,
Retry::default(),
(async {
let welcome_v1 = welcome_v1.clone();
self.context
Expand Down

0 comments on commit 6a79bc2

Please sign in to comment.