diff --git a/Cargo.lock b/Cargo.lock index 4bbc46f..4de4fd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.2" +version = "2.10.3" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 744d993..672bbd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.2" +version = "2.10.3" edition = "2021" [[bin]] diff --git a/config/config.sample.pythnet.toml b/config/config.sample.pythnet.toml index 8ea9a70..909779e 100644 --- a/config/config.sample.pythnet.toml +++ b/config/config.sample.pythnet.toml @@ -3,8 +3,8 @@ listen_address = "127.0.0.1:8910" [primary_network] -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private -# endpoint should be used in most cases. +# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually +# rate-limited, so a private endpoint should be used in most cases. rpc_url = "https://api2.pythnet.pyth.network" # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. @@ -20,48 +20,8 @@ key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH" # Oracle mapping pubkey key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J" -# Pythnet accumulator key -# The pythnet accumulator key settings are only valid for pythnet. Do not apply these settings for any other environment i.e. mainnet, pythtest-conformance -key_store.accumulator_key = "7Vbmv1jt4vyuqBZcpYPpnVhrqVe5e6ZPb6JxDcffRHUM" - -# IMPORTANT: Exporter batch size must be decreased to 7 to support -# larger accumulator transactions, when accumulator_key is set. -exporter.max_batch_size = 7 - -# Duration of the interval at which to publish updates -exporter.publish_interval_duration = "1s" - -# Configuration for the optional secondary network this agent will publish data to. -# In most cases this should be a Solana endpoint. -[secondary_network] - -# Please use other endpoints as these are rate limited -rpc_url = "https://api.mainnet-beta.solana.com" -wss_url = "wss://api.mainnet-beta.solana.com" - -# Path to your publishing keypair. -key_store.publish_keypair_path = "/path/to/keypair.json" - -# Oracle program pubkey -key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH" - -# Oracle mapping pubkey -key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J" - -# Compute unit limit requested per instruction for transactions that update the price. -# This should be an upper bound of the compute units a single upd_price instruction might consume. -# For solana mainnet, this should be set to 20000 instead of the default 40000 since there is no accumulator. -exporter.compute_unit_limit = 20000 - -# Whether the dynamic compute unit pricing is enabled. -# This is needed for solana to be able to land transactions on the network -# during periods of high network congestion. -exporter.dynamic_compute_unit_pricing_enabled = true - -# Price per compute unit offered for update_price transactions -exporter.compute_unit_price_micro_lamports = 1000 - -exporter.maximum_compute_unit_price_micro_lamports = 100000 +# Compute unit per price update. +exporter.compute_unit_limit = 5000 # Configuration for the JRPC API [pythd_adapter] diff --git a/config/config.sample.pythtest.toml b/config/config.sample.pythtest.toml index 2674801..e19d72a 100644 --- a/config/config.sample.pythtest.toml +++ b/config/config.sample.pythtest.toml @@ -3,12 +3,12 @@ listen_address = "127.0.0.1:8910" [primary_network] -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private -# endpoint should be used in most cases. +# HTTP(S) endpoint of the RPC node. rpc_url = "https://api.pythtest.pyth.network" -# WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. -# This can be omitted when oracle.subscriber_enabled is set to false. +# WS(S) endpoint of the RRC node. This is used to subscribe to account changes +# on the network. This can be omitted when oracle.subscriber_enabled is set to +# false. wss_url = "wss://api.pythtest.pyth.network" # Path to your publishing keypair. @@ -28,31 +28,6 @@ key_store.mapping_key = "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z" # conform # Duration of the interval at which to publish updates exporter.publish_interval_duration = "400ms" -# Configuration for the optional secondary network this agent will publish data to. -# In most cases this should be a Solana endpoint. -[secondary_network] - -# Please use other endpoints as these are rate limited -rpc_url = "https://api.testnet.solana.com" -wss_url = "wss://api.testnet.solana.com" - -# Path to your publishing keypair. -key_store.publish_keypair_path = "/path/to/keypair.json" - -# Oracle program pubkey -key_store.program_key = "8tfDNiaEyrV6Q1U4DEXrEigs9DoDtkugzFbybENEbCDz" - -# Oracle mapping pubkey -key_store.mapping_key = "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z" - -# Duration of the interval at which to publish updates. Default interval is 1 seconds. -# exporter.publish_interval_duration = "1s" - -# Price per compute unit offered for update_price transactions. -# This is needed for solana to be able to land transactions on the network -# during periods of high network congestion. -exporter.compute_unit_price_micro_lamports = 1000 - # Configuration for the JRPC API [pythd_adapter] diff --git a/config/config.toml b/config/config.toml index 39b254b..d29904e 100644 --- a/config/config.toml +++ b/config/config.toml @@ -6,17 +6,29 @@ # connection is not exposed for unauthorized access. listen_address = "127.0.0.1:8910" +# Size of the buffer of each Server's channel on which `notify_price` events are +# received from the Price state. +# notify_price_tx_buffer = 10000 + +# Size of the buffer of each Server's channel on which `notify_price_sched` events are +# received from the Price state. +# notify_price_sched_tx_buffer = 10000 + +# Flush interval for responses and notifications. This is the maximum time the +# server will wait before flushing the messages to the client. +# flush_interval_duration = "50ms" + # Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint. [primary_network] ### Required fields ### -# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private endpoint should be used in most cases. -# Note that api.pythtest.pyth.network is a private endpoint: please contact us for access. +# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually +# rate-limited for Pythnet, and so a private endpoint should be used in most +# cases. For Pythtest, the public endpoint can be used. rpc_url = "https://api.pythtest.pyth.network" # WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network. # This can be omitted when oracle.subscriber_enabled is set to false. -# Note that api.pythtest.pyth.network is a private endpoint: please contact us for access. wss_url = "wss://api.pythtest.pyth.network" # Path to the keypair used to publish price updates. If set to a @@ -186,8 +198,8 @@ key_store.mapping_key = "RelevantOracleMappingAddress" ## Configuration for OpenTelemetry ## [opentelemetry] -# Timeout in seconds for the OpenTelemetry exporter -exporter_timeout_secs = 3 +# Timeout duration for the OpenTelemetry exporter +exporter_timeout_duration = "3s" # Endpoint URL for the OpenTelemetry exporter exporter_endpoint = "http://127.0.0.1:4317" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 05e6ca1..82eda2f 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "stable" +channel = "1.79.0" profile = "minimal" components = ["rustfmt", "clippy"] diff --git a/src/agent/config.rs b/src/agent/config.rs index 8fb2292..40cb234 100644 --- a/src/agent/config.rs +++ b/src/agent/config.rs @@ -13,7 +13,10 @@ use { File, }, serde::Deserialize, - std::path::Path, + std::{ + path::Path, + time::Duration, + }, }; /// Configuration for all components of the Agent @@ -88,6 +91,7 @@ impl Default for ChannelCapacities { #[derive(Deserialize, Debug)] pub struct OpenTelemetryConfig { - pub exporter_timeout_secs: u64, - pub exporter_endpoint: String, + #[serde(with = "humantime_serde")] + pub exporter_timeout_duration: Duration, + pub exporter_endpoint: String, } diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index ea8b7a7..ae52f0e 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -48,6 +48,7 @@ use { fmt::Debug, net::SocketAddr, sync::Arc, + time::Duration, }, tokio::sync::mpsc, tracing::instrument, @@ -115,6 +116,7 @@ async fn handle_connection( state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, + flush_interval_duration: Duration, ) where S: state::Prices, S: Send, @@ -127,6 +129,8 @@ async fn handle_connection( let (mut notify_price_sched_tx, mut notify_price_sched_rx) = mpsc::channel(notify_price_sched_tx_buffer); + let mut flush_interval = tokio::time::interval(flush_interval_duration); + loop { if let Err(err) = handle_next( &*state, @@ -136,6 +140,7 @@ async fn handle_connection( &mut notify_price_rx, &mut notify_price_sched_tx, &mut notify_price_sched_rx, + &mut flush_interval, ) .await { @@ -159,6 +164,7 @@ async fn handle_next( notify_price_rx: &mut mpsc::Receiver, notify_price_sched_tx: &mut mpsc::Sender, notify_price_sched_rx: &mut mpsc::Receiver, + flush_interval: &mut tokio::time::Interval, ) -> Result<()> where S: state::Prices, @@ -183,13 +189,16 @@ where } } Some(notify_price) = notify_price_rx.recv() => { - send_notification(ws_tx, Method::NotifyPrice, Some(notify_price)) + feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price)) .await } Some(notify_price_sched) = notify_price_sched_rx.recv() => { - send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) + feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) .await } + _ = flush_interval.tick() => { + flush(ws_tx).await + } } } @@ -229,9 +238,9 @@ where // Send an array if we're handling a batch // request, single response object otherwise if is_batch { - send_text(ws_tx, &serde_json::to_string(&responses)?).await?; + feed_text(ws_tx, &serde_json::to_string(&responses)?).await?; } else { - send_text(ws_tx, &serde_json::to_string(&responses[0])?).await?; + feed_text(ws_tx, &serde_json::to_string(&responses[0])?).await?; } } // The top-level parsing errors are fine to share with client @@ -354,10 +363,10 @@ async fn send_error( error.to_string(), None, ); - send_text(ws_tx, &response.to_string()).await + feed_text(ws_tx, &response.to_string()).await } -async fn send_notification( +async fn feed_notification( ws_tx: &mut SplitSink, method: Method, params: Option, @@ -365,10 +374,10 @@ async fn send_notification( where T: Sized + Serialize + DeserializeOwned, { - send_request(ws_tx, IdReq::Notification, method, params).await + feed_request(ws_tx, IdReq::Notification, method, params).await } -async fn send_request( +async fn feed_request( ws_tx: &mut SplitSink, id: I, method: Method, @@ -379,16 +388,20 @@ where T: Sized + Serialize + DeserializeOwned, { let request = Request::with_params(id, method, params); - send_text(ws_tx, &request.to_string()).await + feed_text(ws_tx, &request.to_string()).await } -async fn send_text(ws_tx: &mut SplitSink, msg: &str) -> Result<()> { +async fn feed_text(ws_tx: &mut SplitSink, msg: &str) -> Result<()> { ws_tx - .send(Message::text(msg.to_string())) + .feed(Message::text(msg.to_string())) .await .map_err(|e| e.into()) } +async fn flush(ws_tx: &mut SplitSink) -> Result<()> { + ws_tx.flush().await.map_err(|e| e.into()) +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { @@ -400,6 +413,9 @@ pub struct Config { /// Size of the buffer of each Server's channel on which `notify_price_sched` events are /// received from the Price state. pub notify_price_sched_tx_buffer: usize, + /// Flush interval duration for the notifications. + #[serde(with = "humantime_serde")] + pub flush_interval_duration: Duration, } impl Default for Config { @@ -408,6 +424,7 @@ impl Default for Config { listen_address: "127.0.0.1:8910".to_string(), notify_price_tx_buffer: 10000, notify_price_sched_tx_buffer: 10000, + flush_interval_duration: Duration::from_millis(50), } } } @@ -448,6 +465,7 @@ where state, config.notify_price_tx_buffer, config.notify_price_sched_tx_buffer, + config.flush_interval_duration, ) .await }) diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 14c0ac9..1e6aadd 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -262,7 +262,7 @@ mod exporter { config.exporter.unchanged_publish_threshold, ).await { if let Err(err) = publish_batches( - &*state, + state.clone(), client.clone(), network, &network_state_rx, diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index eb7c74a..2bceded 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports( ) )] pub async fn publish_batches( - state: &S, + state: Arc, client: Arc, network: Network, network_state_rx: &watch::Receiver, @@ -466,7 +466,7 @@ where let network_state = *network_state_rx.borrow(); for batch in batches { batch_futures.push(publish_batch( - state, + state.clone(), client.clone(), network, network_state, @@ -494,7 +494,7 @@ where .into_iter() .collect::>>()?; - Exporter::record_publish(state, batch_state).await; + Exporter::record_publish(&*state, batch_state).await; Ok(()) } @@ -509,7 +509,7 @@ where ) )] async fn publish_batch( - state: &S, + state: Arc, client: Arc, network: Network, network_state: NetworkState, @@ -535,7 +535,7 @@ where let mut instructions = Vec::new(); // Refresh the data in the batch - let local_store_contents = LocalStore::get_all_price_infos(state).await; + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; let refreshed_batch = batch.iter().map(|(identifier, _)| { ( identifier, @@ -615,7 +615,7 @@ where // Use the estimated previous price if it is higher // than the current price. let recent_compute_unit_price_micro_lamports = - Exporter::get_recent_compute_unit_price_micro_lamports(state).await; + Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await; if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports { // Get the estimated compute unit price and wrap it so it stays below the maximum @@ -633,7 +633,7 @@ where // in this batch. This will use the maximum total compute unit fee if the publisher // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots. let result = GlobalStore::price_accounts( - state, + &*state, network, price_accounts.clone().into_iter().collect(), ) @@ -697,31 +697,33 @@ where network_state.blockhash, ); - let signature = match client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await - { - Ok(signature) => signature, - Err(err) => { - tracing::error!(err = ?err, "Exporter: failed to send transaction."); - return Ok(()); - } - }; + tokio::spawn(async move { + let signature = match client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + tracing::error!(err = ?err, "Exporter: failed to send transaction."); + return; + } + }; - tracing::debug!( - signature = signature.to_string(), - instructions = instructions.len(), - price_accounts = ?price_accounts, - "Sent upd_price transaction.", - ); + tracing::debug!( + signature = signature.to_string(), + instructions = instructions.len(), + price_accounts = ?price_accounts, + "Sent upd_price transaction.", + ); - Transactions::add_transaction(state, signature).await; + Transactions::add_transaction(&*state, signature).await; + }); Ok(()) } diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 9312c2d..2e1759b 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -14,7 +14,6 @@ use { std::{ io::IsTerminal, path::PathBuf, - time::Duration, }, tracing_subscriber::{ prelude::*, @@ -65,9 +64,7 @@ async fn main() -> Result<()> { let otlp_exporter = opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(&opentelemetry_config.exporter_endpoint) - .with_timeout(Duration::from_secs( - opentelemetry_config.exporter_timeout_secs, - )); + .with_timeout(opentelemetry_config.exporter_timeout_duration); // Set up the OpenTelemetry tracer let tracer = opentelemetry_otlp::new_pipeline()