From a8ccdab83e9ad7bc7a44275bacc386ca8b478960 Mon Sep 17 00:00:00 2001 From: Reisen Date: Wed, 5 Jun 2024 08:56:11 +0000 Subject: [PATCH] refactor(agent): convert to a tracing logger --- Cargo.lock | 324 ++++++-------------- Cargo.toml | 8 +- integration-tests/tests/test_integration.py | 2 +- src/agent.rs | 34 +- src/agent/metrics.rs | 77 ++--- src/agent/pyth/rpc.rs | 81 ++--- src/agent/solana.rs | 19 +- src/agent/solana/exporter.rs | 123 ++++---- src/agent/solana/oracle.rs | 149 ++++----- src/agent/state.rs | 14 +- src/agent/state/api.rs | 8 +- src/agent/state/global.rs | 24 +- src/agent/state/keypairs.rs | 27 +- src/agent/state/local.rs | 12 +- src/bin/agent.rs | 108 ++----- src/lib.rs | 7 - 16 files changed, 369 insertions(+), 648 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 641e24a2..4acfe921 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,12 +195,6 @@ version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" -[[package]] -name = "arc-swap" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" - [[package]] name = "ark-bn254" version = "0.4.0" @@ -348,7 +342,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff8eb72df928aafb99fe5d37b383f2fe25bd2a765e3e5f7c365916b6f2463a29" dependencies = [ - "term 0.5.2", + "term", ] [[package]] @@ -1395,28 +1389,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" dependencies = [ "libc", - "redox_users 0.3.5", - "winapi", -] - -[[package]] -name = "dirs-next" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" -dependencies = [ - "cfg-if", - "dirs-sys-next", -] - -[[package]] -name = "dirs-sys-next" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" -dependencies = [ - "libc", - "redox_users 0.4.4", + "redox_users", "winapi", ] @@ -1597,15 +1570,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "erased-serde" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" -dependencies = [ - "serde", -] - [[package]] name = "errno" version = "0.3.8" @@ -2000,17 +1964,6 @@ dependencies = [ "hmac 0.8.1", ] -[[package]] -name = "hostname" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" -dependencies = [ - "libc", - "match_cfg", - "winapi", -] - [[package]] name = "htmlescape" version = "0.3.1" @@ -2214,17 +2167,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "is-terminal" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" -dependencies = [ - "hermit-abi 0.3.9", - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "itertools" version = "0.8.2" @@ -2335,7 +2277,7 @@ dependencies = [ "serde_derive", "sha2 0.8.2", "string_cache", - "term 0.5.2", + "term", "unicode-xid 0.1.0", ] @@ -2369,17 +2311,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libredox" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" -dependencies = [ - "bitflags 2.5.0", - "libc", - "redox_syscall 0.4.1", -] - [[package]] name = "libsecp256k1" version = "0.6.0" @@ -2469,10 +2400,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] -name = "match_cfg" +name = "matchers" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] [[package]] name = "memchr" @@ -2608,6 +2542,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.2.1" @@ -2776,15 +2720,6 @@ dependencies = [ "syn 2.0.55", ] -[[package]] -name = "num_threads" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" -dependencies = [ - "libc", -] - [[package]] name = "number_prefix" version = "0.4.0" @@ -2855,6 +2790,12 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -3296,12 +3237,6 @@ dependencies = [ "serde", "serde-this-or-that", "serde_json", - "slog", - "slog-async", - "slog-bunyan", - "slog-envlogger", - "slog-extlog", - "slog-term", "soketto", "solana-account-decoder", "solana-client", @@ -3312,6 +3247,8 @@ dependencies = [ "tokio-stream", "tokio-util", "toml_edit 0.22.9", + "tracing", + "tracing-subscriber", "typed-html", "warp", "winnow 0.6.5", @@ -3693,17 +3630,6 @@ dependencies = [ "rust-argon2", ] -[[package]] -name = "redox_users" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4" -dependencies = [ - "getrandom 0.2.12", - "libredox", - "thiserror", -] - [[package]] name = "regex" version = "1.10.4" @@ -3712,10 +3638,19 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", + "regex-automata 0.4.6", "regex-syntax 0.8.3", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + [[package]] name = "regex-automata" version = "0.4.6" @@ -4253,6 +4188,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shell-words" version = "1.1.0" @@ -4305,116 +4249,6 @@ dependencies = [ "autocfg 1.2.0", ] -[[package]] -name = "slog" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" -dependencies = [ - "erased-serde", -] - -[[package]] -name = "slog-async" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72c8038f898a2c79507940990f05386455b3a317d8f18d4caea7cbc3d5096b84" -dependencies = [ - "crossbeam-channel", - "slog", - "take_mut", - "thread_local", -] - -[[package]] -name = "slog-bunyan" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcaaf6e68789d3f0411f1e72bc443214ef252a1038b6e344836e50442541f190" -dependencies = [ - "hostname", - "slog", - "slog-json", - "time", -] - -[[package]] -name = "slog-envlogger" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "906a1a0bc43fed692df4b82a5e2fbfc3733db8dad8bb514ab27a4f23ad04f5c0" -dependencies = [ - "log", - "regex", - "slog", - "slog-async", - "slog-scope", - "slog-stdlog", - "slog-term", -] - -[[package]] -name = "slog-extlog" -version = "8.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00caea52ddc6535e015114a7eb1d2483898f14d6f5110755c56c9f0d765fb71" -dependencies = [ - "erased-serde", - "iobuffer", - "serde", - "serde_json", - "slog", - "slog-json", -] - -[[package]] -name = "slog-json" -version = "2.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e1e53f61af1e3c8b852eef0a9dee29008f55d6dd63794f3f12cef786cf0f219" -dependencies = [ - "erased-serde", - "serde", - "serde_json", - "slog", - "time", -] - -[[package]] -name = "slog-scope" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f95a4b4c3274cd2869549da82b57ccc930859bdbf5bcea0424bc5f140b3c786" -dependencies = [ - "arc-swap", - "lazy_static", - "slog", -] - -[[package]] -name = "slog-stdlog" -version = "4.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e" -dependencies = [ - "log", - "slog", - "slog-scope", -] - -[[package]] -name = "slog-term" -version = "2.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6e022d0b998abfe5c3782c1f03551a596269450ccd677ea51c56f8b214610e8" -dependencies = [ - "is-terminal", - "slog", - "term 0.7.0", - "thread_local", - "time", -] - [[package]] name = "smallvec" version = "1.13.2" @@ -5561,12 +5395,6 @@ dependencies = [ "libc", ] -[[package]] -name = "take_mut" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" - [[package]] name = "tempfile" version = "3.10.1" @@ -5590,17 +5418,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "term" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" -dependencies = [ - "dirs-next", - "rustversion", - "winapi", -] - [[package]] name = "termcolor" version = "1.4.1" @@ -5663,9 +5480,7 @@ checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", "itoa", - "libc", "num-conv", - "num_threads", "powerfmt", "serde", "time-core", @@ -5927,6 +5742,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", ] [[package]] @@ -6122,6 +5980,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 4612b389..f7fa1f9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,15 +33,11 @@ solana-account-decoder = "1.18.8" solana-client = "1.18.8" solana-sdk = "1.18.8" bincode = "1.3.3" -slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_trace"] } -slog-term = "2.9.1" rand = "0.8.5" -slog-async = "2.8.0" config = "0.14.0" thiserror = "1.0.58" clap = { version = "4.5.4", features = ["derive"] } humantime-serde = "1.1.1" -slog-envlogger = "2.2.0" serde-this-or-that = "0.4.2" # The public typed-html 0.2.2 release is causing a recursion limit # error that cannot be fixed from outside the crate. @@ -52,9 +48,10 @@ humantime = "2.1.0" prometheus-client = "0.22.2" lazy_static = "1.4.0" toml_edit = "0.22.9" -slog-bunyan = "2.5.0" winnow = "0.6.5" proptest = "1.4.0" +tracing = { version = "0.1.40", features = ["log"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } @@ -62,7 +59,6 @@ soketto = "0.8.0" portpicker = "0.1.1" rand = "0.8.5" tokio-retry = "0.3.0" -slog-extlog = "8.1.0" iobuffer = "0.2.0" [profile.release] diff --git a/integration-tests/tests/test_integration.py b/integration-tests/tests/test_integration.py index fae2e87d..d155a903 100644 --- a/integration-tests/tests/test_integration.py +++ b/integration-tests/tests/test_integration.py @@ -695,7 +695,7 @@ async def test_update_price_discards_unpermissioned(self, client: PythAgentClien lines_found += 1 expected_unperm_pubkey = final_price_account_unperm["account"] # Must point at the expected account as all other attempts must be valid - assert f"price_account: {expected_unperm_pubkey}" in line + assert f'"unpermissioned_price_account":"{expected_unperm_pubkey}"' in line # Must find at least one log discarding the account assert lines_found > 0 diff --git a/src/agent.rs b/src/agent.rs index c30262a8..d0f66206 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -71,7 +71,6 @@ use { anyhow::Result, futures_util::future::join_all, lazy_static::lazy_static, - slog::Logger, std::sync::Arc, tokio::sync::watch, }; @@ -105,31 +104,33 @@ impl Agent { Agent { config } } - pub async fn start(&self, logger: Logger) { - info!(logger, "Starting {}", env!("CARGO_PKG_NAME"); - "config" => format!("{:?}", &self.config), - "version" => env!("CARGO_PKG_VERSION"), - "cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("".to_owned()) + pub async fn start(&self) { + tracing::info!( + config = format!("{:?}", &self.config), + version = env!("CARGO_PKG_VERSION"), + cwd = std::env::current_dir() + .map(|p| format!("{}", p.display())) + .unwrap_or("".to_owned()), + "Starting {}", + env!("CARGO_PKG_NAME"), ); - if let Err(err) = self.spawn(logger.clone()).await { - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + if let Err(err) = self.spawn().await { + tracing::error!(err = ?err, "Agent spawn failed."); }; } - async fn spawn(&self, logger: Logger) -> Result<()> { + async fn spawn(&self) -> Result<()> { // job handles let mut jhs = vec![]; // Create the Application State. - let state = Arc::new(state::State::new(self.config.state.clone(), logger.clone()).await); + let state = Arc::new(state::State::new(self.config.state.clone()).await); // Spawn the primary network jhs.extend(network::spawn_network( self.config.primary_network.clone(), network::Network::Primary, - logger.new(o!("primary" => true)), state.clone(), )?); @@ -138,26 +139,22 @@ impl Agent { jhs.extend(network::spawn_network( config.clone(), network::Network::Secondary, - logger.new(o!("primary" => false)), state.clone(), )?); } // Create the Notifier task for the Pythd RPC. - jhs.push(tokio::spawn(notifier(logger.clone(), state.clone()))); + jhs.push(tokio::spawn(notifier(state.clone()))); // Spawn the Pythd API Server jhs.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), - logger.clone(), state.clone(), ))); // Spawn the metrics server - jhs.push(tokio::spawn(metrics::MetricsServer::spawn( + jhs.push(tokio::spawn(metrics::spawn( self.config.metrics_server.bind_address, - logger.clone(), - state.clone(), ))); // Spawn the remote keypair loader endpoint for both networks @@ -169,7 +166,6 @@ impl Agent { .as_ref() .map(|c| c.rpc_url.clone()), self.config.remote_keypair_loader.clone(), - logger, state, ) .await, diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index c5f7e5a6..4c830ce1 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -1,8 +1,5 @@ use { - super::state::{ - local::PriceInfo, - State, - }, + super::state::local::PriceInfo, crate::agent::solana::oracle::PriceEntry, lazy_static::lazy_static, prometheus_client::{ @@ -18,7 +15,6 @@ use { registry::Registry, }, serde::Deserialize, - slog::Logger, solana_sdk::pubkey::Pubkey, std::{ net::SocketAddr, @@ -26,7 +22,6 @@ use { atomic::AtomicU64, Arc, }, - time::Instant, }, tokio::sync::Mutex, warp::{ @@ -61,53 +56,33 @@ lazy_static! { Arc::new(Mutex::new(::default())); } -/// Internal metrics server state, holds state needed for serving -/// metrics. -pub struct MetricsServer { - pub start_time: Instant, - pub logger: Logger, - pub state: Arc, -} - -impl MetricsServer { - /// Instantiate a metrics API. - pub async fn spawn(addr: impl Into + 'static, logger: Logger, state: Arc) { - let server = MetricsServer { - start_time: Instant::now(), - logger, - state, - }; - - let shared_state = Arc::new(Mutex::new(server)); - let shared_state4metrics = shared_state.clone(); - let metrics_route = warp::path("metrics") - .and(warp::path::end()) - .and_then(move || { - let shared_state = shared_state4metrics.clone(); - async move { - let locked_state = shared_state.lock().await; - let mut buf = String::new(); - let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await) - .map_err(|e| -> Box { - e.into() - }) - .and_then(|_| -> Result<_, Box> { - Ok(Box::new(reply::with_status(buf, StatusCode::OK))) - }).unwrap_or_else(|e| { - error!(locked_state.logger, "Metrics: Could not gather metrics from registry"; "error" => e.to_string()); - Box::new(reply::with_status("Could not gather metrics. See logs for details".to_string(), StatusCode::INTERNAL_SERVER_ERROR)) - }); - - Result::, Rejection>::Ok(response) - } - }); - - let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async { - let _ = crate::agent::EXIT.subscribe().changed().await; +/// Instantiate a metrics API. +pub async fn spawn(addr: impl Into + 'static) { + let metrics_route = warp::path("metrics") + .and(warp::path::end()) + .and_then(move || async move { + let mut buf = String::new(); + let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await) + .map_err(|e| -> Box { e.into() }) + .and_then(|_| -> Result<_, Box> { + Ok(Box::new(reply::with_status(buf, StatusCode::OK))) + }) + .unwrap_or_else(|e| { + tracing::error!(err = ?e, "Metrics: Could not gather metrics from registry"); + Box::new(reply::with_status( + "Could not gather metrics. See logs for details".to_string(), + StatusCode::INTERNAL_SERVER_ERROR, + )) + }); + + Result::, Rejection>::Ok(response) }); - serve.await - } + let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async { + let _ = crate::agent::EXIT.subscribe().changed().await; + }); + + serve.await } #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index 29473af4..8d705a44 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -44,7 +44,6 @@ use { as_i64, as_u64, }, - slog::Logger, std::{ fmt::Debug, net::SocketAddr, @@ -115,7 +114,6 @@ async fn handle_connection( state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, - logger: Logger, ) where S: state::Prices, S: Send, @@ -130,7 +128,6 @@ async fn handle_connection( loop { if let Err(err) = handle_next( - &logger, &*state, &mut ws_tx, &mut ws_rx, @@ -144,18 +141,16 @@ async fn handle_connection( if let Some(ConnectionError::WebsocketConnectionClosed) = err.downcast_ref::() { - info!(logger, "websocket connection closed"); + tracing::info!("Websocket connection closed."); return; } - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "RPC failed to handle WebSocket message."); } } } async fn handle_next( - logger: &Logger, state: &S, ws_tx: &mut SplitSink, ws_rx: &mut SplitStream, @@ -173,7 +168,6 @@ where Some(body) => match body { Ok(msg) => { handle( - logger, ws_tx, state, notify_price_tx, @@ -199,7 +193,6 @@ where } async fn handle( - logger: &Logger, ws_tx: &mut SplitSink, state: &S, notify_price_tx: &mpsc::Sender, @@ -211,7 +204,7 @@ where { // Ignore control and binary messages if !msg.is_text() { - debug!(logger, "JSON RPC API: skipped non-text message"); + tracing::debug!("JSON RPC API: skipped non-text message"); return Ok(()); } @@ -223,7 +216,6 @@ where // Perform requests in sequence and gather responses for request in requests { let response = dispatch_and_catch_error( - logger, state, notify_price_tx, notify_price_sched_tx, @@ -286,7 +278,6 @@ async fn parse(msg: Message) -> Result<(Vec>, bool)> { } async fn dispatch_and_catch_error( - logger: &Logger, state: &S, notify_price_tx: &mpsc::Sender, notify_price_sched_tx: &mpsc::Sender, @@ -295,10 +286,9 @@ async fn dispatch_and_catch_error( where S: state::Prices, { - debug!( - logger, - "JSON RPC API: handling request"; - "method" => format!("{:?}", request.method), + tracing::debug!( + method = ?request.method, + "JSON RPC API: handling request", ); let result = match request.method { @@ -321,11 +311,10 @@ where Response::success(request.id.clone().to_id().unwrap_or(Id::from(0)), payload) } Err(e) => { - warn!( - logger, - "Error handling JSON RPC request"; - "request" => format!("{:?}", request), - "error" => format!("{}", e.to_string()), + tracing::warn!( + request = ?request, + error = e.to_string(), + "Error handling JSON RPC request", ); Response::error( @@ -399,11 +388,6 @@ async fn send_text(ws_tx: &mut SplitSink, msg: &str) -> Resu .map_err(|e| e.into()) } -#[derive(Clone)] -struct WithLogger { - logger: Logger, -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { @@ -427,20 +411,19 @@ impl Default for Config { } } -pub async fn run(config: Config, logger: Logger, state: Arc) +pub async fn run(config: Config, state: Arc) where S: state::Prices, S: Send, S: Sync, S: 'static, { - if let Err(err) = serve(config, &logger, state).await { - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + if let Err(err) = serve(config, state).await { + tracing::error!(err = ?err, "RPC server failed."); } } -async fn serve(config: Config, logger: &Logger, state: Arc) -> Result<()> +async fn serve(config: Config, state: Arc) -> Result<()> where S: state::Prices, S: Send, @@ -448,32 +431,25 @@ where S: 'static, { let config = config.clone(); - let with_logger = WithLogger { - logger: logger.clone(), - }; let index = { let config = config.clone(); warp::path::end() .and(warp::ws()) .and(warp::any().map(move || state.clone())) - .and(warp::any().map(move || with_logger.clone())) .and(warp::any().map(move || config.clone())) - .map( - |ws: Ws, state: Arc, with_logger: WithLogger, config: Config| { - ws.on_upgrade(move |conn| async move { - info!(with_logger.logger, "websocket user connected"); - handle_connection( - conn, - state, - config.notify_price_tx_buffer, - config.notify_price_sched_tx_buffer, - with_logger.logger, - ) - .await - }) - }, - ) + .map(|ws: Ws, state: Arc, config: Config| { + ws.on_upgrade(move |conn| async move { + tracing::info!("Websocket user connected."); + handle_connection( + conn, + state, + config.notify_price_tx_buffer, + config.notify_price_sched_tx_buffer, + ) + .await + }) + }) }; let (_, serve) = warp::serve(index).bind_with_graceful_shutdown( @@ -483,7 +459,10 @@ where }, ); - info!(logger, "starting api server"; "listen address" => config.listen_address.clone()); + tracing::info!( + listen_address = config.listen_address.clone(), + "Starting api server.", + ); tokio::task::spawn(serve).await.map_err(|e| e.into()) } diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 1b4456cb..5a32ff40 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -20,7 +20,6 @@ pub mod network { Deserialize, Serialize, }, - slog::Logger, std::{ sync::Arc, time::Duration, @@ -74,7 +73,6 @@ pub mod network { pub fn spawn_network( config: Config, network: Network, - logger: Logger, state: Arc, ) -> Result>> { // Publisher permissions updates between oracle and exporter @@ -88,8 +86,7 @@ pub mod network { &config.wss_url, config.rpc_timeout, publisher_permissions_tx, - KeyStore::new(config.key_store.clone(), &logger)?, - logger.clone(), + KeyStore::new(config.key_store.clone())?, state.clone(), ); @@ -100,8 +97,7 @@ pub mod network { &config.rpc_url, config.rpc_timeout, publisher_permissions_rx, - KeyStore::new(config.key_store.clone(), &logger)?, - logger, + KeyStore::new(config.key_store.clone())?, state, )?; @@ -122,7 +118,6 @@ mod key_store { Serialize, Serializer, }, - slog::Logger, solana_sdk::{ pubkey::Pubkey, signature::Keypair, @@ -176,13 +171,15 @@ mod key_store { } impl KeyStore { - pub fn new(config: Config, logger: &Logger) -> Result { + pub fn new(config: Config) -> Result { let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { Ok(k) => Some(k), Err(e) => { - warn!(logger, - "Reading publish keypair returned an error. Waiting for a remote-loaded key before publishing."; - "publish_keypair_path" => config.publish_keypair_path.display(), "error" => e.to_string()); + tracing::warn!( + error = ?e, + publish_keypair_path = config.publish_keypair_path.display().to_string(), + "Reading publish keypair returned an error. Waiting for a remote-loaded key before publishing.", + ); None } }; diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 9fd3dde8..870fa87f 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -32,7 +32,6 @@ use { Deserialize, Serialize, }, - slog::Logger, solana_client::{ nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig, @@ -174,7 +173,6 @@ pub fn spawn_exporter( HashMap>, >, key_store: KeyStore, - logger: Logger, state: Arc, ) -> Result>> { // Create and spawn the network state querier @@ -184,7 +182,6 @@ pub fn spawn_exporter( rpc_timeout, time::interval(config.refresh_network_state_interval_duration), network_state_tx, - logger.clone(), ); let network_state_querier_jh = tokio::spawn(async move { network_state_querier.run().await }); @@ -196,7 +193,6 @@ pub fn spawn_exporter( rpc_url, rpc_timeout, transactions_rx, - logger.clone(), ); let transaction_monitor_jh = tokio::spawn(async move { transaction_monitor.run().await }); @@ -210,7 +206,6 @@ pub fn spawn_exporter( network_state_rx, transactions_tx, publisher_permissions_rx, - logger, state, ); let exporter_jh = tokio::spawn(async move { exporter.run().await }); @@ -262,8 +257,6 @@ pub struct Exporter { /// Recent compute unit price in micro lamports (set if dynamic compute unit pricing is enabled) recent_compute_unit_price_micro_lamports: Option, - logger: Logger, - state: Arc, } @@ -279,7 +272,6 @@ impl Exporter { publisher_permissions_rx: watch::Receiver< HashMap>, >, - logger: Logger, state: Arc, ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); @@ -301,7 +293,6 @@ impl Exporter { time::Duration::from_secs(1), ), recent_compute_unit_price_micro_lamports: None, - logger, state, } } @@ -311,15 +302,13 @@ impl Exporter { tokio::select! { _ = self.publish_interval.tick() => { if let Err(err) = self.publish_updates().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Exporter failed to publish."); } } _ = self.dynamic_compute_unit_price_update_interval.tick() => { if self.config.dynamic_compute_unit_pricing_enabled { if let Err(err) = self.update_recent_compute_unit_price().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Exporter failed to compute unit price."); } } } @@ -406,12 +395,9 @@ impl Exporter { // keypairs it does not have. Currently expressed in // handle_key_requests() in remote_keypair_loader.rs - debug!( - self.logger, - "Exporter: Publish keypair is None, requesting remote loaded key" - ); + tracing::debug!("Exporter: Publish keypair is None, requesting remote loaded key"); let kp = Keypairs::request_keypair(&*self.state, self.network).await?; - debug!(self.logger, "Exporter: Keypair received"); + tracing::debug!("Exporter: Keypair received"); Ok(kp) } } @@ -424,9 +410,10 @@ impl Exporter { let now = Utc::now().naive_utc(); - debug!(self.logger, "Exporter: filtering prices permissioned to us"; - "our_prices" => format!("{:?}", self.our_prices.keys()), - "publish_pubkey" => publish_keypair.pubkey().to_string(), + tracing::debug!( + our_prices = ?self.our_prices.keys(), + publish_pubkey = publish_keypair.pubkey().to_string(), + "Exporter: filtering prices permissioned to us", ); // Filter the contents to only include information we haven't already sent, @@ -459,10 +446,11 @@ impl Exporter { let ret = publisher_permission.schedule.can_publish_at(&now_utc); if !ret { - debug!(self.logger, "Exporter: Attempted to publish price outside market hours"; - "price_account" => key_from_id.to_string(), - "schedule" => format!("{:?}", publisher_permission.schedule), - "utc_time" => now_utc.format("%c").to_string(), + tracing::debug!( + price_account = key_from_id.to_string(), + schedule = ?publisher_permission.schedule, + utc_time = now_utc.format("%c").to_string(), + "Exporter: Attempted to publish price outside market hours", ); } @@ -471,11 +459,10 @@ impl Exporter { // Note: This message is not an error. Some // publishers have different permissions on // primary/secondary networks - debug!( - self.logger, - "Exporter: Attempted to publish a price without permission, skipping"; - "unpermissioned_price_account" => key_from_id.to_string(), - "permissioned_accounts" => format!("{:?}", self.our_prices) + tracing::debug!( + unpermissioned_price_account = key_from_id.to_string(), + permissioned_accounts = ?self.our_prices, + "Exporter: Attempted to publish a price without permission, skipping", ); false } @@ -569,11 +556,10 @@ impl Exporter { Ok(true) => {} Ok(false) => return, Err(other) => { - warn!( - self.logger, - "Exporter: Updating permissioned price accounts failed unexpectedly, using cached value"; - "cached_value" => format!("{:?}", self.our_prices), - "error" => other.to_string(), + tracing::warn!( + cached_value = ?self.our_prices, + error = other.to_string(), + "Exporter: Updating permissioned price accounts failed unexpectedly, using cached value", ); return; } @@ -585,10 +571,9 @@ impl Exporter { .get(publish_pubkey) .cloned() .unwrap_or_else(|| { - warn!( - self.logger, - "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup."; - "publish_pubkey" => publish_pubkey.to_string(), + tracing::warn!( + publish_pubkey = publish_pubkey.to_string(), + "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup.", ); HashMap::new() }); @@ -750,7 +735,10 @@ impl Exporter { compute_unit_price_micro_lamports = compute_unit_price_micro_lamports .min(self.config.maximum_compute_unit_price_micro_lamports); - debug!(self.logger, "setting compute unit price"; "unit_price" => compute_unit_price_micro_lamports); + tracing::debug!( + unit_price = compute_unit_price_micro_lamports, + "setting compute unit price", + ); instructions.push(ComputeBudgetInstruction::set_compute_unit_price( compute_unit_price_micro_lamports, )); @@ -764,7 +752,6 @@ impl Exporter { ); let tx = self.inflight_transactions_tx.clone(); - let logger = self.logger.clone(); let rpc_client = self.rpc_client.clone(); // Fire this off in a separate task so we don't block the main thread of the exporter @@ -781,17 +768,20 @@ impl Exporter { { Ok(signature) => signature, Err(err) => { - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Exporter: failed to send transaction."); return; } }; - debug!(logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts)); + tracing::debug!( + signature = signature.to_string(), + instructions = instructions.len(), + price_accounts = ?price_accounts, + "Sent upd_price transaction.", + ); if let Err(err) = tx.send(signature).await { - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Exporter failed to send signature to transaction monitor"); } }); @@ -946,9 +936,6 @@ struct NetworkStateQuerier { /// Channel the current network state is sent on network_state_tx: watch::Sender, - - /// Logger - logger: Logger, } impl NetworkStateQuerier { @@ -957,13 +944,11 @@ impl NetworkStateQuerier { rpc_timeout: Duration, query_interval: Interval, network_state_tx: watch::Sender, - logger: Logger, ) -> Self { NetworkStateQuerier { rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout), query_interval, network_state_tx, - logger, } } @@ -971,8 +956,7 @@ impl NetworkStateQuerier { loop { self.query_interval.tick().await; if let Err(err) = self.query_network_state().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Network state query failed"); } } } @@ -1004,7 +988,6 @@ mod transaction_monitor { Deserialize, Serialize, }, - slog::Logger, solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ commitment_config::CommitmentConfig, @@ -1061,8 +1044,6 @@ mod transaction_monitor { /// Interval with which to poll the status of transactions poll_interval: Interval, - - logger: Logger, } impl TransactionMonitor { @@ -1071,7 +1052,6 @@ mod transaction_monitor { rpc_url: &str, rpc_timeout: Duration, transactions_rx: mpsc::Receiver, - logger: Logger, ) -> Self { let poll_interval = time::interval(config.poll_interval_duration); let rpc_client = RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout); @@ -1081,15 +1061,13 @@ mod transaction_monitor { sent_transactions: VecDeque::new(), transactions_rx, poll_interval, - logger, } } pub async fn run(&mut self) { loop { if let Err(err) = self.handle_next().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Transaction monitor failed."); } } } @@ -1107,7 +1085,10 @@ mod transaction_monitor { } fn add_transaction(&mut self, signature: Signature) { - debug!(self.logger, "monitoring new transaction"; "signature" => signature.to_string()); + tracing::debug!( + signature = signature.to_string(), + "Monitoring new transaction.", + ); // Add the new transaction to the list self.sent_transactions.push_back(signature); @@ -1132,7 +1113,10 @@ mod transaction_monitor { .await? .value; - debug!(self.logger, "Processing Signature Statuses"; "statuses" => format!("{:?}", statuses)); + tracing::debug!( + statuses = ?statuses, + "Processing Signature Statuses", + ); // Determine the percentage of the recently sent transactions that have successfully been committed // TODO: expose as metric @@ -1143,10 +1127,11 @@ mod transaction_monitor { .flatten() .filter(|(status, sig)| { if let Some(err) = status.err.as_ref() { - warn!(self.logger, "TX status has err value"; - "error" => err.to_string(), - "tx_signature" => sig.to_string(), - ) + tracing::warn!( + error = err.to_string(), + tx_signature = sig.to_string(), + "TX status has err value", + ); } status.satisfies_commitment(CommitmentConfig::confirmed()) @@ -1154,7 +1139,11 @@ mod transaction_monitor { .count(); let percentage_confirmed = ((confirmed as f64) / (self.sent_transactions.len() as f64)) * 100.0; - info!(self.logger, "monitoring transaction hit rate"; "percentage confirmed" => format!("{:.}", percentage_confirmed)); + + tracing::info!( + percentage_confirmed = format!("{:.}", percentage_confirmed), + "monitoring transaction hit rate", + ); Ok(()) } diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index e8cba19c..ff7c1f42 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -32,7 +32,6 @@ use { Deserialize, Serialize, }, - slog::Logger, solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ account::Account, @@ -176,8 +175,6 @@ pub struct Oracle { network: Network, - logger: Logger, - state: Arc, } @@ -227,7 +224,6 @@ pub fn spawn_oracle( HashMap>, >, key_store: KeyStore, - logger: Logger, state: Arc, ) -> Vec> { let mut jhs = vec![]; @@ -240,7 +236,6 @@ pub fn spawn_oracle( config.commitment, key_store.program_key, updates_tx, - logger.clone(), ); jhs.push(tokio::spawn(async move { subscriber.run().await })); } @@ -256,12 +251,11 @@ pub fn spawn_oracle( config.poll_interval_duration, config.max_lookup_batch_size, key_store.mapping_key, - logger.clone(), ); jhs.push(tokio::spawn(async move { poller.run().await })); // Create and spawn the Oracle - let mut oracle = Oracle::new(data_rx, updates_rx, network, logger, state); + let mut oracle = Oracle::new(data_rx, updates_rx, network, state); jhs.push(tokio::spawn(async move { oracle.run().await })); jhs @@ -272,7 +266,6 @@ impl Oracle { data_rx: mpsc::Receiver, updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, network: Network, - logger: Logger, state: Arc, ) -> Self { Oracle { @@ -280,7 +273,6 @@ impl Oracle { data_rx, updates_rx, network, - logger, state, } } @@ -288,8 +280,7 @@ impl Oracle { pub async fn run(&mut self) { loop { if let Err(err) = self.handle_next().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Oracle failed to handle next update."); } } } @@ -314,33 +305,45 @@ impl Oracle { .keys() .cloned() .collect::>(); - info!(self.logger, "fetched mapping accounts"; "new" => format!("{:?}", data + tracing::info!( + new = ?data .mapping_accounts .keys() .cloned() - .collect::>().difference(&previous_mapping_accounts)), "total" => data.mapping_accounts.len()); + .collect::>().difference(&previous_mapping_accounts), + total = data.mapping_accounts.len(), + "Fetched mapping accounts." + ); let previous_product_accounts = self .data .product_accounts .keys() .cloned() .collect::>(); - info!(self.logger, "fetched product accounts"; "new" => format!("{:?}", data + tracing::info!( + new = ?data .product_accounts .keys() .cloned() - .collect::>().difference(&previous_product_accounts)), "total" => data.product_accounts.len()); + .collect::>().difference(&previous_product_accounts), + total = data.product_accounts.len(), + "Fetched product accounts.", + ); let previous_price_accounts = self .data .price_accounts .keys() .cloned() .collect::>(); - info!(self.logger, "fetched price accounts"; "new" => format!("{:?}", data + tracing::info!( + new = ?data .price_accounts .keys() .cloned() - .collect::>().difference(&previous_price_accounts)), "total" => data.price_accounts.len()); + .collect::>().difference(&previous_price_accounts), + total = data.price_accounts.len(), + "Fetched price accounts.", + ); let previous_publishers = self .data @@ -348,11 +351,10 @@ impl Oracle { .keys() .collect::>(); let new_publishers = data.publisher_permissions.keys().collect::>(); - info!( - self.logger, - "updated publisher permissions"; - "new_publishers" => format!("{:?}", new_publishers.difference(&previous_publishers).collect::>()), - "total_publishers" => new_publishers.len(), + tracing::info!( + new_publishers = ?new_publishers.difference(&previous_publishers).collect::>(), + total_publishers = new_publishers.len(), + "Updated publisher permissions.", ); // Update the data with the new data structs @@ -364,7 +366,7 @@ impl Oracle { account_key: &Pubkey, account: &Account, ) -> Result<()> { - debug!(self.logger, "handling account update"); + tracing::debug!("Handling account update."); // We are only interested in price account updates, all other types of updates // will be fetched using polling. @@ -383,7 +385,13 @@ impl Oracle { let price_entry = PriceEntry::load_from_account(&account.data) .with_context(|| format!("load price account {}", account_key))?; - debug!(self.logger, "observed on-chain price account update"; "pubkey" => account_key.to_string(), "price" => price_entry.agg.price, "conf" => price_entry.agg.conf, "status" => format!("{:?}", price_entry.agg.status)); + tracing::debug!( + pubkey = account_key.to_string(), + price = price_entry.agg.price, + conf = price_entry.agg.conf, + status = ?price_entry.agg.status, + "Observed on-chain price account update.", + ); self.data .price_accounts @@ -462,9 +470,6 @@ struct Poller { max_lookup_batch_size: usize, mapping_key: Pubkey, - - /// Logger - logger: Logger, } impl Poller { @@ -479,7 +484,6 @@ impl Poller { poll_interval_duration: Duration, max_lookup_batch_size: usize, mapping_key: Pubkey, - logger: Logger, ) -> Self { let rpc_client = RpcClient::new_with_timeout_and_commitment( rpc_url.to_string(), @@ -495,17 +499,15 @@ impl Poller { poll_interval, max_lookup_batch_size, mapping_key, - logger, } } pub async fn run(&mut self) { loop { self.poll_interval.tick().await; - info!(self.logger, "fetching all pyth account data"); + tracing::info!("Fetching all pyth account data."); if let Err(err) = self.poll_and_send().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Oracle Poll/Send Failed."); } } } @@ -550,9 +552,10 @@ impl Poller { publish_interval: prod_entry.publish_interval.clone(), } } else { - warn!(&self.logger, "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7."; - "price" => price_key.to_string(), - "missing_product" => price_entry.prod.to_string(), + tracing::warn!( + price = price_key.to_string(), + missing_product = price_entry.prod.to_string(), + "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7.", ); Default::default() }; @@ -649,13 +652,12 @@ impl Poller { product.iter().find(|(k, _v)| *k == "weekly_schedule") { wsched_val.parse().unwrap_or_else(|err| { - warn!( - self.logger, - "Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing."; - "product_key" => product_key.to_string(), - "weekly_schedule" => wsched_val, + tracing::warn!( + product_key = product_key.to_string(), + weekly_schedule = wsched_val, + "Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing.", ); - debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err)); + tracing::debug!(err = ?err, "Parsing error context."); Default::default() }) } else { @@ -671,13 +673,12 @@ impl Poller { match msched_val.parse::() { Ok(schedule) => Some(schedule), Err(err) => { - warn!( - self.logger, - "Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule."; - "product_key" => product_key.to_string(), - "schedule" => msched_val, + tracing::warn!( + product_key = product_key.to_string(), + schedule = msched_val, + "Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule.", ); - debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err)); + tracing::debug!(err = ?err, "Parsing error context."); None } } @@ -694,13 +695,12 @@ impl Poller { match publish_interval_val.parse::() { Ok(interval) => Some(Duration::from_secs_f64(interval)), Err(err) => { - warn!( - self.logger, - "Oracle: Product has publish_interval defined but it could not be parsed. Falling back to None."; - "product_key" => product_key.to_string(), - "publish_interval" => publish_interval_val, + tracing::warn!( + product_key = product_key.to_string(), + publish_interval = publish_interval_val, + "Oracle: Product has publish_interval defined but it could not be parsed. Falling back to None.", ); - debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err)); + tracing::debug!(err = ?err, "parsing error context"); None } } @@ -718,8 +718,10 @@ impl Poller { }, ); } else { - warn!(self.logger, "Oracle: Could not find product on chain, skipping"; - "product_key" => product_key.to_string(),); + tracing::warn!( + product_key = product_key.to_string(), + "Oracle: Could not find product on chain, skipping", + ); } } @@ -755,9 +757,10 @@ impl Poller { prod.price_accounts.push(*price_key); price_entries.insert(*price_key, price); } else { - warn!(self.logger, "Could not find product entry for price, listed in its prod field, skipping"; - "missing_product" => price.prod.to_string(), - "price_key" => price_key.to_string(), + tracing::warn!( + missing_product = price.prod.to_string(), + price_key = price_key.to_string(), + "Could not find product entry for price, listed in its prod field, skipping", ); continue; @@ -767,7 +770,10 @@ impl Poller { next_todo.push(next_price); } } else { - warn!(self.logger, "Could not look up price account on chain, skipping"; "price_key" => price_key.to_string(),); + tracing::warn!( + price_key = price_key.to_string(), + "Could not look up price account on chain, skipping", + ); continue; } } @@ -784,7 +790,6 @@ mod subscriber { anyhow, Result, }, - slog::Logger, solana_account_decoder::UiAccountEncoding, solana_client::{ nonblocking::pubsub_client::PubsubClient, @@ -823,8 +828,6 @@ mod subscriber { /// Channel on which updates are sent updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, - - logger: Logger, } impl Subscriber { @@ -833,14 +836,12 @@ mod subscriber { commitment: CommitmentLevel, program_key: Pubkey, updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, - logger: Logger, ) -> Self { Subscriber { wss_url, commitment, program_key, updates_tx, - logger, } } @@ -848,13 +849,9 @@ mod subscriber { loop { let current_time = Instant::now(); if let Err(ref err) = self.start().await { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Oracle exited unexpectedly."); if current_time.elapsed() < Duration::from_secs(30) { - warn!( - self.logger, - "Subscriber restarting too quickly. Sleeping for 1 second." - ); + tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second."); tokio::time::sleep(Duration::from_secs(1)).await; } } @@ -880,7 +877,10 @@ mod subscriber { .program_subscribe(&self.program_key, Some(config)) .await?; - debug!(self.logger, "subscribed to program account updates"; "program_key" => self.program_key.to_string()); + tracing::debug!( + program_key = self.program_key.to_string(), + "subscribed to program account updates", + ); loop { match tokio_stream::StreamExt::next(&mut notif).await { @@ -888,7 +888,10 @@ mod subscriber { let account: Account = match update.value.account.decode() { Some(account) => account, None => { - error!(self.logger, "Failed to decode account from update."; "update" => format!("{:?}", update)); + tracing::error!( + update = ?update, + "Failed to decode account from update.", + ); continue; } }; @@ -899,7 +902,7 @@ mod subscriber { .map_err(|_| anyhow!("failed to send update to oracle"))?; } None => { - debug!(self.logger, "subscriber closed connection"); + tracing::debug!("subscriber closed connection"); return Ok(()); } } diff --git a/src/agent/state.rs b/src/agent/state.rs index cb996353..6c742850 100644 --- a/src/agent/state.rs +++ b/src/agent/state.rs @@ -11,7 +11,6 @@ use { Deserialize, Serialize, }, - slog::Logger, std::time::Duration, tokio::sync::mpsc, }; @@ -74,11 +73,11 @@ struct NotifyPriceSubscription { } impl State { - pub async fn new(config: Config, logger: Logger) -> Self { + pub async fn new(config: Config) -> Self { let registry = &mut *PROMETHEUS_REGISTRY.lock().await; State { - global_store: global::Store::new(logger.clone(), registry), - local_store: local::Store::new(logger.clone(), registry), + global_store: global::Store::new(registry), + local_store: local::Store::new(registry), keypairs: keypairs::KeypairState::default(), prices: api::PricesState::new(config), } @@ -119,7 +118,6 @@ mod tests { local::LocalStore, }, }, - iobuffer::IoBuffer, pyth_sdk::Identifier, pyth_sdk_solana::state::{ PriceComp, @@ -129,7 +127,6 @@ mod tests { Rational, SolanaPriceAccount, }, - slog_extlog::slog_test, std::{ collections::{ BTreeMap, @@ -156,15 +153,14 @@ mod tests { async fn setup() -> TestState { let notify_price_sched_interval_duration = Duration::from_nanos(10); - let logger = slog_test::new_test_logger(IoBuffer::new()); let config = Config { notify_price_sched_interval_duration, }; - let state = Arc::new(State::new(config, logger.clone()).await); + let state = Arc::new(State::new(config).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier - let jh = tokio::spawn(notifier(logger, state.clone())); + let jh = tokio::spawn(notifier(state.clone())); TestState { state, diff --git a/src/agent/state/api.rs b/src/agent/state/api.rs index 98944d21..dfa6e5ae 100644 --- a/src/agent/state/api.rs +++ b/src/agent/state/api.rs @@ -44,7 +44,6 @@ use { PriceComp, PriceStatus, }, - slog::Logger, std::{ collections::HashMap, sync::{ @@ -447,7 +446,7 @@ where } } -pub async fn notifier(logger: Logger, state: Arc) +pub async fn notifier(state: Arc) where for<'a> &'a S: Into<&'a PricesState>, S: Prices, @@ -459,13 +458,12 @@ where Prices::drop_closed_subscriptions(&*state).await; tokio::select! { _ = exit.changed() => { - info!(logger, "shutdown signal received"); + tracing::info!("Shutdown signal received."); return; } _ = interval.tick() => { if let Err(err) = state.send_notify_price_sched().await { - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + tracing::error!(err = ?err, "Notifier: failed to send notify price sched."); } } } diff --git a/src/agent/state/global.rs b/src/agent/state/global.rs index cdfad98c..36033a76 100644 --- a/src/agent/state/global.rs +++ b/src/agent/state/global.rs @@ -22,7 +22,6 @@ use { Result, }, prometheus_client::registry::Registry, - slog::Logger, solana_sdk::pubkey::Pubkey, std::collections::{ BTreeMap, @@ -116,20 +115,16 @@ pub struct Store { /// Prometheus metrics for prices price_metrics: PriceGlobalMetrics, - - /// Shared logger configuration. - logger: Logger, } impl Store { - pub fn new(logger: Logger, registry: &mut Registry) -> Self { + pub fn new(registry: &mut Registry) -> Self { Store { - account_data_primary: Default::default(), + account_data_primary: Default::default(), account_data_secondary: Default::default(), - account_metadata: Default::default(), - product_metrics: ProductGlobalMetrics::new(registry), - price_metrics: PriceGlobalMetrics::new(registry), - logger, + account_metadata: Default::default(), + product_metrics: ProductGlobalMetrics::new(registry), + price_metrics: PriceGlobalMetrics::new(registry), } } } @@ -257,10 +252,11 @@ where // This message is not an error. It is common // for primary and secondary network to have // slight difference in their timestamps. - debug!(store.logger, "Global store: ignoring stale update of an existing newer price"; - "price_key" => account_key.to_string(), - "existing_timestamp" => existing_price.timestamp, - "new_timestamp" => account.timestamp, + tracing::debug!( + price_key = account_key.to_string(), + existing_timestamp = existing_price.timestamp, + new_timestamp = account.timestamp, + "Global store: ignoring stale update of an existing newer price" ); return Ok(()); } diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index 73d4da8a..a93eca5c 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -10,7 +10,6 @@ use { Result, }, serde::Deserialize, - slog::Logger, solana_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ commitment_config::CommitmentConfig, @@ -117,7 +116,6 @@ pub async fn spawn( primary_rpc_url: String, secondary_rpc_url: Option, config: Config, - logger: Logger, state: Arc, ) -> Vec> where @@ -128,12 +126,14 @@ where let ip = config.bind_address.ip(); if !ip.is_loopback() { - warn!(logger, "Remote key loader: bind address is not localhost. Make sure the access on the selected address is secure."; "bind_address" => config.bind_address,); + tracing::warn!( + bind_address = ?config.bind_address, + "Remote key loader: bind address is not localhost. Make sure the access on the selected address is secure.", + ); } let primary_upload_route = { let state = state.clone(); - let logger = logger.clone(); let rpc_url = primary_rpc_url.clone(); let min_balance = config.primary_min_keypair_balance_sol; warp::path!("primary" / "load_keypair") @@ -143,7 +143,6 @@ where .and(warp::path::end()) .and_then(move |kp: Vec| { let state = state.clone(); - let logger = logger.clone(); let rpc_url = rpc_url.clone(); async move { let response = handle_new_keypair( @@ -153,7 +152,6 @@ where min_balance, rpc_url, "primary", - logger, ) .await; Result::, Rejection>::Ok(response) @@ -168,7 +166,6 @@ where .and(warp::path::end()) .and_then(move |kp: Vec| { let state = state.clone(); - let logger = logger.clone(); let rpc_url = secondary_rpc_url.clone(); async move { if let Some(rpc_url) = rpc_url { @@ -180,7 +177,6 @@ where min_balance, rpc_url, "secondary", - logger, ) .await; Result::, Rejection>::Ok(response) @@ -219,7 +215,6 @@ async fn handle_new_keypair<'a, 'b: 'a, S>( min_keypair_balance_sol: u64, rpc_url: String, network_name: &'b str, - logger: Logger, ) -> WithStatus<&'static str> where S: Keypairs, @@ -231,17 +226,19 @@ where Keypairs::update_keypair(&*state, network, kp).await; } Err(e) => { - warn!(logger, "Remote keypair loader: Keypair failed validation"; - "network" => network_name, - "error" => e.to_string(), + tracing::warn!( + network = network_name, + error = e.to_string(), + "Remote keypair loader: Keypair failed validation", ); upload_ok = false; } }, Err(e) => { - warn!(logger, "Remote keypair loader: Keypair failed validation"; - "network" => network_name, - "error" => e.to_string(), + tracing::warn!( + network = network_name, + error = e.to_string(), + "Remote keypair loader: Keypair failed validation", ); upload_ok = false; } diff --git a/src/agent/state/local.rs b/src/agent/state/local.rs index 355e0399..5e228298 100644 --- a/src/agent/state/local.rs +++ b/src/agent/state/local.rs @@ -11,7 +11,6 @@ use { chrono::NaiveDateTime, prometheus_client::registry::Registry, pyth_sdk_solana::state::PriceStatus, - slog::Logger, solana_sdk::bs58, std::collections::HashMap, tokio::sync::RwLock, @@ -44,15 +43,13 @@ impl PriceInfo { pub struct Store { prices: RwLock>, metrics: PriceLocalMetrics, - logger: Logger, } impl Store { - pub fn new(logger: Logger, registry: &mut Registry) -> Self { + pub fn new(registry: &mut Registry) -> Self { Store { - prices: RwLock::new(HashMap::new()), + prices: RwLock::new(HashMap::new()), metrics: PriceLocalMetrics::new(registry), - logger, } } } @@ -85,7 +82,10 @@ where price_identifier: pyth_sdk::Identifier, price_info: PriceInfo, ) -> Result<()> { - debug!(self.into().logger, "local store received price update"; "identifier" => bs58::encode(price_identifier.to_bytes()).into_string()); + tracing::debug!( + identifier = bs58::encode(price_identifier.to_bytes()).into_string(), + "Local store received price update." + ); // Drop the update if it is older than the current one stored for the price if let Some(current_price_info) = self.into().prices.read().await.get(&price_identifier) { diff --git a/src/bin/agent.rs b/src/bin/agent.rs index 98e5194a..3a515f7c 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -4,27 +4,13 @@ use { Context, Result, }, - clap::{ - Parser, - ValueEnum, - }, + clap::Parser, pyth_agent::agent::{ config::Config, Agent, }, - slog::{ - debug, - error, - o, - Drain, - Logger, - PushFnValue, - Record, - }, - slog_async::Async, - slog_envlogger::LogBuilder, std::{ - env, + io::IsTerminal, path::PathBuf, }, }; @@ -35,26 +21,30 @@ use { struct Arguments { #[clap(short, long, default_value = "config/config.toml")] /// Path to configuration file - config: PathBuf, - #[clap(short, long, default_value = "plain", value_enum)] - /// Log flavor to use - log_flavor: LogFlavor, + config: PathBuf, #[clap(short = 'L', long)] /// Whether to print file:line info for each log statement log_locations: bool, } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] -enum LogFlavor { - /// Standard human-readable output - Plain, - /// Structured JSON output - Json, -} - #[tokio::main] async fn main() -> Result<()> { + // Initialize a Tracing Subscriber + let fmt_builder = tracing_subscriber::fmt() + .with_file(false) + .with_line_number(true) + .with_thread_ids(true) + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(std::io::stderr().is_terminal()); + + // Use the compact formatter if we're in a terminal, otherwise use the JSON formatter. + if std::io::stderr().is_terminal() { + tracing::subscriber::set_global_default(fmt_builder.compact().finish())?; + } else { + tracing::subscriber::set_global_default(fmt_builder.json().finish())?; + } + let args = Arguments::parse(); if !args.config.as_path().exists() { @@ -66,66 +56,18 @@ async fn main() -> Result<()> { // Parse config early for logging channel capacity let config = Config::new(args.config).context("Could not parse config")?; - let log_level = env::var("RUST_LOG").unwrap_or("info".to_string()); - - // Build an async drain with a different inner drain depending on - // log flavor choice in CLI - let async_drain = match args.log_flavor { - LogFlavor::Json => { - // JSON output using slog-bunyan - let inner_drain = LogBuilder::new( - slog_bunyan::with_name(env!("CARGO_PKG_NAME"), std::io::stdout()) - .build() - .fuse(), - ) - .parse(&log_level) - .build(); - - Async::new(inner_drain) - .chan_size(config.channel_capacities.logger_buffer) - .build() - .fuse() - } - LogFlavor::Plain => { - // Plain, colored output usind slog-term - let inner_drain = LogBuilder::new( - slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build()) - .build() - .fuse(), - ) - .parse(&log_level) - .build(); - - Async::new(inner_drain) - .chan_size(config.channel_capacities.logger_buffer) - .build() - .fuse() - } - }; - - let mut logger = slog::Logger::root(async_drain, o!()); - - // Add location information to each log statement if enabled - if args.log_locations { - logger = logger.new(o!( - "loc" => PushFnValue( - move |r: &Record, ser| { - ser.emit(format!("{}:{}", r.file(), r.line())) - } - ), - )); - } - - if let Err(err) = start(config, logger.clone()).await { - error!(logger, "{}", err); - debug!(logger, "error context"; "context" => format!("{:?}", err)); + // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE + // should be set to 1 for this otherwise it will only print the top-level error. + if let Err(err) = start(config).await { + eprintln!("{}", err.backtrace()); + err.chain().for_each(|cause| eprintln!("{cause}")); return Err(err); } Ok(()) } -async fn start(config: Config, logger: Logger) -> Result<()> { - Agent::new(config).start(logger).await; +async fn start(config: Config) -> Result<()> { + Agent::new(config).start().await; Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 8792531a..f17bc55d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1 @@ -// The typed-html crate does pretty deep macro calls. Bump if -// recursion limit compilation errors return for html!() calls. -#![recursion_limit = "256"] -#[macro_use] -extern crate slog; -extern crate slog_term; - pub mod agent;