From bfb7bcc62960ac174eed3de68565289b617c8f30 Mon Sep 17 00:00:00 2001
From: Reisen <reisen@morphism.org>
Date: Mon, 29 Jul 2024 06:30:08 +0000
Subject: [PATCH] fix: spawn transaction senders in exporter

---
 src/agent/services/exporter.rs |  2 +-
 src/agent/state/exporter.rs    | 62 ++++++++++++++++++----------------
 2 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs
index 14c0ac9..1e6aadd 100644
--- a/src/agent/services/exporter.rs
+++ b/src/agent/services/exporter.rs
@@ -262,7 +262,7 @@ mod exporter {
                             config.exporter.unchanged_publish_threshold,
                         ).await {
                             if let Err(err) = publish_batches(
-                                &*state,
+                                state.clone(),
                                 client.clone(),
                                 network,
                                 &network_state_rx,
diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs
index eb7c74a..2bceded 100644
--- a/src/agent/state/exporter.rs
+++ b/src/agent/state/exporter.rs
@@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports(
     )
 )]
 pub async fn publish_batches<S>(
-    state: &S,
+    state: Arc<S>,
     client: Arc<RpcClient>,
     network: Network,
     network_state_rx: &watch::Receiver<NetworkState>,
@@ -466,7 +466,7 @@ where
     let network_state = *network_state_rx.borrow();
     for batch in batches {
         batch_futures.push(publish_batch(
-            state,
+            state.clone(),
             client.clone(),
             network,
             network_state,
@@ -494,7 +494,7 @@ where
         .into_iter()
         .collect::<Result<Vec<_>>>()?;
 
-    Exporter::record_publish(state, batch_state).await;
+    Exporter::record_publish(&*state, batch_state).await;
     Ok(())
 }
 
@@ -509,7 +509,7 @@ where
     )
 )]
 async fn publish_batch<S>(
-    state: &S,
+    state: Arc<S>,
     client: Arc<RpcClient>,
     network: Network,
     network_state: NetworkState,
@@ -535,7 +535,7 @@ where
     let mut instructions = Vec::new();
 
     // Refresh the data in the batch
-    let local_store_contents = LocalStore::get_all_price_infos(state).await;
+    let local_store_contents = LocalStore::get_all_price_infos(&*state).await;
     let refreshed_batch = batch.iter().map(|(identifier, _)| {
         (
             identifier,
@@ -615,7 +615,7 @@ where
         // Use the estimated previous price if it is higher
         // than the current price.
         let recent_compute_unit_price_micro_lamports =
-            Exporter::get_recent_compute_unit_price_micro_lamports(state).await;
+            Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await;
 
         if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports {
             // Get the estimated compute unit price and wrap it so it stays below the maximum
@@ -633,7 +633,7 @@ where
         // in this batch. This will use the maximum total compute unit fee if the publisher
         // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots.
         let result = GlobalStore::price_accounts(
-            state,
+            &*state,
             network,
             price_accounts.clone().into_iter().collect(),
         )
@@ -697,31 +697,33 @@ where
         network_state.blockhash,
     );
 
-    let signature = match client
-        .send_transaction_with_config(
-            &transaction,
-            RpcSendTransactionConfig {
-                skip_preflight: true,
-                ..RpcSendTransactionConfig::default()
-            },
-        )
-        .await
-    {
-        Ok(signature) => signature,
-        Err(err) => {
-            tracing::error!(err = ?err, "Exporter: failed to send transaction.");
-            return Ok(());
-        }
-    };
+    tokio::spawn(async move {
+        let signature = match client
+            .send_transaction_with_config(
+                &transaction,
+                RpcSendTransactionConfig {
+                    skip_preflight: true,
+                    ..RpcSendTransactionConfig::default()
+                },
+            )
+            .await
+        {
+            Ok(signature) => signature,
+            Err(err) => {
+                tracing::error!(err = ?err, "Exporter: failed to send transaction.");
+                return;
+            }
+        };
 
-    tracing::debug!(
-        signature = signature.to_string(),
-        instructions = instructions.len(),
-        price_accounts = ?price_accounts,
-        "Sent upd_price transaction.",
-    );
+        tracing::debug!(
+            signature = signature.to_string(),
+            instructions = instructions.len(),
+            price_accounts = ?price_accounts,
+            "Sent upd_price transaction.",
+        );
 
-    Transactions::add_transaction(state, signature).await;
+        Transactions::add_transaction(&*state, signature).await;
+    });
 
     Ok(())
 }