From a63eb1908bdeb87364abcd045692d1a46d434fa1 Mon Sep 17 00:00:00 2001 From: Dani Mehrjerdi Date: Sat, 14 Dec 2024 18:30:17 +0100 Subject: [PATCH] Fix svm slow conclusion (#292) --- .../src/auction/service/auction_manager.rs | 1 - .../src/auction/service/conclude_auction.rs | 90 +++++++++++-------- auction-server/src/auction/service/workers.rs | 89 ++++++++++++++---- 3 files changed, 124 insertions(+), 56 deletions(-) diff --git a/auction-server/src/auction/service/auction_manager.rs b/auction-server/src/auction/service/auction_manager.rs index 5b30c5e4..0e85fe30 100644 --- a/auction-server/src/auction/service/auction_manager.rs +++ b/auction-server/src/auction/service/auction_manager.rs @@ -455,7 +455,6 @@ impl AuctionManager for Service { return Ok(vec![]); } - //TODO: this can be optimized out if triggered by websocket events let signatures: Vec<_> = bids .iter() .map(|bid| { diff --git a/auction-server/src/auction/service/conclude_auction.rs b/auction-server/src/auction/service/conclude_auction.rs index 3d82dc43..397f8b36 100644 --- a/auction-server/src/auction/service/conclude_auction.rs +++ b/auction-server/src/auction/service/conclude_auction.rs @@ -15,26 +15,62 @@ pub struct ConcludeAuctionInput { pub auction: entities::Auction, } +pub struct ConcludeAuctionWithStatusesInput { + pub auction: entities::Auction, + pub bid_statuses: Vec<(T::BidStatusType, entities::Bid)>, +} + impl Service where Service: AuctionManager, { - #[tracing::instrument(skip_all, fields(auction_id, tx_hash, bid_ids, bid_statuses))] - pub async fn conclude_auction(&self, input: ConcludeAuctionInput) -> anyhow::Result<()> { + #[tracing::instrument(skip_all, fields(auction_id, bid_ids, bid_statuses))] + pub async fn conclude_auction_with_statuses( + &self, + input: ConcludeAuctionWithStatusesInput, + ) -> anyhow::Result<()> { let mut auction = input.auction; - tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction"); + tracing::Span::current().record( + "bid_ids", + tracing::field::display(entities::BidContainerTracing(&auction.bids)), + ); tracing::Span::current().record("auction_id", auction.id.to_string()); + tracing::Span::current().record("bid_statuses", format!("{:?}", input.bid_statuses)); + join_all(input.bid_statuses.into_iter().map(|(status, bid)| { + self.update_bid_status(UpdateBidStatusInput { + bid: bid.clone(), + new_status: status.clone(), + }) + })) + .await; + + if self + .repo + .get_in_memory_submitted_bids_for_auction(&auction) + .await + .is_empty() + { + self.repo + .conclude_auction(&mut auction) + .await + .map_err(|e| anyhow::anyhow!("Failed to conclude auction: {:?}", e))?; + self.repo.remove_in_memory_submitted_auction(auction).await; + } + + Ok(()) + } + + /// Concludes an auction by getting the auction transaction status from the chain. + #[tracing::instrument(skip_all)] + pub async fn conclude_auction(&self, input: ConcludeAuctionInput) -> anyhow::Result<()> { + let auction = input.auction; + tracing::info!(chain_id = self.config.chain_id, auction_id = ?auction.id, permission_key = auction.permission_key.to_string(), "Concluding auction"); if let Some(tx_hash) = auction.tx_hash.clone() { - tracing::Span::current().record("tx_hash", format!("{:?}", tx_hash)); let bids = self .repo .get_in_memory_submitted_bids_for_auction(&auction) .await; - tracing::Span::current().record( - "bid_ids", - tracing::field::display(entities::BidContainerTracing(&bids)), - ); let bid_statuses = self .get_bid_results( bids.clone(), @@ -45,35 +81,15 @@ where ) .await?; - join_all( - bid_statuses - .iter() - .zip(bids.iter()) - .filter_map(|(status, bid)| { - status.as_ref().map(|status| { - self.update_bid_status(UpdateBidStatusInput { - bid: bid.clone(), - new_status: status.clone(), - }) - }) - }), - ) - .await; - - - if self - .repo - .get_in_memory_submitted_bids_for_auction(&auction) - .await - .is_empty() - { - tracing::Span::current().record("bid_statuses", format!("{:?}", bid_statuses)); - self.repo - .conclude_auction(&mut auction) - .await - .map_err(|e| anyhow::anyhow!("Failed to conclude auction: {:?}", e))?; - self.repo.remove_in_memory_submitted_auction(auction).await; - } + self.conclude_auction_with_statuses(ConcludeAuctionWithStatusesInput { + auction, + bid_statuses: bid_statuses + .into_iter() + .zip(bids) + .filter_map(|(status, bid)| status.map(|status| (status, bid))) + .collect(), + }) + .await?; } Ok(()) } diff --git a/auction-server/src/auction/service/workers.rs b/auction-server/src/auction/service/workers.rs index ba0e55e9..204d50ce 100644 --- a/auction-server/src/auction/service/workers.rs +++ b/auction-server/src/auction/service/workers.rs @@ -8,7 +8,8 @@ use { api::ws::UpdateEvent, auction::{ api::SvmChainUpdate, - service::conclude_auction::ConcludeAuctionInput, + entities, + service::conclude_auction::ConcludeAuctionWithStatusesInput, }, kernel::entities::{ Evm, @@ -25,9 +26,13 @@ use { }, axum_prometheus::metrics, ethers::providers::Middleware, - solana_client::rpc_config::{ - RpcTransactionLogsConfig, - RpcTransactionLogsFilter, + futures::future::join_all, + solana_client::{ + rpc_config::{ + RpcTransactionLogsConfig, + RpcTransactionLogsFilter, + }, + rpc_response::RpcLogsResponse, }, solana_sdk::{ commitment_config::CommitmentConfig, @@ -132,6 +137,53 @@ impl Service { const GET_LATEST_BLOCKHASH_INTERVAL_SVM: Duration = Duration::from_secs(5); impl Service { + pub async fn conclude_auction_for_log( + &self, + auction: entities::Auction, + log: RpcLogsResponse, + ) -> Result<()> { + let signature = Signature::from_str(&log.signature)?; + let submitted_bids = self + .repo + .get_in_memory_submitted_bids_for_auction(&auction) + .await; + if let Some(bid) = submitted_bids + .iter() + .find(|bid| bid.chain_data.transaction.signatures[0] == signature) + { + let bid_status = match log.err { + Some(_) => entities::BidStatusSvm::Failed { + auction: entities::BidStatusAuction { + id: auction.id, + tx_hash: signature, + }, + }, + None => entities::BidStatusSvm::Won { + auction: entities::BidStatusAuction { + id: auction.id, + tx_hash: signature, + }, + }, + }; + + self.conclude_auction_with_statuses(ConcludeAuctionWithStatusesInput { + auction: auction.clone(), + bid_statuses: vec![(bid_status, bid.clone())], + }) + .await + .map_err(|e| { + tracing::error!( + error = ?e, + auction_id = ?auction.id, + tx_hash = ?signature, + "Failed to conclude auction with statuses" + ); + e + })?; + } + Ok(()) + } + pub async fn run_auction_conclusion_loop(&self) -> Result<()> { tracing::info!( chain_id = self.config.chain_id, @@ -151,21 +203,22 @@ impl Service { log = ?rpc_log.clone(), "New log trigger received", ); - if let Ok(signature) = Signature::from_str(&rpc_log.value.signature){ - self.task_tracker.spawn({ - let service = self.clone(); - async move { - let submitted_auctions = service.repo.get_in_memory_submitted_auctions().await; - if let Some(auction) = submitted_auctions.iter().find(|auction| { - auction.bids.iter().any(|bid| bid.chain_data.transaction.signatures[0] == signature) - }) { - if let Err(err) = service.conclude_auction(ConcludeAuctionInput{auction: auction.clone()}).await { - tracing::error!(error = ?err, auction = ?auction, "Error while concluding submitted auction"); - } + if let Ok(signature) = Signature::from_str(&rpc_log.value.signature) { + self.task_tracker.spawn({ + let service = self.clone(); + async move { + let submitted_auctions = service.repo.get_in_memory_submitted_auctions().await; + let auctions = submitted_auctions.iter().filter(|auction| { + auction.bids.iter().any(|bid| { + bid.chain_data.transaction.signatures[0] == signature + }) + }); + join_all( + auctions.map(|auction| service.conclude_auction_for_log(auction.clone(), rpc_log.value.clone())) + ).await; } - } - }); - } + }); + } } } }