Skip to content

Commit

Permalink
fix: spawn transaction senders in exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen committed Jul 29, 2024
1 parent 756495f commit 25565d9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 32 deletions.
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[toolchain]
channel = "stable"
channel = "1.79.0"
profile = "minimal"
components = ["rustfmt", "clippy"]
2 changes: 1 addition & 1 deletion src/agent/services/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod exporter {
config.exporter.unchanged_publish_threshold,
).await {
if let Err(err) = publish_batches(
&*state,
state.clone(),
client.clone(),
network,
&network_state_rx,
Expand Down
62 changes: 32 additions & 30 deletions src/agent/state/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports(
)
)]
pub async fn publish_batches<S>(
state: &S,
state: Arc<S>,
client: Arc<RpcClient>,
network: Network,
network_state_rx: &watch::Receiver<NetworkState>,
Expand Down Expand Up @@ -466,7 +466,7 @@ where
let network_state = *network_state_rx.borrow();
for batch in batches {
batch_futures.push(publish_batch(
state,
state.clone(),
client.clone(),
network,
network_state,
Expand Down Expand Up @@ -494,7 +494,7 @@ where
.into_iter()
.collect::<Result<Vec<_>>>()?;

Exporter::record_publish(state, batch_state).await;
Exporter::record_publish(&*state, batch_state).await;
Ok(())
}

Expand All @@ -509,7 +509,7 @@ where
)
)]
async fn publish_batch<S>(
state: &S,
state: Arc<S>,
client: Arc<RpcClient>,
network: Network,
network_state: NetworkState,
Expand All @@ -535,7 +535,7 @@ where
let mut instructions = Vec::new();

// Refresh the data in the batch
let local_store_contents = LocalStore::get_all_price_infos(state).await;
let local_store_contents = LocalStore::get_all_price_infos(&*state).await;
let refreshed_batch = batch.iter().map(|(identifier, _)| {
(
identifier,
Expand Down Expand Up @@ -615,7 +615,7 @@ where
// Use the estimated previous price if it is higher
// than the current price.
let recent_compute_unit_price_micro_lamports =
Exporter::get_recent_compute_unit_price_micro_lamports(state).await;
Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await;

if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports {
// Get the estimated compute unit price and wrap it so it stays below the maximum
Expand All @@ -633,7 +633,7 @@ where
// in this batch. This will use the maximum total compute unit fee if the publisher
// hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots.
let result = GlobalStore::price_accounts(
state,
&*state,
network,
price_accounts.clone().into_iter().collect(),
)
Expand Down Expand Up @@ -697,31 +697,33 @@ where
network_state.blockhash,
);

let signature = match client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
return Ok(());
}
};
tokio::spawn(async move {
let signature = match client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
return;
}
};

tracing::debug!(
signature = signature.to_string(),
instructions = instructions.len(),
price_accounts = ?price_accounts,
"Sent upd_price transaction.",
);
tracing::debug!(
signature = signature.to_string(),
instructions = instructions.len(),
price_accounts = ?price_accounts,
"Sent upd_price transaction.",
);

Transactions::add_transaction(state, signature).await;
Transactions::add_transaction(&*state, signature).await;
});

Ok(())
}
Expand Down

0 comments on commit 25565d9

Please sign in to comment.