diff --git a/Cargo.lock b/Cargo.lock index 7741ae1..3ee7d4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,28 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.55", +] + [[package]] name = "async-trait" version = "0.1.79" @@ -461,6 +483,51 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -1792,6 +1859,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "goblin" version = "0.5.4" @@ -2058,6 +2131,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2408,6 +2493,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.2" @@ -2768,6 +2859,81 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a94c69209c05319cdf7460c6d4c055ed102be242a0a6245835d7bc42c6ec7f54" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "lazy_static", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "4.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.6.0" @@ -3209,9 +3375,32 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 2.0.55", +] + [[package]] name = "pyth-agent" -version = "2.9.0" +version = "2.10.0" dependencies = [ "anyhow", "async-trait", @@ -3227,6 +3416,9 @@ dependencies = [ "iobuffer", "jrpc", "lazy_static", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "parking_lot", "portpicker", "prometheus-client", @@ -3248,6 +3440,7 @@ dependencies = [ "tokio-util", "toml_edit 0.22.9", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "typed-html", "warp", @@ -5565,6 +5758,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -5706,6 +5909,59 @@ dependencies = [ "winnow 0.6.5", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -5756,6 +6012,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -6141,6 +6415,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.24.0" diff --git a/Cargo.toml b/Cargo.toml index 489ec98..a239c9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.9.0" +version = "2.10.0" edition = "2021" [[bin]] @@ -52,6 +52,10 @@ winnow = "0.6.5" proptest = "1.4.0" tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } +tracing-opentelemetry = "0.24.0" +opentelemetry = "0.23.0" +opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} +opentelemetry-otlp = { version = "0.16.0" } [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/config/config.toml b/config/config.toml index 8f35ade..39b254b 100644 --- a/config/config.toml +++ b/config/config.toml @@ -181,3 +181,13 @@ key_store.mapping_key = "RelevantOracleMappingAddress" # publish data to. In most cases this should be a Solana endpoint. The # options correspond to the ones in primary_network # [secondary_network] + + +## Configuration for OpenTelemetry ## +[opentelemetry] + +# Timeout in seconds for the OpenTelemetry exporter +exporter_timeout_secs = 3 + +# Endpoint URL for the OpenTelemetry exporter +exporter_endpoint = "http://127.0.0.1:4317" diff --git a/src/agent.rs b/src/agent.rs index 0395401..7096423 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -72,6 +72,7 @@ use { lazy_static::lazy_static, std::sync::Arc, tokio::sync::watch, + tracing::instrument, }; pub mod config; @@ -121,58 +122,59 @@ impl Agent { }; } + #[instrument(skip(self))] async fn spawn(&self) -> Result<()> { // job handles - let mut jhs = vec![]; + let mut handles = vec![]; // Create the Application State. let state = Arc::new(state::State::new(&self.config).await); // Spawn the primary network Oracle. - jhs.push(tokio::spawn(services::oracle( + handles.extend(services::oracle( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - ))); + )); - jhs.push(tokio::spawn(services::exporter( + handles.extend(services::exporter( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - ))); + )); // Spawn the secondary network Oracle, if needed. if let Some(config) = &self.config.secondary_network { - jhs.push(tokio::spawn(services::oracle( + handles.extend(services::oracle( config.clone(), network::Network::Secondary, state.clone(), - ))); + )); - jhs.push(tokio::spawn(services::exporter( + handles.extend(services::exporter( config.clone(), network::Network::Secondary, state.clone(), - ))); + )); } // Create the Notifier task for the Pythd RPC. - jhs.push(tokio::spawn(services::notifier(state.clone()))); + handles.push(tokio::spawn(services::notifier(state.clone()))); // Spawn the Pythd API Server - jhs.push(tokio::spawn(rpc::run( + handles.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), state.clone(), ))); // Spawn the metrics server - jhs.push(tokio::spawn(metrics::spawn( + handles.push(tokio::spawn(metrics::spawn( self.config.metrics_server.bind_address, ))); // Spawn the remote keypair loader endpoint for both networks - jhs.append( - &mut services::keypairs( + handles.extend( + services::keypairs( self.config.primary_network.rpc_url.clone(), self.config .secondary_network @@ -185,7 +187,7 @@ impl Agent { ); // Wait for all tasks to complete - join_all(jhs).await; + join_all(handles).await; Ok(()) } diff --git a/src/agent/config.rs b/src/agent/config.rs index d43d6ba..8fb2292 100644 --- a/src/agent/config.rs +++ b/src/agent/config.rs @@ -32,6 +32,7 @@ pub struct Config { pub metrics_server: metrics::Config, #[serde(default)] pub remote_keypair_loader: services::keypairs::Config, + pub opentelemetry: Option, } impl Config { @@ -83,3 +84,10 @@ impl Default for ChannelCapacities { } } } + + +#[derive(Deserialize, Debug)] +pub struct OpenTelemetryConfig { + pub exporter_timeout_secs: u64, + pub exporter_endpoint: String, +} diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index 8d705a4..ea8b7a7 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -50,6 +50,7 @@ use { sync::Arc, }, tokio::sync::mpsc, + tracing::instrument, warp::{ ws::{ Message, @@ -411,6 +412,7 @@ impl Default for Config { } } +#[instrument(skip_all)] pub async fn run(config: Config, state: Arc) where S: state::Prices, diff --git a/src/agent/pyth/rpc/get_all_products.rs b/src/agent/pyth/rpc/get_all_products.rs index 7a34278..47dbf31 100644 --- a/src/agent/pyth/rpc/get_all_products.rs +++ b/src/agent/pyth/rpc/get_all_products.rs @@ -1,8 +1,10 @@ use { crate::agent::state, anyhow::Result, + tracing::instrument, }; +#[instrument(skip_all)] pub async fn get_all_products(state: &S) -> Result where S: state::Prices, diff --git a/src/agent/pyth/rpc/get_product.rs b/src/agent/pyth/rpc/get_product.rs index 19c69a5..07fabf5 100644 --- a/src/agent/pyth/rpc/get_product.rs +++ b/src/agent/pyth/rpc/get_product.rs @@ -12,8 +12,10 @@ use { Request, Value, }, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn get_product( state: &S, request: &Request, @@ -27,6 +29,8 @@ where }?; let account = params.account.parse::()?; + tracing::Span::current().record("account", account.to_string()); + let product = state.get_product(&account).await?; Ok(serde_json::to_value(product)?) } diff --git a/src/agent/pyth/rpc/get_product_list.rs b/src/agent/pyth/rpc/get_product_list.rs index 30cde6e..44c57a3 100644 --- a/src/agent/pyth/rpc/get_product_list.rs +++ b/src/agent/pyth/rpc/get_product_list.rs @@ -1,8 +1,10 @@ use { crate::agent::state, anyhow::Result, + tracing::instrument, }; +#[instrument(skip_all)] pub async fn get_product_list(state: &S) -> Result where S: state::Prices, diff --git a/src/agent/pyth/rpc/subscribe_price.rs b/src/agent/pyth/rpc/subscribe_price.rs index 5936505..73a8389 100644 --- a/src/agent/pyth/rpc/subscribe_price.rs +++ b/src/agent/pyth/rpc/subscribe_price.rs @@ -15,8 +15,10 @@ use { Value, }, tokio::sync::mpsc, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn subscribe_price( state: &S, notify_price_tx: &mpsc::Sender, @@ -33,6 +35,8 @@ where )?; let account = params.account.parse::()?; + tracing::Span::current().record("account", account.to_string()); + let subscription = state .subscribe_price(&account, notify_price_tx.clone()) .await; diff --git a/src/agent/pyth/rpc/subscribe_price_sched.rs b/src/agent/pyth/rpc/subscribe_price_sched.rs index 608a489..e3be8c3 100644 --- a/src/agent/pyth/rpc/subscribe_price_sched.rs +++ b/src/agent/pyth/rpc/subscribe_price_sched.rs @@ -15,8 +15,10 @@ use { Value, }, tokio::sync::mpsc, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn subscribe_price_sched( state: &S, notify_price_sched_tx: &mpsc::Sender, @@ -33,6 +35,8 @@ where )?; let account = params.account.parse::()?; + tracing::Span::current().record("account", account.to_string()); + let subscription = state .subscribe_price_sched(&account, notify_price_sched_tx.clone()) .await; diff --git a/src/agent/pyth/rpc/update_price.rs b/src/agent/pyth/rpc/update_price.rs index c5748af..06b11f9 100644 --- a/src/agent/pyth/rpc/update_price.rs +++ b/src/agent/pyth/rpc/update_price.rs @@ -12,8 +12,10 @@ use { Request, Value, }, + tracing::instrument, }; +#[instrument(skip_all, fields(account))] pub async fn update_price( state: &S, request: &Request, @@ -28,6 +30,8 @@ where .ok_or_else(|| anyhow!("Missing request parameters"))?, )?; + tracing::Span::current().record("account", params.account.to_string()); + state .update_local_price( ¶ms.account.parse::()?, diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index d7fe47a..14c0ac9 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -23,8 +23,10 @@ use { }, tokio::{ sync::watch, + task::JoinHandle, time::Interval, }, + tracing::instrument, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -118,6 +120,13 @@ struct NetworkStateQuerier { } impl NetworkStateQuerier { + #[instrument( + skip(rpc_endpoint, rpc_timeout, query_interval), + fields( + rpc_timeout = rpc_timeout.as_millis(), + query_interval = query_interval.period().as_millis(), + ) + )] pub fn new( rpc_endpoint: &str, rpc_timeout: Duration, @@ -140,6 +149,7 @@ impl NetworkStateQuerier { } } + #[instrument(skip(self))] async fn query_network_state(&mut self) -> Result<()> { // Fetch the blockhash and current slot in parallel let current_slot_future = self @@ -160,12 +170,15 @@ impl NetworkStateQuerier { } } -pub async fn exporter(config: network::Config, network: Network, state: Arc) +#[instrument(skip(config, state))] +pub fn exporter(config: network::Config, network: Network, state: Arc) -> Vec> where S: Exporter, S: Transactions, S: Send + Sync + 'static, { + let mut handles = Vec::new(); + // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); let mut network_state_querier = NetworkStateQuerier::new( @@ -175,12 +188,23 @@ where network_state_tx, ); - tokio::spawn(transaction_monitor::transaction_monitor( + handles.push(tokio::spawn(transaction_monitor::transaction_monitor( config.clone(), state.clone(), + ))); + + handles.push(tokio::spawn(exporter::exporter( + config, + network, + state, + network_state_rx, + ))); + + handles.push(tokio::spawn( + async move { network_state_querier.run().await }, )); - tokio::spawn(exporter::exporter(config, network, state, network_state_rx)); - tokio::spawn(async move { network_state_querier.run().await }); + + handles } mod exporter { @@ -294,6 +318,7 @@ mod transaction_monitor { sync::Arc, time::Duration, }, + tracing::instrument, }; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -318,6 +343,7 @@ mod transaction_monitor { } } + #[instrument(skip(config, state))] pub async fn transaction_monitor(config: network::Config, state: Arc) where S: Transactions, diff --git a/src/agent/services/notifier.rs b/src/agent/services/notifier.rs index aecc682..0f4e4a2 100644 --- a/src/agent/services/notifier.rs +++ b/src/agent/services/notifier.rs @@ -6,8 +6,10 @@ use { crate::agent::state::Prices, std::sync::Arc, + tracing::instrument, }; +#[instrument(skip(state))] pub async fn notifier(state: Arc) where S: Prices, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 9cfb523..a9d5237 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -38,30 +38,35 @@ use { Instant, }, }, + tokio::task::JoinHandle, tokio_stream::StreamExt, + tracing::instrument, }; -pub async fn oracle(config: Config, network: Network, state: Arc) +#[instrument(skip(config, state))] +pub fn oracle(config: Config, network: Network, state: Arc) -> Vec> where S: Oracle, S: Send + Sync + 'static, { + let mut handles = Vec::new(); + let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { tracing::warn!("Key store not available, Oracle won't start."); - return; + return handles; }; - tokio::spawn(poller( + handles.push(tokio::spawn(poller( config.clone(), network, state.clone(), key_store.mapping_key, key_store.publish_keypair, config.oracle.max_lookup_batch_size, - )); + ))); if config.oracle.subscriber_enabled { - tokio::spawn(async move { + handles.push(tokio::spawn(async move { loop { let current_time = Instant::now(); if let Err(ref err) = subscriber( @@ -79,8 +84,10 @@ where } } } - }); + })); } + + handles } /// When an account RPC Subscription update is receiveed. @@ -88,6 +95,7 @@ where /// We check if the account is one we're aware of and tracking, and if so, spawn /// a small background task that handles that update. We only do this for price /// accounts, all other accounts are handled below in the poller. +#[instrument(skip(config, state))] async fn subscriber( config: Config, network: Network, @@ -144,6 +152,7 @@ where } /// On poll lookup all Pyth Mapping/Product/Price accounts and sync. +#[instrument(skip(config, publish_keypair, state))] async fn poller( config: Config, network: Network, diff --git a/src/agent/state/api.rs b/src/agent/state/api.rs index 3ea1308..dc92f5d 100644 --- a/src/agent/state/api.rs +++ b/src/agent/state/api.rs @@ -53,6 +53,7 @@ use { mpsc, RwLock, }, + tracing::instrument, }; // TODO: implement Display on PriceStatus and then just call PriceStatus::to_string @@ -382,6 +383,10 @@ where .map_err(|_| anyhow!("failed to send update to local store")) } + #[instrument(skip(self, update), fields(update = match update { + Update::ProductAccountUpdate { account_key, .. } => account_key, + Update::PriceAccountUpdate { account_key, .. } => account_key, + }.to_string()))] async fn update_global_price(&self, network: Network, update: &Update) -> Result<()> { GlobalStore::update(self, network, update) .await diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 3e8585f..eb7c74a 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -53,6 +53,7 @@ use { watch, RwLock, }, + tracing::instrument, }; const PYTH_ORACLE_VERSION: u32 = 2; @@ -146,6 +147,14 @@ where .extend(batch_state); } + #[instrument( + skip_all, + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + unchanged_publish_threshold = unchanged_publish_threshold.as_millis(), + ) + )] async fn get_permissioned_updates( &self, publish_keypair: &Keypair, @@ -254,6 +263,14 @@ where .await } + #[instrument( + skip_all, + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + unchanged_publish_threshold = unchanged_publish_threshold.as_millis(), + ) + )] async fn update_recent_compute_unit_price( &self, publish_keypair: &Keypair, @@ -283,6 +300,7 @@ where Ok(()) } + #[instrument(skip(self, publish_keypair, publisher_permissions))] async fn update_permissions( &self, network: Network, @@ -305,6 +323,7 @@ where } } +#[instrument(skip(state, publish_keypair))] pub async fn get_publish_keypair( state: &S, network: Network, @@ -402,6 +421,13 @@ async fn estimate_compute_unit_price_micro_lamports( /// - Degrade gracefully if the blockchain RPC node exhibits poor performance. If the RPC node takes a long /// time to respond, no internal queues grow unboundedly. At any single point in time there are at most /// (n / batch_size) requests in flight. +#[instrument( + skip(state, client, network_state_rx, publish_keypair, staleness_threshold, permissioned_updates), + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + staleness_threshold = staleness_threshold.as_millis(), + ) +)] pub async fn publish_batches( state: &S, client: Arc, @@ -472,6 +498,16 @@ where Ok(()) } +#[instrument( + skip(state, client, network_state, publish_keypair, batch, staleness_threshold), + fields( + publish_keypair = publish_keypair.pubkey().to_string(), + blockhash = network_state.blockhash.to_string(), + current_slot = network_state.current_slot, + staleness_threshold = staleness_threshold.as_millis(), + batch = ?batch.iter().map(|(identifier, _)| identifier.to_string()).collect::>(), + ) +)] async fn publish_batch( state: &S, client: Arc, diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index 5b57962..095f6b2 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -8,6 +8,7 @@ use { anyhow::Result, solana_sdk::signature::Keypair, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Default)] @@ -35,6 +36,7 @@ where for<'a> &'a T: Into<&'a KeypairState>, T: Sync, { + #[instrument(skip(self))] async fn request_keypair(&self, network: Network) -> Result { let keypair = match network { Network::Primary => &self.into().primary_current_keypair, @@ -51,6 +53,7 @@ where )?) } + #[instrument(skip(self, new_keypair))] async fn update_keypair(&self, network: Network, new_keypair: Keypair) { *match network { Network::Primary => self.into().primary_current_keypair.write().await, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 0f4182a..eb4b0ae 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -46,6 +46,7 @@ use { time::Duration, }, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Debug, Clone)] @@ -96,6 +97,7 @@ impl From for PriceEntry { impl PriceEntry { /// Construct the right underlying GenericPriceAccount based on the account size. + #[instrument(skip(acc))] pub fn load_from_account(acc: &[u8]) -> Option { unsafe { let size = match acc.len() { @@ -216,14 +218,13 @@ where T: Prices, T: Exporter, { + #[instrument(skip(self, account_key))] async fn handle_price_account_update( &self, network: Network, account_key: &Pubkey, account: &Account, ) -> Result<()> { - tracing::debug!("Handling account update."); - let mut data = self.into().data.write().await; // We are only interested in price account updates, all other types of updates @@ -259,6 +260,7 @@ where } /// Poll target Solana based chain for Pyth related accounts. + #[instrument(skip(self, publish_keypair, rpc_client))] async fn poll_updates( &self, network: Network, @@ -329,6 +331,7 @@ where } /// Sync Product/Price Accounts found by polling to the Global Store. + #[instrument(skip(self))] async fn sync_global_store(&self, network: Network) -> Result<()> { for (product_account_key, product_account) in &self.into().data.read().await.product_accounts @@ -357,11 +360,11 @@ where .await .map_err(|_| anyhow!("failed to notify price account update"))?; } - Ok(()) } } +#[instrument(skip(rpc_client))] async fn fetch_mapping_accounts( rpc_client: &RpcClient, mapping_account_key: Pubkey, @@ -381,6 +384,7 @@ async fn fetch_mapping_accounts( Ok(accounts) } +#[instrument(skip(rpc_client, mapping_accounts))] async fn fetch_product_and_price_accounts<'a, A>( rpc_client: &RpcClient, max_lookup_batch_size: usize, @@ -417,6 +421,7 @@ where Ok((product_entries, price_entries)) } +#[instrument(skip(rpc_client, product_key_batch))] async fn fetch_batch_of_product_and_price_accounts( rpc_client: &RpcClient, product_key_batch: &[Pubkey], @@ -564,6 +569,7 @@ async fn fetch_batch_of_product_and_price_accounts( Ok((product_entries, price_entries)) } +#[instrument(skip(data, new_data))] fn log_data_diff(data: &Data, new_data: &Data) { // Log new accounts which have been found let previous_mapping_accounts = data diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs index f69ca1e..c0e8921 100644 --- a/src/agent/state/transactions.rs +++ b/src/agent/state/transactions.rs @@ -8,6 +8,7 @@ use { }, std::collections::VecDeque, tokio::sync::RwLock, + tracing::instrument, }; #[derive(Default)] @@ -44,6 +45,7 @@ where for<'a> &'a T: Into<&'a TransactionsState>, T: Sync + Send + 'static, { + #[instrument(skip(self))] async fn add_transaction(&self, signature: Signature) { tracing::debug!( signature = signature.to_string(), @@ -60,6 +62,7 @@ where } } + #[instrument(skip(self, rpc))] async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> { let mut txs = self.into().sent_transactions.write().await; if txs.is_empty() { diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 3a515f7..040e018 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -5,6 +5,8 @@ use { Result, }, clap::Parser, + opentelemetry::KeyValue, + opentelemetry_otlp::WithExportConfig, pyth_agent::agent::{ config::Config, Agent, @@ -12,6 +14,11 @@ use { std::{ io::IsTerminal, path::PathBuf, + time::Duration, + }, + tracing_subscriber::{ + prelude::*, + EnvFilter, }, }; @@ -30,31 +37,61 @@ struct Arguments { #[tokio::main] async fn main() -> Result<()> { + let args = Arguments::parse(); + + if !args.config.as_path().exists() { + return Err(anyhow!("No config found under {:?}", args.config.to_str())); + } + + println!("Loading config from {:?}", args.config.display()); + + // Parse config early for logging channel capacity + let config = Config::new(args.config).context("Could not parse config")?; + + let env_filter = EnvFilter::from_default_env(); + // Initialize a Tracing Subscriber - let fmt_builder = tracing_subscriber::fmt() + let fmt_layer = tracing_subscriber::fmt::layer() .with_file(false) .with_line_number(true) .with_thread_ids(true) - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_ansi(std::io::stderr().is_terminal()); + let mut layers = Vec::new(); + layers.push(env_filter.boxed()); + + // Set up OpenTelemetry only if it's configured + if let Some(opentelemetry_config) = &config.opentelemetry { + // Set up the OpenTelemetry exporter + let otlp_exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(&opentelemetry_config.exporter_endpoint) + .with_timeout(Duration::from_secs( + opentelemetry_config.exporter_timeout_secs, + )); + + // Set up the OpenTelemetry tracer + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(otlp_exporter) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource( + opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "pyth-agent")]), + )) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .map_err(|e| anyhow::anyhow!("Error initializing open telemetry: {}", e))?; + + // Set up the telemetry layer + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + layers.push(telemetry.boxed()); + } // Use the compact formatter if we're in a terminal, otherwise use the JSON formatter. if std::io::stderr().is_terminal() { - tracing::subscriber::set_global_default(fmt_builder.compact().finish())?; + layers.push(fmt_layer.compact().boxed()); } else { - tracing::subscriber::set_global_default(fmt_builder.json().finish())?; + layers.push(fmt_layer.json().boxed()); } - let args = Arguments::parse(); - - if !args.config.as_path().exists() { - return Err(anyhow!("No config found under {:?}", args.config.to_str())); - } - - println!("Loading config from {:?}", args.config.display()); - - // Parse config early for logging channel capacity - let config = Config::new(args.config).context("Could not parse config")?; + tracing_subscriber::registry().with(layers).init(); // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE // should be set to 1 for this otherwise it will only print the top-level error.