Skip to content

Commit

Permalink
Ensure the decode and send retry happens multiple times (metaplex-fou…
Browse files Browse the repository at this point in the history
Nikhil Acharya authored Jun 2, 2023
1 parent fe740e5 commit 404acdb
Showing 2 changed files with 59 additions and 59 deletions.
83 changes: 53 additions & 30 deletions tools/txn_forwarder/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use solana_client::client_error::ClientErrorKind;

use {
anyhow::Context,
futures::stream::{BoxStream, StreamExt},
log::{debug, error, info},
serde::de::DeserializeOwned,
plerkle_messenger::TRANSACTION_STREAM,
plerkle_serialization::serializer::seralize_encoded_transaction_with_status,
solana_client::{
client_error::ClientError, client_error::Result as RpcClientResult,
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
client_error::ClientError, nonblocking::rpc_client::RpcClient,
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_request::RpcError::RpcRequestError, rpc_request::RpcRequest,
},
solana_sdk::{
pubkey::Pubkey,
signature::{ParseSignatureError, Signature},
},
solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding},
solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta,
std::sync::Arc,
std::{fmt, io::Result as IoResult, str::FromStr},
tokio::sync::Mutex,
tokio::{
fs::File,
io::{stdin, AsyncBufReadExt, BufReader},
@@ -113,16 +114,14 @@ pub fn find_signatures(
rx
}

pub async fn rpc_send_with_retries<E>(
pub async fn rpc_send_with_retries(
client: &RpcClient,
request: RpcRequest,
value: serde_json::Value,
max_retries: u8,
error_key: E,
) -> Result<EncodedConfirmedTransactionWithStatusMeta, ClientError>
where
E: fmt::Debug,
{
messenger: Arc<Mutex<Box<dyn plerkle_messenger::Messenger>>>,
signature: Signature,
) -> Result<(), ClientError> {
let mut retries = 0;
let mut delay = Duration::from_millis(500);

@@ -131,7 +130,7 @@ where

if let Err(error) = response {
if retries < max_retries {
error!("retrying {:?} {:?}: {:?}", request, error_key, error);
error!("retrying {:?} {:?}: {:?}", request, signature, error);
sleep(delay).await;
delay *= 2;
retries += 1;
@@ -140,30 +139,54 @@ where
return Err(error);
}
}

let value = response.unwrap();
let tx: EncodedConfirmedTransactionWithStatusMeta = value;

if let Some(_) = tx.transaction.transaction.decode() {
return Ok(tx);
} else {
if retries < max_retries {
error!(
"retrying {:?} {:?}: Transaction could not be decoded",
request, error_key
);
sleep(delay).await;
delay *= 2;
retries += 1;
} else {
return Err(ClientError::from(RpcRequestError(
"Transaction could not be decoded".to_string(),
)));
match send(signature, tx, Arc::clone(&messenger)).await {
Ok(_) => return Ok(()),
Err(e) => {
if retries < max_retries {
error!(
"retrying {:?} {:?}: Transaction could not be sent: {:?}",
request, signature, e
);
sleep(delay).await;
delay *= 2;
retries += 1;
} else {
return Err(ClientError::from(RpcRequestError(format!(
"Transaction could not be decoded: {}",
e
))));
}
continue;
}
}
}
}

async fn send(
signature: Signature,
tx: EncodedConfirmedTransactionWithStatusMeta,
messenger: Arc<Mutex<Box<dyn plerkle_messenger::Messenger>>>,
) -> anyhow::Result<()> {
// Ignore if tx failed or meta is missed
let meta = tx.transaction.meta.as_ref();
if meta.map(|meta| meta.status.is_err()).unwrap_or(true) {
return Ok(());
}

let fbb = flatbuffers::FlatBufferBuilder::new();
let fbb = seralize_encoded_transaction_with_status(fbb, tx)
.with_context(|| format!("failed to serialize transaction with {}", signature))?;
let bytes = fbb.finished_data();

let mut locked = messenger.lock().await;
locked.send(TRANSACTION_STREAM, bytes).await?;
info!("Sent transaction to stream {}", signature);

Ok(())
}

pub async fn read_lines(path: &str) -> anyhow::Result<BoxStream<'static, IoResult<String>>> {
Ok(if path == "-" {
LinesStream::new(BufReader::new(stdin()).lines()).boxed()
35 changes: 6 additions & 29 deletions tools/txn_forwarder/src/main.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@ use {
},
log::info,
plerkle_messenger::{MessengerConfig, ACCOUNT_STREAM, TRANSACTION_STREAM},
plerkle_serialization::serializer::seralize_encoded_transaction_with_status,
solana_client::{
nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig,
rpc_request::RpcRequest,
@@ -18,7 +17,7 @@ use {
pubkey::Pubkey,
signature::Signature,
},
solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding},
solana_transaction_status::UiTransactionEncoding,
std::{env, str::FromStr, sync::Arc},
tokio::sync::{mpsc, Mutex},
txn_forwarder::{find_signatures, read_lines, rpc_send_with_retries},
@@ -33,7 +32,7 @@ struct Cli {
rpc_url: String,
#[arg(long, short, default_value_t = 25)]
concurrency: usize,
#[arg(long, short, default_value_t = 3)]
#[arg(long, short, default_value_t = 5)]
max_retries: u8,
#[arg(long, short, default_value_t = false)]
replay_forward: bool,
@@ -220,36 +219,14 @@ async fn send_tx(
};

let client = RpcClient::new(rpc_url);
let tx: EncodedConfirmedTransactionWithStatusMeta = rpc_send_with_retries(
rpc_send_with_retries(
&client,
RpcRequest::GetTransaction,
serde_json::json!([signature.to_string(), CONFIG,]),
max_retries,
Arc::clone(&messenger),
signature,
)
.await?;
send(signature, tx, Arc::clone(&messenger)).await
}

async fn send(
signature: Signature,
tx: EncodedConfirmedTransactionWithStatusMeta,
messenger: Arc<Mutex<Box<dyn plerkle_messenger::Messenger>>>,
) -> anyhow::Result<()> {
// Ignore if tx failed or meta is missed
let meta = tx.transaction.meta.as_ref();
if meta.map(|meta| meta.status.is_err()).unwrap_or(true) {
return Ok(());
}

let fbb = flatbuffers::FlatBufferBuilder::new();
let fbb = seralize_encoded_transaction_with_status(fbb, tx)
.with_context(|| format!("failed to serialize transaction with {}", signature))?;
let bytes = fbb.finished_data();

let mut locked = messenger.lock().await;
locked.send(TRANSACTION_STREAM, bytes).await?;
info!("Sent transaction to stream {}", signature);

Ok(())
.await
.map_err(|e| anyhow::anyhow!(e))
}

0 comments on commit 404acdb

Please sign in to comment.