From efc17f54899b4c63b7faec1a5e87503b0430508a Mon Sep 17 00:00:00 2001 From: Reisen Date: Mon, 29 Jul 2024 10:32:20 +0100 Subject: [PATCH 1/5] fix: spawn transaction senders in exporter (#134) --- Cargo.toml | 2 +- rust-toolchain.toml | 2 +- src/agent/services/exporter.rs | 2 +- src/agent/state/exporter.rs | 62 ++++++++++++++++++---------------- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f4fc094c..744d993f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.1" +version = "2.10.2" edition = "2021" [[bin]] diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 05e6ca1b..82eda2fc 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "stable" +channel = "1.79.0" profile = "minimal" components = ["rustfmt", "clippy"] diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 14c0ac94..1e6aadd1 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -262,7 +262,7 @@ mod exporter { config.exporter.unchanged_publish_threshold, ).await { if let Err(err) = publish_batches( - &*state, + state.clone(), client.clone(), network, &network_state_rx, diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index eb7c74a8..2bceded5 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports( ) )] pub async fn publish_batches( - state: &S, + state: Arc, client: Arc, network: Network, network_state_rx: &watch::Receiver, @@ -466,7 +466,7 @@ where let network_state = *network_state_rx.borrow(); for batch in batches { batch_futures.push(publish_batch( - state, + state.clone(), client.clone(), network, network_state, @@ -494,7 +494,7 @@ where .into_iter() .collect::>>()?; - Exporter::record_publish(state, batch_state).await; + Exporter::record_publish(&*state, batch_state).await; Ok(()) } @@ -509,7 +509,7 @@ where ) )] async fn publish_batch( - state: &S, + state: Arc, client: Arc, network: Network, network_state: NetworkState, @@ -535,7 +535,7 @@ where let mut instructions = Vec::new(); // Refresh the data in the batch - let local_store_contents = LocalStore::get_all_price_infos(state).await; + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; let refreshed_batch = batch.iter().map(|(identifier, _)| { ( identifier, @@ -615,7 +615,7 @@ where // Use the estimated previous price if it is higher // than the current price. let recent_compute_unit_price_micro_lamports = - Exporter::get_recent_compute_unit_price_micro_lamports(state).await; + Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await; if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports { // Get the estimated compute unit price and wrap it so it stays below the maximum @@ -633,7 +633,7 @@ where // in this batch. This will use the maximum total compute unit fee if the publisher // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots. let result = GlobalStore::price_accounts( - state, + &*state, network, price_accounts.clone().into_iter().collect(), ) @@ -697,31 +697,33 @@ where network_state.blockhash, ); - let signature = match client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await - { - Ok(signature) => signature, - Err(err) => { - tracing::error!(err = ?err, "Exporter: failed to send transaction."); - return Ok(()); - } - }; + tokio::spawn(async move { + let signature = match client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + tracing::error!(err = ?err, "Exporter: failed to send transaction."); + return; + } + }; - tracing::debug!( - signature = signature.to_string(), - instructions = instructions.len(), - price_accounts = ?price_accounts, - "Sent upd_price transaction.", - ); + tracing::debug!( + signature = signature.to_string(), + instructions = instructions.len(), + price_accounts = ?price_accounts, + "Sent upd_price transaction.", + ); - Transactions::add_transaction(state, signature).await; + Transactions::add_transaction(&*state, signature).await; + }); Ok(()) } From 38521959a22dbdd143066937254ea35ac857b2ea Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 1 Aug 2024 13:17:40 +0200 Subject: [PATCH 2/5] fix: feed rpc responses and notifications instead of send and flush (#135) * fix: feed rpc respones and notifications instead of send and flush * refactor: use duration instead of _secs in config * chore: bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- config/config.toml | 16 ++++++++++++++-- src/agent/config.rs | 10 +++++++--- src/agent/pyth/rpc.rs | 40 +++++++++++++++++++++++++++++----------- src/bin/agent.rs | 5 +---- 6 files changed, 53 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b14e6af..4de4fd98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.1" +version = "2.10.3" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 744d993f..672bbd2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.2" +version = "2.10.3" edition = "2021" [[bin]] diff --git a/config/config.toml b/config/config.toml index 39b254b6..ea62bcfe 100644 --- a/config/config.toml +++ b/config/config.toml @@ -6,6 +6,18 @@ # connection is not exposed for unauthorized access. listen_address = "127.0.0.1:8910" +# Size of the buffer of each Server's channel on which `notify_price` events are +# received from the Price state. +# notify_price_tx_buffer = 10000 + +# Size of the buffer of each Server's channel on which `notify_price_sched` events are +# received from the Price state. +# notify_price_sched_tx_buffer = 10000 + +# Flush interval for responses and notifications. This is the maximum time the +# server will wait before flushing the messages to the client. +# flush_interval_duration = "50ms" + # Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint. [primary_network] ### Required fields ### @@ -186,8 +198,8 @@ key_store.mapping_key = "RelevantOracleMappingAddress" ## Configuration for OpenTelemetry ## [opentelemetry] -# Timeout in seconds for the OpenTelemetry exporter -exporter_timeout_secs = 3 +# Timeout duration for the OpenTelemetry exporter +exporter_timeout_duration = "3s" # Endpoint URL for the OpenTelemetry exporter exporter_endpoint = "http://127.0.0.1:4317" diff --git a/src/agent/config.rs b/src/agent/config.rs index 8fb22926..40cb234f 100644 --- a/src/agent/config.rs +++ b/src/agent/config.rs @@ -13,7 +13,10 @@ use { File, }, serde::Deserialize, - std::path::Path, + std::{ + path::Path, + time::Duration, + }, }; /// Configuration for all components of the Agent @@ -88,6 +91,7 @@ impl Default for ChannelCapacities { #[derive(Deserialize, Debug)] pub struct OpenTelemetryConfig { - pub exporter_timeout_secs: u64, - pub exporter_endpoint: String, + #[serde(with = "humantime_serde")] + pub exporter_timeout_duration: Duration, + pub exporter_endpoint: String, } diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index ea8b7a7c..ae52f0e4 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -48,6 +48,7 @@ use { fmt::Debug, net::SocketAddr, sync::Arc, + time::Duration, }, tokio::sync::mpsc, tracing::instrument, @@ -115,6 +116,7 @@ async fn handle_connection( state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, + flush_interval_duration: Duration, ) where S: state::Prices, S: Send, @@ -127,6 +129,8 @@ async fn handle_connection( let (mut notify_price_sched_tx, mut notify_price_sched_rx) = mpsc::channel(notify_price_sched_tx_buffer); + let mut flush_interval = tokio::time::interval(flush_interval_duration); + loop { if let Err(err) = handle_next( &*state, @@ -136,6 +140,7 @@ async fn handle_connection( &mut notify_price_rx, &mut notify_price_sched_tx, &mut notify_price_sched_rx, + &mut flush_interval, ) .await { @@ -159,6 +164,7 @@ async fn handle_next( notify_price_rx: &mut mpsc::Receiver, notify_price_sched_tx: &mut mpsc::Sender, notify_price_sched_rx: &mut mpsc::Receiver, + flush_interval: &mut tokio::time::Interval, ) -> Result<()> where S: state::Prices, @@ -183,13 +189,16 @@ where } } Some(notify_price) = notify_price_rx.recv() => { - send_notification(ws_tx, Method::NotifyPrice, Some(notify_price)) + feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price)) .await } Some(notify_price_sched) = notify_price_sched_rx.recv() => { - send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) + feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) .await } + _ = flush_interval.tick() => { + flush(ws_tx).await + } } } @@ -229,9 +238,9 @@ where // Send an array if we're handling a batch // request, single response object otherwise if is_batch { - send_text(ws_tx, &serde_json::to_string(&responses)?).await?; + feed_text(ws_tx, &serde_json::to_string(&responses)?).await?; } else { - send_text(ws_tx, &serde_json::to_string(&responses[0])?).await?; + feed_text(ws_tx, &serde_json::to_string(&responses[0])?).await?; } } // The top-level parsing errors are fine to share with client @@ -354,10 +363,10 @@ async fn send_error( error.to_string(), None, ); - send_text(ws_tx, &response.to_string()).await + feed_text(ws_tx, &response.to_string()).await } -async fn send_notification( +async fn feed_notification( ws_tx: &mut SplitSink, method: Method, params: Option, @@ -365,10 +374,10 @@ async fn send_notification( where T: Sized + Serialize + DeserializeOwned, { - send_request(ws_tx, IdReq::Notification, method, params).await + feed_request(ws_tx, IdReq::Notification, method, params).await } -async fn send_request( +async fn feed_request( ws_tx: &mut SplitSink, id: I, method: Method, @@ -379,16 +388,20 @@ where T: Sized + Serialize + DeserializeOwned, { let request = Request::with_params(id, method, params); - send_text(ws_tx, &request.to_string()).await + feed_text(ws_tx, &request.to_string()).await } -async fn send_text(ws_tx: &mut SplitSink, msg: &str) -> Result<()> { +async fn feed_text(ws_tx: &mut SplitSink, msg: &str) -> Result<()> { ws_tx - .send(Message::text(msg.to_string())) + .feed(Message::text(msg.to_string())) .await .map_err(|e| e.into()) } +async fn flush(ws_tx: &mut SplitSink) -> Result<()> { + ws_tx.flush().await.map_err(|e| e.into()) +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { @@ -400,6 +413,9 @@ pub struct Config { /// Size of the buffer of each Server's channel on which `notify_price_sched` events are /// received from the Price state. pub notify_price_sched_tx_buffer: usize, + /// Flush interval duration for the notifications. + #[serde(with = "humantime_serde")] + pub flush_interval_duration: Duration, } impl Default for Config { @@ -408,6 +424,7 @@ impl Default for Config { listen_address: "127.0.0.1:8910".to_string(), notify_price_tx_buffer: 10000, notify_price_sched_tx_buffer: 10000, + flush_interval_duration: Duration::from_millis(50), } } } @@ -448,6 +465,7 @@ where state, config.notify_price_tx_buffer, config.notify_price_sched_tx_buffer, + config.flush_interval_duration, ) .await }) diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 9312c2d6..2e1759b4 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -14,7 +14,6 @@ use { std::{ io::IsTerminal, path::PathBuf, - time::Duration, }, tracing_subscriber::{ prelude::*, @@ -65,9 +64,7 @@ async fn main() -> Result<()> { let otlp_exporter = opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(&opentelemetry_config.exporter_endpoint) - .with_timeout(Duration::from_secs( - opentelemetry_config.exporter_timeout_secs, - )); + .with_timeout(opentelemetry_config.exporter_timeout_duration); // Set up the OpenTelemetry tracer let tracer = opentelemetry_otlp::new_pipeline() From 564441fb99841079c3a77ccd8b78ebe7ceea3d8a Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 2 Aug 2024 16:16:21 +0200 Subject: [PATCH 3/5] chore: update config (#136) --- config/config.sample.pythnet.toml | 7 ++++--- config/config.sample.pythtest.toml | 8 ++++---- config/config.toml | 6 +++--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index 8ea9a709..77046091 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -3,8 +3,8 @@ listen_address = "127.0.0.1:8910" [primary_network] -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private -# endpoint should be used in most cases. +# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually +# rate-limited, so a private endpoint should be used in most cases. rpc_url = "https://api2.pythnet.pyth.network" # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. @@ -21,7 +21,8 @@ key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH" key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J" # Pythnet accumulator key -# The pythnet accumulator key settings are only valid for pythnet. Do not apply these settings for any other environment i.e. mainnet, pythtest-conformance +# The pythnet accumulator key settings are only valid for pythnet. Do not apply +# these settings for any other environment i.e. mainnet, pythtest-conformance key_store.accumulator_key = "7Vbmv1jt4vyuqBZcpYPpnVhrqVe5e6ZPb6JxDcffRHUM" # IMPORTANT: Exporter batch size must be decreased to 7 to support diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml index 2674801e..46a7d1f5 100644 --- a/config/config.sample.pythtest.toml +++ b/config/config.sample.pythtest.toml @@ -3,12 +3,12 @@ listen_address = "127.0.0.1:8910" [primary_network] -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private -# endpoint should be used in most cases. +# HTTP(S) endpoint of the RPC node. rpc_url = "https://api.pythtest.pyth.network" -# WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. -# This can be omitted when oracle.subscriber_enabled is set to false. +# WS(S) endpoint of the RRC node. This is used to subscribe to account changes +# on the network. This can be omitted when oracle.subscriber_enabled is set to +# false. wss_url = "wss://api.pythtest.pyth.network" # Path to your publishing keypair. diff --git a/config/config.toml b/config/config.toml index ea62bcfe..d29904ef 100644 --- a/config/config.toml +++ b/config/config.toml @@ -22,13 +22,13 @@ listen_address = "127.0.0.1:8910" [primary_network] ### Required fields ### -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private endpoint should be used in most cases. -# Note that api.pythtest.pyth.network is a private endpoint: please contact us for access. +# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually +# rate-limited for Pythnet, and so a private endpoint should be used in most +# cases. For Pythtest, the public endpoint can be used. rpc_url = "https://api.pythtest.pyth.network" # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. # This can be omitted when oracle.subscriber_enabled is set to false. -# Note that api.pythtest.pyth.network is a private endpoint: please contact us for access. wss_url = "wss://api.pythtest.pyth.network" # Path to the keypair used to publish price updates. If set to a From 1a07ac38bb1c1aa62ae43ecd2557f14b39ce8bf2 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Mon, 5 Aug 2024 17:30:27 +0200 Subject: [PATCH 4/5] chore: remove secondary network from sample config (#137) --- config/config.sample.pythnet.toml | 32 ------------------------------ config/config.sample.pythtest.toml | 25 ----------------------- 2 files changed, 57 deletions(-) diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index 77046091..aacf0649 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -32,38 +32,6 @@ exporter.max_batch_size = 7 # Duration of the interval at which to publish updates exporter.publish_interval_duration = "1s" -# Configuration for the optional secondary network this agent will publish data to. -# In most cases this should be a Solana endpoint. -[secondary_network] - -# Please use other endpoints as these are rate limited -rpc_url = "https://api.mainnet-beta.solana.com" -wss_url = "wss://api.mainnet-beta.solana.com" - -# Path to your publishing keypair. -key_store.publish_keypair_path = "/path/to/keypair.json" - -# Oracle program pubkey -key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH" - -# Oracle mapping pubkey -key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J" - -# Compute unit limit requested per instruction for transactions that update the price. -# This should be an upper bound of the compute units a single upd_price instruction might consume. -# For solana mainnet, this should be set to 20000 instead of the default 40000 since there is no accumulator. -exporter.compute_unit_limit = 20000 - -# Whether the dynamic compute unit pricing is enabled. -# This is needed for solana to be able to land transactions on the network -# during periods of high network congestion. -exporter.dynamic_compute_unit_pricing_enabled = true - -# Price per compute unit offered for update_price transactions -exporter.compute_unit_price_micro_lamports = 1000 - -exporter.maximum_compute_unit_price_micro_lamports = 100000 - # Configuration for the JRPC API [pythd_adapter] diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml index 46a7d1f5..e19d72ae 100644 --- a/config/config.sample.pythtest.toml +++ b/config/config.sample.pythtest.toml @@ -28,31 +28,6 @@ key_store.mapping_key = "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z" # conform # Duration of the interval at which to publish updates exporter.publish_interval_duration = "400ms" -# Configuration for the optional secondary network this agent will publish data to. -# In most cases this should be a Solana endpoint. -[secondary_network] - -# Please use other endpoints as these are rate limited -rpc_url = "https://api.testnet.solana.com" -wss_url = "wss://api.testnet.solana.com" - -# Path to your publishing keypair. -key_store.publish_keypair_path = "/path/to/keypair.json" - -# Oracle program pubkey -key_store.program_key = "8tfDNiaEyrV6Q1U4DEXrEigs9DoDtkugzFbybENEbCDz" - -# Oracle mapping pubkey -key_store.mapping_key = "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z" - -# Duration of the interval at which to publish updates. Default interval is 1 seconds. -# exporter.publish_interval_duration = "1s" - -# Price per compute unit offered for update_price transactions. -# This is needed for solana to be able to land transactions on the network -# during periods of high network congestion. -exporter.compute_unit_price_micro_lamports = 1000 - # Configuration for the JRPC API [pythd_adapter] From e4683c7d9cc1a6e8f1037bc7458aa5dacf1a0a18 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 14 Aug 2024 14:17:06 +0200 Subject: [PATCH 5/5] Update config.sample.pythnet.toml (#138) * Update config.sample.pythnet.toml * Update config.sample.pythnet.toml --- config/config.sample.pythnet.toml | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index aacf0649..909779e4 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -20,17 +20,8 @@ key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH" # Oracle mapping pubkey key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J" -# Pythnet accumulator key -# The pythnet accumulator key settings are only valid for pythnet. Do not apply -# these settings for any other environment i.e. mainnet, pythtest-conformance -key_store.accumulator_key = "7Vbmv1jt4vyuqBZcpYPpnVhrqVe5e6ZPb6JxDcffRHUM" - -# IMPORTANT: Exporter batch size must be decreased to 7 to support -# larger accumulator transactions, when accumulator_key is set. -exporter.max_batch_size = 7 - -# Duration of the interval at which to publish updates -exporter.publish_interval_duration = "1s" +# Compute unit per price update. +exporter.compute_unit_limit = 5000 # Configuration for the JRPC API [pythd_adapter]