diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 05e6ca1..82eda2f 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "stable" +channel = "1.79.0" profile = "minimal" components = ["rustfmt", "clippy"] diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 14c0ac9..1e6aadd 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -262,7 +262,7 @@ mod exporter { config.exporter.unchanged_publish_threshold, ).await { if let Err(err) = publish_batches( - &*state, + state.clone(), client.clone(), network, &network_state_rx, diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index eb7c74a..2bceded 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports( ) )] pub async fn publish_batches( - state: &S, + state: Arc, client: Arc, network: Network, network_state_rx: &watch::Receiver, @@ -466,7 +466,7 @@ where let network_state = *network_state_rx.borrow(); for batch in batches { batch_futures.push(publish_batch( - state, + state.clone(), client.clone(), network, network_state, @@ -494,7 +494,7 @@ where .into_iter() .collect::>>()?; - Exporter::record_publish(state, batch_state).await; + Exporter::record_publish(&*state, batch_state).await; Ok(()) } @@ -509,7 +509,7 @@ where ) )] async fn publish_batch( - state: &S, + state: Arc, client: Arc, network: Network, network_state: NetworkState, @@ -535,7 +535,7 @@ where let mut instructions = Vec::new(); // Refresh the data in the batch - let local_store_contents = LocalStore::get_all_price_infos(state).await; + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; let refreshed_batch = batch.iter().map(|(identifier, _)| { ( identifier, @@ -615,7 +615,7 @@ where // Use the estimated previous price if it is higher // than the current price. let recent_compute_unit_price_micro_lamports = - Exporter::get_recent_compute_unit_price_micro_lamports(state).await; + Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await; if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports { // Get the estimated compute unit price and wrap it so it stays below the maximum @@ -633,7 +633,7 @@ where // in this batch. This will use the maximum total compute unit fee if the publisher // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots. let result = GlobalStore::price_accounts( - state, + &*state, network, price_accounts.clone().into_iter().collect(), ) @@ -697,31 +697,33 @@ where network_state.blockhash, ); - let signature = match client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await - { - Ok(signature) => signature, - Err(err) => { - tracing::error!(err = ?err, "Exporter: failed to send transaction."); - return Ok(()); - } - }; + tokio::spawn(async move { + let signature = match client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + tracing::error!(err = ?err, "Exporter: failed to send transaction."); + return; + } + }; - tracing::debug!( - signature = signature.to_string(), - instructions = instructions.len(), - price_accounts = ?price_accounts, - "Sent upd_price transaction.", - ); + tracing::debug!( + signature = signature.to_string(), + instructions = instructions.len(), + price_accounts = ?price_accounts, + "Sent upd_price transaction.", + ); - Transactions::add_transaction(state, signature).await; + Transactions::add_transaction(&*state, signature).await; + }); Ok(()) }