Skip to content

Commit

Permalink
refactor(agent): convert to a tracing logger
Browse files: browse the repository at this point in the history
  • Loading branch information
Reisen committed Jun 5, 2024
1 parent 663a137 commit a8ccdab
Show file tree
Hide file tree
Showing 16 changed files with 369 additions and 648 deletions.
324 changes: 94 additions & 230 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ solana-account-decoder = "1.18.8"
solana-client = "1.18.8"
solana-sdk = "1.18.8"
bincode = "1.3.3"
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_trace"] }
slog-term = "2.9.1"
rand = "0.8.5"
slog-async = "2.8.0"
config = "0.14.0"
thiserror = "1.0.58"
clap = { version = "4.5.4", features = ["derive"] }
humantime-serde = "1.1.1"
slog-envlogger = "2.2.0"
serde-this-or-that = "0.4.2"
# The public typed-html 0.2.2 release is causing a recursion limit
# error that cannot be fixed from outside the crate.
Expand All @@ -52,17 +48,17 @@ humantime = "2.1.0"
prometheus-client = "0.22.2"
lazy_static = "1.4.0"
toml_edit = "0.22.9"
slog-bunyan = "2.5.0"
winnow = "0.6.5"
proptest = "1.4.0"
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }

[dev-dependencies]
tokio-util = { version = "0.7.10", features = ["full"] }
soketto = "0.8.0"
portpicker = "0.1.1"
rand = "0.8.5"
tokio-retry = "0.3.0"
slog-extlog = "8.1.0"
iobuffer = "0.2.0"

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ async def test_update_price_discards_unpermissioned(self, client: PythAgentClien
lines_found += 1
expected_unperm_pubkey = final_price_account_unperm["account"]
# Must point at the expected account as all other attempts must be valid
assert f"price_account: {expected_unperm_pubkey}" in line
assert f'"unpermissioned_price_account":"{expected_unperm_pubkey}"' in line

# Must find at least one log discarding the account
assert lines_found > 0
Expand Down
34 changes: 15 additions & 19 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ use {
anyhow::Result,
futures_util::future::join_all,
lazy_static::lazy_static,
slog::Logger,
std::sync::Arc,
tokio::sync::watch,
};
Expand Down Expand Up @@ -105,31 +104,33 @@ impl Agent {
Agent { config }
}

pub async fn start(&self, logger: Logger) {
info!(logger, "Starting {}", env!("CARGO_PKG_NAME");
"config" => format!("{:?}", &self.config),
"version" => env!("CARGO_PKG_VERSION"),
"cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("<could not get current directory>".to_owned())
pub async fn start(&self) {
tracing::info!(
config = format!("{:?}", &self.config),
version = env!("CARGO_PKG_VERSION"),
cwd = std::env::current_dir()
.map(|p| format!("{}", p.display()))
.unwrap_or("<could not get current directory>".to_owned()),
"Starting {}",
env!("CARGO_PKG_NAME"),
);

if let Err(err) = self.spawn(logger.clone()).await {
error!(logger, "{}", err);
debug!(logger, "error context"; "context" => format!("{:?}", err));
if let Err(err) = self.spawn().await {
tracing::error!(err = ?err, "Agent spawn failed.");
};
}

async fn spawn(&self, logger: Logger) -> Result<()> {
async fn spawn(&self) -> Result<()> {
// job handles
let mut jhs = vec![];

// Create the Application State.
let state = Arc::new(state::State::new(self.config.state.clone(), logger.clone()).await);
let state = Arc::new(state::State::new(self.config.state.clone()).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
logger.new(o!("primary" => true)),
state.clone(),
)?);

Expand All @@ -138,26 +139,22 @@ impl Agent {
jhs.extend(network::spawn_network(
config.clone(),
network::Network::Secondary,
logger.new(o!("primary" => false)),
state.clone(),
)?);
}

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(logger.clone(), state.clone())));
jhs.push(tokio::spawn(notifier(state.clone())));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
logger.clone(),
state.clone(),
)));

// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
jhs.push(tokio::spawn(metrics::spawn(
self.config.metrics_server.bind_address,
logger.clone(),
state.clone(),
)));

// Spawn the remote keypair loader endpoint for both networks
Expand All @@ -169,7 +166,6 @@ impl Agent {
.as_ref()
.map(|c| c.rpc_url.clone()),
self.config.remote_keypair_loader.clone(),
logger,
state,
)
.await,
Expand Down
77 changes: 26 additions & 51 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use {
super::state::{
local::PriceInfo,
State,
},
super::state::local::PriceInfo,
crate::agent::solana::oracle::PriceEntry,
lazy_static::lazy_static,
prometheus_client::{
Expand All @@ -18,15 +15,13 @@ use {
registry::Registry,
},
serde::Deserialize,
slog::Logger,
solana_sdk::pubkey::Pubkey,
std::{
net::SocketAddr,
sync::{
atomic::AtomicU64,
Arc,
},
time::Instant,
},
tokio::sync::Mutex,
warp::{
Expand Down Expand Up @@ -61,53 +56,33 @@ lazy_static! {
Arc::new(Mutex::new(<Registry>::default()));
}

/// Internal metrics server state, holds state needed for serving
/// metrics.
pub struct MetricsServer {
pub start_time: Instant,
pub logger: Logger,
pub state: Arc<State>,
}

impl MetricsServer {
/// Instantiate a metrics API.
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, state: Arc<State>) {
let server = MetricsServer {
start_time: Instant::now(),
logger,
state,
};

let shared_state = Arc::new(Mutex::new(server));
let shared_state4metrics = shared_state.clone();
let metrics_route = warp::path("metrics")
.and(warp::path::end())
.and_then(move || {
let shared_state = shared_state4metrics.clone();
async move {
let locked_state = shared_state.lock().await;
let mut buf = String::new();
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
.map_err(|e| -> Box<dyn std::error::Error> {
e.into()
})
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
}).unwrap_or_else(|e| {
error!(locked_state.logger, "Metrics: Could not gather metrics from registry"; "error" => e.to_string());
Box::new(reply::with_status("Could not gather metrics. See logs for details".to_string(), StatusCode::INTERNAL_SERVER_ERROR))
});

Result::<Box<dyn Reply>, Rejection>::Ok(response)
}
});

let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
let _ = crate::agent::EXIT.subscribe().changed().await;
/// Instantiate a metrics API.
pub async fn spawn(addr: impl Into<SocketAddr> + 'static) {
let metrics_route = warp::path("metrics")
.and(warp::path::end())
.and_then(move || async move {
let mut buf = String::new();
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
.map_err(|e| -> Box<dyn std::error::Error> { e.into() })
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
})
.unwrap_or_else(|e| {
tracing::error!(err = ?e, "Metrics: Could not gather metrics from registry");
Box::new(reply::with_status(
"Could not gather metrics. See logs for details".to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
))
});

Result::<Box<dyn Reply>, Rejection>::Ok(response)
});

serve.await
}
let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
let _ = crate::agent::EXIT.subscribe().changed().await;
});

serve.await
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
Expand Down
Loading

0 comments on commit a8ccdab

Please sign in to comment.