Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: spawn transaction senders in exporter #134

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.10.1"
version = "2.10.2"
edition = "2021"

[[bin]]
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[toolchain]
channel = "stable"
channel = "1.79.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you!

profile = "minimal"
components = ["rustfmt", "clippy"]
2 changes: 1 addition & 1 deletion src/agent/services/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod exporter {
config.exporter.unchanged_publish_threshold,
).await {
if let Err(err) = publish_batches(
&*state,
state.clone(),
client.clone(),
network,
&network_state_rx,
Expand Down
62 changes: 32 additions & 30 deletions src/agent/state/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports(
)
)]
pub async fn publish_batches<S>(
state: &S,
state: Arc<S>,
client: Arc<RpcClient>,
network: Network,
network_state_rx: &watch::Receiver<NetworkState>,
Expand Down Expand Up @@ -466,7 +466,7 @@ where
let network_state = *network_state_rx.borrow();
for batch in batches {
batch_futures.push(publish_batch(
state,
state.clone(),
client.clone(),
network,
network_state,
Expand Down Expand Up @@ -494,7 +494,7 @@ where
.into_iter()
.collect::<Result<Vec<_>>>()?;

Exporter::record_publish(state, batch_state).await;
Exporter::record_publish(&*state, batch_state).await;
Ok(())
}

Expand All @@ -509,7 +509,7 @@ where
)
)]
async fn publish_batch<S>(
state: &S,
state: Arc<S>,
client: Arc<RpcClient>,
network: Network,
network_state: NetworkState,
Expand All @@ -535,7 +535,7 @@ where
let mut instructions = Vec::new();

// Refresh the data in the batch
let local_store_contents = LocalStore::get_all_price_infos(state).await;
let local_store_contents = LocalStore::get_all_price_infos(&*state).await;
let refreshed_batch = batch.iter().map(|(identifier, _)| {
(
identifier,
Expand Down Expand Up @@ -615,7 +615,7 @@ where
// Use the estimated previous price if it is higher
// than the current price.
let recent_compute_unit_price_micro_lamports =
Exporter::get_recent_compute_unit_price_micro_lamports(state).await;
Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await;

if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports {
// Get the estimated compute unit price and wrap it so it stays below the maximum
Expand All @@ -633,7 +633,7 @@ where
// in this batch. This will use the maximum total compute unit fee if the publisher
// hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots.
let result = GlobalStore::price_accounts(
state,
&*state,
network,
price_accounts.clone().into_iter().collect(),
)
Expand Down Expand Up @@ -697,31 +697,33 @@ where
network_state.blockhash,
);

let signature = match client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
return Ok(());
}
};
tokio::spawn(async move {
let signature = match client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
return;
}
};

tracing::debug!(
signature = signature.to_string(),
instructions = instructions.len(),
price_accounts = ?price_accounts,
"Sent upd_price transaction.",
);
tracing::debug!(
signature = signature.to_string(),
instructions = instructions.len(),
price_accounts = ?price_accounts,
"Sent upd_price transaction.",
);

Transactions::add_transaction(state, signature).await;
Transactions::add_transaction(&*state, signature).await;
});

Ok(())
}
Expand Down
Loading