diff --git a/tools/acc_forwarder/src/main.rs b/tools/acc_forwarder/src/main.rs index a20628e75..436e9b8d4 100644 --- a/tools/acc_forwarder/src/main.rs +++ b/tools/acc_forwarder/src/main.rs @@ -29,7 +29,7 @@ use { }, std::{collections::HashSet, env, str::FromStr, sync::Arc}, tokio::sync::Mutex, - txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries}, + txn_forwarder::{find_signatures, read_lines, rpc_tx_with_retries}, }; #[derive(Parser)] @@ -146,7 +146,7 @@ async fn main() -> anyhow::Result<()> { let collection = Pubkey::from_str(&collection) .with_context(|| format!("failed to parse collection {collection}"))?; let stream = Arc::new(Mutex::new(find_signatures( - collection, client, None, None, 2_000, + collection, client, None, None, 2_000, false, ))); try_join_all((0..concurrency).map(|_| { @@ -210,7 +210,7 @@ async fn collection_get_tx_info( max_supported_transaction_version: Some(u8::MAX), }; - let tx: EncodedConfirmedTransactionWithStatusMeta = rpc_send_with_retries( + let tx: EncodedConfirmedTransactionWithStatusMeta = rpc_tx_with_retries( client, RpcRequest::GetTransaction, serde_json::json!([signature.to_string(), CONFIG]), @@ -306,7 +306,7 @@ async fn fetch_metadata_and_send_accounts( // returns largest (NFT related) token account belonging to mint async fn get_token_largest_account(client: &RpcClient, mint: Pubkey) -> anyhow::Result { - let response: RpcResponse> = rpc_send_with_retries( + let response: RpcResponse> = rpc_tx_with_retries( client, RpcRequest::Custom { method: "getTokenLargestAccounts", @@ -335,7 +335,7 @@ async fn fetch_account(pubkey: Pubkey, client: &RpcClient) -> anyhow::Result<(Ac min_context_slot: None, }; - let response: RpcResponse> = rpc_send_with_retries( + let response: RpcResponse> = rpc_tx_with_retries( client, RpcRequest::GetAccountInfo, serde_json::json!([pubkey.to_string(), CONFIG]), diff --git a/tools/tree-status/src/main.rs b/tools/tree-status/src/main.rs index f17febcde..420cc059d 100644 --- a/tools/tree-status/src/main.rs +++ b/tools/tree-status/src/main.rs @@ -64,7 +64,7 @@ use { io::{stdout, AsyncWrite, AsyncWriteExt}, sync::{mpsc, Mutex}, }, - txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries}, + txn_forwarder::{find_signatures, read_lines, rpc_tx_with_retries}, }; const RPC_GET_TXN_RETRIES: u8 = 5; @@ -534,7 +534,7 @@ async fn send_txn( client: &RpcClient, messenger: &Mutex>, ) -> anyhow::Result<()> { - let txn = rpc_send_with_retries( + let txn: EncodedConfirmedTransactionWithStatusMeta = rpc_tx_with_retries( &client, RpcRequest::GetTransaction, serde_json::json!([signature.to_string(), RPC_TXN_CONFIG,]), @@ -927,7 +927,7 @@ async fn process_tx( max_supported_transaction_version: Some(0), }; - let tx: EncodedConfirmedTransactionWithStatusMeta = rpc_send_with_retries( + let tx: EncodedConfirmedTransactionWithStatusMeta = rpc_tx_with_retries( client, RpcRequest::GetTransaction, serde_json::json!([signature.to_string(), CONFIG]), diff --git a/tools/txn_forwarder/src/lib.rs b/tools/txn_forwarder/src/lib.rs index 7d7d0d44a..68b2fe183 100644 --- a/tools/txn_forwarder/src/lib.rs +++ b/tools/txn_forwarder/src/lib.rs @@ -4,6 +4,8 @@ use { log::{debug, error, info}, plerkle_messenger::TRANSACTION_STREAM, plerkle_serialization::serializer::seralize_encoded_transaction_with_status, + serde::de::DeserializeOwned, + solana_client::client_error::Result as RpcClientResult, solana_client::{ client_error::ClientError, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, @@ -114,6 +116,36 @@ pub fn find_signatures( rx } +pub async fn rpc_tx_with_retries( + client: &RpcClient, + request: RpcRequest, + value: serde_json::Value, + max_retries: u8, + error_key: E, +) -> RpcClientResult +where + T: DeserializeOwned, + E: fmt::Debug, +{ + let mut retries = 0; + let mut delay = Duration::from_millis(500); + loop { + match client.send(request, value.clone()).await { + Ok(value) => return Ok(value), + Err(error) => { + if retries < max_retries { + error!("retrying {request} {error_key:?}: {error}"); + sleep(delay).await; + delay *= 2; + retries += 1; + } else { + return Err(error); + } + } + } + } +} + pub async fn rpc_send_with_retries( client: &RpcClient, request: RpcRequest,