Skip to content

Commit

Permalink
feat: refactor send_transaction, add land time metric (#362)
Browse files Browse the repository at this point in the history
* feat: refactor send_transaction

* lfg

* format

* cleanup

* cleanup

* add histogram

* add metric label

* rename

* rename metric

* add outcome tag

* result label

* remove these two files

* docs: add comment explaining expire inaccuracies

* restore initial send
  • Loading branch information
guibescos authored Jan 24, 2025
1 parent 5a7a759 commit fa8ef21
Showing 1 changed file with 54 additions and 28 deletions.
82 changes: 54 additions & 28 deletions auction-server/src/auction/service/auction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use {
},
anyhow::Result,
axum::async_trait,
axum_prometheus::metrics,
ethers::{
contract::EthEvent,
providers::{
Expand Down Expand Up @@ -63,7 +64,10 @@ use {
Context,
Poll,
},
time::Duration,
time::{
Duration,
Instant,
},
},
time::OffsetDateTime,
tokio::time::{
Expand Down Expand Up @@ -592,6 +596,7 @@ impl AuctionManager<Svm> for Service<Svm> {
}

const SEND_TRANSACTION_RETRY_COUNT_SVM: i32 = 30;
const RETRY_DURATION: Duration = Duration::from_secs(2);

impl Service<Svm> {
pub fn add_relayer_signature(&self, bid: &mut entities::Bid<Svm>) {
Expand All @@ -618,36 +623,58 @@ impl Service<Svm> {
}

#[tracing::instrument(skip_all, fields(bid_id, total_tries, tx_hash))]
async fn blocking_send_transaction(&self, bid: entities::Bid<Svm>, signature: Signature) {
async fn blocking_send_transaction(&self, bid: entities::Bid<Svm>) {
let start = Instant::now();
let mut result_label = "expired";
let signature = bid.chain_data.transaction.signatures[0];
tracing::Span::current().record("bid_id", bid.id.to_string());
tracing::Span::current().record("tx_hash", signature.to_string());
let mut receiver = self.config.chain_config.log_sender.subscribe();
for retry_count in 0..SEND_TRANSACTION_RETRY_COUNT_SVM {
tokio::time::sleep(Duration::from_secs(2)).await;

// Do not wait for the logs to be received
// just check if the transaction is in the logs already
while let Ok(log) = receiver.try_recv() {
if log.value.signature.eq(&signature.to_string()) {
tracing::Span::current().record("total_tries", retry_count + 1);
return;
let mut retry_interval = tokio::time::interval(RETRY_DURATION);
let mut try_count = 0;
while try_count < SEND_TRANSACTION_RETRY_COUNT_SVM {
tokio::select! {
log = receiver.recv() => {
if let Ok(log) = log {
if log.value.signature.eq(&signature.to_string()) {
if log.value.err.is_some() {
result_label = "failed";
} else {
result_label = "success";
}
break
}
}
}
_ = retry_interval.tick() => {
try_count += 1;
if let Err(e) = self
.config
.chain_config
.tx_broadcaster_client
.send_transaction_with_config(
&bid.chain_data.transaction,
self.get_send_transaction_config(),
)
.await
{
tracing::error!(error = ?e, "Failed to resend transaction");
}
}
}
if let Err(e) = self
.config
.chain_config
.tx_broadcaster_client
.send_transaction_with_config(
&bid.chain_data.transaction,
self.get_send_transaction_config(),
)
.await
{
tracing::error!(error = ?e, "Failed to resend transaction");
}
}

tracing::Span::current().record("total_tries", SEND_TRANSACTION_RETRY_COUNT_SVM + 1);
let labels = [
("chain_id", self.config.chain_id.clone()),
// note: this metric can have the label "expired" even when the transaction landed
// if the log listener didn't get the log in time
// but this is rare as we retry for 60 seconds and blockhash expires after 60 seconds
("result", result_label.to_string()),
];
metrics::histogram!("transaction_landing_time_seconds_svm", &labels)
.record(start.elapsed().as_secs_f64());

tracing::Span::current().record("total_tries", try_count);
}

#[tracing::instrument(skip_all, fields(bid_id))]
Expand All @@ -657,8 +684,7 @@ impl Service<Svm> {
) -> solana_client::client_error::Result<Signature> {
tracing::Span::current().record("bid_id", bid.id.to_string());
let tx = &bid.chain_data.transaction;
let res = self
.config
self.config
.chain_config
.tx_broadcaster_client
.send_transaction_with_config(tx, self.get_send_transaction_config())
Expand All @@ -671,10 +697,10 @@ impl Service<Svm> {
self.task_tracker.spawn({
let (service, bid) = (self.clone(), bid.clone());
async move {
service.blocking_send_transaction(bid, res).await;
service.blocking_send_transaction(bid).await;
}
});
Ok(res)
Ok(tx.signatures[0])
}
}

Expand Down

0 comments on commit fa8ef21

Please sign in to comment.