Skip to content

Commit

Permalink
refactor(agent): extract oracle component/service (#128)
Browse files Browse the repository at this point in the history
* refactor(agent): move notifier into services

* refactor(agent): move keypairs into services

* refactor(agent): extract config module

* refactor(agent): extract oracle component/service

* refactor(agent): extract exporter component/service

* chore: bump version

---------

Co-authored-by: Ali Behjati <[email protected]>
  • Loading branch information
Reisen and ali-bahjati authored Jun 28, 2024
1 parent fee170f commit 05ef724
Show file tree
Hide file tree
Showing 20 changed files with 2,552 additions and 1,359 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.8.0"
version = "2.9.0"
edition = "2021"

[[bin]]
Expand Down
122 changes: 24 additions & 98 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,23 @@ Note that there is an Oracle and Exporter for each network, but only one Local S
################################################################################################################################## */
use {
self::{
config::Config,
pyth::rpc,
solana::network,
state::notifier,
},
anyhow::Result,
config::Config,
futures_util::future::join_all,
lazy_static::lazy_static,
std::sync::Arc,
tokio::sync::watch,
};

pub mod config;
pub mod legacy_schedule;
pub mod market_schedule;
pub mod metrics;
pub mod pyth;
pub mod services;
pub mod solana;
pub mod state;

Expand Down Expand Up @@ -125,26 +126,38 @@ impl Agent {
let mut jhs = vec![];

// Create the Application State.
let state = Arc::new(state::State::new(self.config.state.clone()).await);
let state = Arc::new(state::State::new(&self.config).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
// Spawn the primary network Oracle.
jhs.push(tokio::spawn(services::oracle(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
)?);
)));

jhs.push(tokio::spawn(services::exporter(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
)));

// Spawn the secondary network, if needed
// Spawn the secondary network Oracle, if needed.
if let Some(config) = &self.config.secondary_network {
jhs.extend(network::spawn_network(
jhs.push(tokio::spawn(services::oracle(
config.clone(),
network::Network::Secondary,
state.clone(),
)?);
)));

jhs.push(tokio::spawn(services::exporter(
config.clone(),
network::Network::Secondary,
state.clone(),
)));
}

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(state.clone())));
jhs.push(tokio::spawn(services::notifier(state.clone())));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
Expand All @@ -159,7 +172,7 @@ impl Agent {

// Spawn the remote keypair loader endpoint for both networks
jhs.append(
&mut state::keypairs::spawn(
&mut services::keypairs(
self.config.primary_network.rpc_url.clone(),
self.config
.secondary_network
Expand All @@ -177,90 +190,3 @@ impl Agent {
Ok(())
}
}

pub mod config {
use {
super::{
metrics,
pyth,
solana::network,
state,
},
anyhow::Result,
config as config_rs,
config_rs::{
Environment,
File,
},
serde::Deserialize,
std::path::Path,
};

/// Configuration for all components of the Agent
#[derive(Deserialize, Debug)]
pub struct Config {
#[serde(default)]
pub channel_capacities: ChannelCapacities,
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
#[serde(default)]
#[serde(rename = "pythd_adapter")]
pub state: state::Config,
#[serde(default)]
pub pythd_api_server: pyth::rpc::Config,
#[serde(default)]
pub metrics_server: metrics::Config,
#[serde(default)]
pub remote_keypair_loader: state::keypairs::Config,
}

impl Config {
pub fn new(config_file: impl AsRef<Path>) -> Result<Self> {
// Build a new configuration object, allowing the default values to be
// overridden by those in the config_file or "AGENT_"-prefixed environment
// variables.
config_rs::Config::builder()
.add_source(File::from(config_file.as_ref()))
.add_source(Environment::with_prefix("agent"))
.build()?
.try_deserialize()
.map_err(|e| e.into())
}
}

/// Capacities of the channels top-level components use to communicate
#[derive(Deserialize, Debug)]
pub struct ChannelCapacities {
/// Capacity of the channel used to broadcast shutdown events to all components
pub shutdown: usize,
/// Capacity of the channel used to send updates from the primary Oracle to the Global Store
pub primary_oracle_updates: usize,
/// Capacity of the channel used to send updates from the secondary Oracle to the Global Store
pub secondary_oracle_updates: usize,
/// Capacity of the channel the Pythd API Adapter uses to send lookup requests to the Global Store
pub global_store_lookup: usize,
/// Capacity of the channel the Pythd API Adapter uses to communicate with the Local Store
pub local_store_lookup: usize,
/// Capacity of the channel on which the Local Store receives messages
pub local_store: usize,
/// Capacity of the channel on which the Pythd API Adapter receives messages
pub pythd_adapter: usize,
/// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog
pub logger_buffer: usize,
}

impl Default for ChannelCapacities {
fn default() -> Self {
Self {
shutdown: 10000,
primary_oracle_updates: 10000,
secondary_oracle_updates: 10000,
global_store_lookup: 10000,
local_store_lookup: 10000,
local_store: 10000,
pythd_adapter: 10000,
logger_buffer: 10000,
}
}
}
}
85 changes: 85 additions & 0 deletions src/agent/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use {
super::{
metrics,
pyth,
services,
solana::network,
state,
},
anyhow::Result,
config as config_rs,
config_rs::{
Environment,
File,
},
serde::Deserialize,
std::path::Path,
};

/// Configuration for all components of the Agent
#[derive(Deserialize, Debug)]
pub struct Config {
#[serde(default)]
pub channel_capacities: ChannelCapacities,
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
#[serde(default)]
#[serde(rename = "pythd_adapter")]
pub state: state::Config,
#[serde(default)]
pub pythd_api_server: pyth::rpc::Config,
#[serde(default)]
pub metrics_server: metrics::Config,
#[serde(default)]
pub remote_keypair_loader: services::keypairs::Config,
}

impl Config {
pub fn new(config_file: impl AsRef<Path>) -> Result<Self> {
// Build a new configuration object, allowing the default values to be
// overridden by those in the config_file or "AGENT_"-prefixed environment
// variables.
config_rs::Config::builder()
.add_source(File::from(config_file.as_ref()))
.add_source(Environment::with_prefix("agent"))
.build()?
.try_deserialize()
.map_err(|e| e.into())
}
}

/// Capacities of the channels top-level components use to communicate
#[derive(Deserialize, Debug)]
pub struct ChannelCapacities {
/// Capacity of the channel used to broadcast shutdown events to all components
pub shutdown: usize,
/// Capacity of the channel used to send updates from the primary Oracle to the Global Store
pub primary_oracle_updates: usize,
/// Capacity of the channel used to send updates from the secondary Oracle to the Global Store
pub secondary_oracle_updates: usize,
/// Capacity of the channel the Pythd API Adapter uses to send lookup requests to the Global Store
pub global_store_lookup: usize,
/// Capacity of the channel the Pythd API Adapter uses to communicate with the Local Store
pub local_store_lookup: usize,
/// Capacity of the channel on which the Local Store receives messages
pub local_store: usize,
/// Capacity of the channel on which the Pythd API Adapter receives messages
pub pythd_adapter: usize,
/// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog
pub logger_buffer: usize,
}

impl Default for ChannelCapacities {
fn default() -> Self {
Self {
shutdown: 10000,
primary_oracle_updates: 10000,
secondary_oracle_updates: 10000,
global_store_lookup: 10000,
local_store_lookup: 10000,
local_store: 10000,
pythd_adapter: 10000,
logger_buffer: 10000,
}
}
}
2 changes: 1 addition & 1 deletion src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
super::state::local::PriceInfo,
crate::agent::solana::oracle::PriceEntry,
crate::agent::state::oracle::PriceEntry,
lazy_static::lazy_static,
prometheus_client::{
encoding::{
Expand Down
11 changes: 11 additions & 0 deletions src/agent/services.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
pub mod exporter;
pub mod keypairs;
pub mod notifier;
pub mod oracle;

pub use {
exporter::exporter,
keypairs::keypairs,
notifier::notifier,
oracle::oracle,
};
Loading

0 comments on commit 05ef724

Please sign in to comment.