Skip to content

Commit

Permalink
Merge pull request #2705 from mickvandijke/fix-evmlib-error-escaped-r…
Browse files Browse the repository at this point in the history
…etry

fix: evmlib error escaped retry & skip paying for free chunks
  • Loading branch information
mickvandijke authored Feb 5, 2025
2 parents 6035b2d + 2e12540 commit ceedec1
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 55 deletions.
34 changes: 18 additions & 16 deletions autonomi/src/client/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,24 @@ impl Client {
let number_of_content_addrs = content_addrs.clone().count();
let quotes = self.get_store_quotes(data_type, content_addrs).await?;

// Make sure nobody else can use the wallet while we are paying
debug!("Waiting for wallet lock");
let lock_guard = wallet.lock().await;
debug!("Locked wallet");

// TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying.
// TODO: retry when it fails?
// Execute chunk payments
let _payments = wallet
.pay_for_quotes(quotes.payments())
.await
.map_err(|err| PayError::from(err.0))?;

// payment is done, unlock the wallet for other threads
drop(lock_guard);
debug!("Unlocked wallet");
if !quotes.is_empty() {
// Make sure nobody else can use the wallet while we are paying
debug!("Waiting for wallet lock");
let lock_guard = wallet.lock().await;
debug!("Locked wallet");

// TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying.
// TODO: retry when it fails?
// Execute chunk payments
let _payments = wallet
.pay_for_quotes(quotes.payments())
.await
.map_err(|err| PayError::from(err.0))?;

// payment is done, unlock the wallet for other threads
drop(lock_guard);
debug!("Unlocked wallet");
}

let skipped_chunks = number_of_content_addrs - quotes.len();
trace!(
Expand Down
2 changes: 2 additions & 0 deletions evmlib/src/contract/network_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum Error {
RpcError(#[from] RpcError<TransportErrorKind>),
#[error(transparent)]
PendingTransactionError(#[from] alloy::providers::PendingTransactionError),
#[error("Timeout: {0:?}")]
Timeout(#[from] tokio::time::error::Elapsed),
}

pub struct NetworkToken<T: Transport + Clone, P: Provider<T, N>, N: Network> {
Expand Down
2 changes: 2 additions & 0 deletions evmlib/src/contract/payment_vault/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ pub enum Error {
PaymentInvalid,
#[error("Payment verification length must be 3.")]
PaymentVerificationLengthInvalid,
#[error("Timeout: {0:?}")]
Timeout(#[from] tokio::time::error::Elapsed),
}
125 changes: 86 additions & 39 deletions evmlib/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use alloy::transports::Transport;
use std::time::Duration;

pub(crate) const MAX_RETRIES: u8 = 3;
const DEFAULT_RETRY_INTERVAL_MS: u64 = 1000;
const DEFAULT_RETRY_INTERVAL_MS: u64 = 4000;
const BROADCAST_TRANSACTION_TIMEOUT_MS: u64 = 5000;
const WATCH_TIMEOUT_MS: u64 = 1000;

/// Execute an async closure that returns a result. Retry on failure.
pub(crate) async fn retry<F, Fut, T, E>(
Expand Down Expand Up @@ -57,58 +59,103 @@ where
P: Provider<T, N>,
N: Network,
E: From<alloy::transports::RpcError<alloy::transports::TransportErrorKind>>
+ From<alloy::providers::PendingTransactionError>,
+ From<alloy::providers::PendingTransactionError>
+ From<tokio::time::error::Elapsed>,
{
let mut nonce: Option<u64> = None;
let mut retries = 0;

loop {
let result = {
let mut transaction_request = provider
.transaction_request()
.with_to(to)
.with_input(calldata.clone());

// Retry with the same nonce to replace a stuck transaction
if let Some(nonce) = nonce {
transaction_request.set_nonce(nonce);
} else {
nonce = transaction_request.nonce();
let mut transaction_request = provider
.transaction_request()
.with_to(to)
.with_input(calldata.clone());

// Retry with the same nonce to replace a stuck transaction
if let Some(nonce) = nonce {
transaction_request.set_nonce(nonce);
} else {
nonce = transaction_request.nonce();
}

let pending_tx_builder_result = tokio::time::timeout(
Duration::from_millis(BROADCAST_TRANSACTION_TIMEOUT_MS),
provider.send_transaction(transaction_request.clone()),
)
.await;

let pending_tx_builder = match pending_tx_builder_result {
Ok(Ok(pending_tx_builder)) => pending_tx_builder,
Ok(Err(err)) => {
if retries == MAX_RETRIES {
error!("Failed to send {tx_identifier} transaction after {retries} retries. Giving up. Error: {err:?}");
break Err(E::from(err));
}

retries += 1;
let retry_interval_ms = DEFAULT_RETRY_INTERVAL_MS;
let delay = Duration::from_millis(retry_interval_ms * retries.pow(2) as u64);

warn!(
"Error sending {tx_identifier} transaction: {err:?}. Retry #{} in {} second(s).",
retries,
delay.as_secs(),
);

tokio::time::sleep(delay).await;

continue;
}
Err(err) => {
if retries == MAX_RETRIES {
error!("Failed to send {tx_identifier} transaction after {retries} retries. Giving up. Error: {err:?}");
break Err(E::from(err));
}

retries += 1;
let retry_interval_ms = DEFAULT_RETRY_INTERVAL_MS;
let delay = Duration::from_millis(retry_interval_ms * retries.pow(2) as u64);

warn!(
"Error sending {tx_identifier} transaction: {err:?}. Retry #{} in {} second(s).",
retries,
delay.as_secs(),
);

let pending_tx_builder = provider
.send_transaction(transaction_request.clone())
.await?;

debug!(
"{tx_identifier} transaction is pending with tx_hash: {:?}",
pending_tx_builder.tx_hash()
);

retry(
|| async {
PendingTransactionBuilder::from_config(
provider.root().clone(),
pending_tx_builder.inner().clone(),
)
.with_timeout(Some(TX_TIMEOUT))
.watch()
.await
},
"watching pending transaction",
None,
)
.await
tokio::time::sleep(delay).await;

continue;
}
};

match result {
debug!(
"{tx_identifier} transaction is pending with tx_hash: {:?}",
pending_tx_builder.tx_hash()
);

let watch_result = retry(
|| async {
PendingTransactionBuilder::from_config(
provider.root().clone(),
pending_tx_builder.inner().clone(),
)
.with_timeout(Some(TX_TIMEOUT))
.watch()
.await
},
"watching pending transaction",
Some(WATCH_TIMEOUT_MS),
)
.await;

match watch_result {
Ok(tx_hash) => {
debug!("{tx_identifier} transaction with hash {tx_hash:?} is successful");
break Ok(tx_hash);
}
Err(err) => {
if retries == MAX_RETRIES {
error!("Failed to send and confirm {tx_identifier} transaction after {retries} retries. Giving up. Error: {err:?}");
error!("Failed to confirm {tx_identifier} transaction after {retries} retries. Giving up. Error: {err:?}");
break Err(E::from(err));
}

Expand All @@ -117,7 +164,7 @@ where
let delay = Duration::from_millis(retry_interval_ms * retries.pow(2) as u64);

warn!(
"Error sending and confirming {tx_identifier} transaction: {err:?}. Retry #{} in {} second(s).",
"Error confirming {tx_identifier} transaction: {err:?}. Retry #{} in {} second(s).",
retries,
delay.as_secs(),
);
Expand Down

0 comments on commit ceedec1

Please sign in to comment.