From 05ef7249d93ee7035c1ed7bd818e495d58c51432 Mon Sep 17 00:00:00 2001 From: Reisen Date: Fri, 28 Jun 2024 09:42:07 +0100 Subject: [PATCH] refactor(agent): extract oracle component/service (#128) * refactor(agent): move notifier into services * refactor(agent): move keypairs into services * refactor(agent): extract config module * refactor(agent): extract oracle component/service * refactor(agent): extract exporter component/service * chore: bump version --------- Co-authored-by: Ali Behjati --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/agent.rs | 122 +---- src/agent/config.rs | 85 +++ src/agent/metrics.rs | 2 +- src/agent/services.rs | 11 + src/agent/services/exporter.rs | 338 ++++++++++++ src/agent/services/keypairs.rs | 229 ++++++++ src/agent/services/notifier.rs | 31 ++ src/agent/services/oracle.rs | 189 +++++++ src/agent/solana.rs | 61 +-- src/agent/solana/exporter.rs | 2 +- src/agent/solana/oracle.rs | 914 -------------------------------- src/agent/state.rs | 58 +- src/agent/state/api.rs | 49 +- src/agent/state/exporter.rs | 830 +++++++++++++++++++++++++++++ src/agent/state/global.rs | 29 +- src/agent/state/keypairs.rs | 228 +------- src/agent/state/oracle.rs | 618 +++++++++++++++++++++ src/agent/state/transactions.rs | 111 ++++ 20 files changed, 2552 insertions(+), 1359 deletions(-) create mode 100644 src/agent/config.rs create mode 100644 src/agent/services.rs create mode 100644 src/agent/services/exporter.rs create mode 100644 src/agent/services/keypairs.rs create mode 100644 src/agent/services/notifier.rs create mode 100644 src/agent/services/oracle.rs delete mode 100644 src/agent/solana/oracle.rs create mode 100644 src/agent/state/exporter.rs create mode 100644 src/agent/state/oracle.rs create mode 100644 src/agent/state/transactions.rs diff --git a/Cargo.lock b/Cargo.lock index a6aee01..7741ae1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3211,7 +3211,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.8.0" +version = "2.9.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index d66c5bc..489ec98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.8.0" +version = "2.9.0" edition = "2021" [[bin]] diff --git a/src/agent.rs b/src/agent.rs index d0f6620..0395401 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -63,22 +63,23 @@ Note that there is an Oracle and Exporter for each network, but only one Local S ################################################################################################################################## */ use { self::{ - config::Config, pyth::rpc, solana::network, - state::notifier, }, anyhow::Result, + config::Config, futures_util::future::join_all, lazy_static::lazy_static, std::sync::Arc, tokio::sync::watch, }; +pub mod config; pub mod legacy_schedule; pub mod market_schedule; pub mod metrics; pub mod pyth; +pub mod services; pub mod solana; pub mod state; @@ -125,26 +126,38 @@ impl Agent { let mut jhs = vec![]; // Create the Application State. - let state = Arc::new(state::State::new(self.config.state.clone()).await); + let state = Arc::new(state::State::new(&self.config).await); - // Spawn the primary network - jhs.extend(network::spawn_network( + // Spawn the primary network Oracle. + jhs.push(tokio::spawn(services::oracle( self.config.primary_network.clone(), network::Network::Primary, state.clone(), - )?); + ))); + + jhs.push(tokio::spawn(services::exporter( + self.config.primary_network.clone(), + network::Network::Primary, + state.clone(), + ))); - // Spawn the secondary network, if needed + // Spawn the secondary network Oracle, if needed. if let Some(config) = &self.config.secondary_network { - jhs.extend(network::spawn_network( + jhs.push(tokio::spawn(services::oracle( config.clone(), network::Network::Secondary, state.clone(), - )?); + ))); + + jhs.push(tokio::spawn(services::exporter( + config.clone(), + network::Network::Secondary, + state.clone(), + ))); } // Create the Notifier task for the Pythd RPC. - jhs.push(tokio::spawn(notifier(state.clone()))); + jhs.push(tokio::spawn(services::notifier(state.clone()))); // Spawn the Pythd API Server jhs.push(tokio::spawn(rpc::run( @@ -159,7 +172,7 @@ impl Agent { // Spawn the remote keypair loader endpoint for both networks jhs.append( - &mut state::keypairs::spawn( + &mut services::keypairs( self.config.primary_network.rpc_url.clone(), self.config .secondary_network @@ -177,90 +190,3 @@ impl Agent { Ok(()) } } - -pub mod config { - use { - super::{ - metrics, - pyth, - solana::network, - state, - }, - anyhow::Result, - config as config_rs, - config_rs::{ - Environment, - File, - }, - serde::Deserialize, - std::path::Path, - }; - - /// Configuration for all components of the Agent - #[derive(Deserialize, Debug)] - pub struct Config { - #[serde(default)] - pub channel_capacities: ChannelCapacities, - pub primary_network: network::Config, - pub secondary_network: Option, - #[serde(default)] - #[serde(rename = "pythd_adapter")] - pub state: state::Config, - #[serde(default)] - pub pythd_api_server: pyth::rpc::Config, - #[serde(default)] - pub metrics_server: metrics::Config, - #[serde(default)] - pub remote_keypair_loader: state::keypairs::Config, - } - - impl Config { - pub fn new(config_file: impl AsRef) -> Result { - // Build a new configuration object, allowing the default values to be - // overridden by those in the config_file or "AGENT_"-prefixed environment - // variables. - config_rs::Config::builder() - .add_source(File::from(config_file.as_ref())) - .add_source(Environment::with_prefix("agent")) - .build()? - .try_deserialize() - .map_err(|e| e.into()) - } - } - - /// Capacities of the channels top-level components use to communicate - #[derive(Deserialize, Debug)] - pub struct ChannelCapacities { - /// Capacity of the channel used to broadcast shutdown events to all components - pub shutdown: usize, - /// Capacity of the channel used to send updates from the primary Oracle to the Global Store - pub primary_oracle_updates: usize, - /// Capacity of the channel used to send updates from the secondary Oracle to the Global Store - pub secondary_oracle_updates: usize, - /// Capacity of the channel the Pythd API Adapter uses to send lookup requests to the Global Store - pub global_store_lookup: usize, - /// Capacity of the channel the Pythd API Adapter uses to communicate with the Local Store - pub local_store_lookup: usize, - /// Capacity of the channel on which the Local Store receives messages - pub local_store: usize, - /// Capacity of the channel on which the Pythd API Adapter receives messages - pub pythd_adapter: usize, - /// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog - pub logger_buffer: usize, - } - - impl Default for ChannelCapacities { - fn default() -> Self { - Self { - shutdown: 10000, - primary_oracle_updates: 10000, - secondary_oracle_updates: 10000, - global_store_lookup: 10000, - local_store_lookup: 10000, - local_store: 10000, - pythd_adapter: 10000, - logger_buffer: 10000, - } - } - } -} diff --git a/src/agent/config.rs b/src/agent/config.rs new file mode 100644 index 0000000..d43d6ba --- /dev/null +++ b/src/agent/config.rs @@ -0,0 +1,85 @@ +use { + super::{ + metrics, + pyth, + services, + solana::network, + state, + }, + anyhow::Result, + config as config_rs, + config_rs::{ + Environment, + File, + }, + serde::Deserialize, + std::path::Path, +}; + +/// Configuration for all components of the Agent +#[derive(Deserialize, Debug)] +pub struct Config { + #[serde(default)] + pub channel_capacities: ChannelCapacities, + pub primary_network: network::Config, + pub secondary_network: Option, + #[serde(default)] + #[serde(rename = "pythd_adapter")] + pub state: state::Config, + #[serde(default)] + pub pythd_api_server: pyth::rpc::Config, + #[serde(default)] + pub metrics_server: metrics::Config, + #[serde(default)] + pub remote_keypair_loader: services::keypairs::Config, +} + +impl Config { + pub fn new(config_file: impl AsRef) -> Result { + // Build a new configuration object, allowing the default values to be + // overridden by those in the config_file or "AGENT_"-prefixed environment + // variables. + config_rs::Config::builder() + .add_source(File::from(config_file.as_ref())) + .add_source(Environment::with_prefix("agent")) + .build()? + .try_deserialize() + .map_err(|e| e.into()) + } +} + +/// Capacities of the channels top-level components use to communicate +#[derive(Deserialize, Debug)] +pub struct ChannelCapacities { + /// Capacity of the channel used to broadcast shutdown events to all components + pub shutdown: usize, + /// Capacity of the channel used to send updates from the primary Oracle to the Global Store + pub primary_oracle_updates: usize, + /// Capacity of the channel used to send updates from the secondary Oracle to the Global Store + pub secondary_oracle_updates: usize, + /// Capacity of the channel the Pythd API Adapter uses to send lookup requests to the Global Store + pub global_store_lookup: usize, + /// Capacity of the channel the Pythd API Adapter uses to communicate with the Local Store + pub local_store_lookup: usize, + /// Capacity of the channel on which the Local Store receives messages + pub local_store: usize, + /// Capacity of the channel on which the Pythd API Adapter receives messages + pub pythd_adapter: usize, + /// Capacity of the slog logging channel. Adjust this value if you see complaints about channel capacity from slog + pub logger_buffer: usize, +} + +impl Default for ChannelCapacities { + fn default() -> Self { + Self { + shutdown: 10000, + primary_oracle_updates: 10000, + secondary_oracle_updates: 10000, + global_store_lookup: 10000, + local_store_lookup: 10000, + local_store: 10000, + pythd_adapter: 10000, + logger_buffer: 10000, + } + } +} diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index 4c830ce..424bc32 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -1,6 +1,6 @@ use { super::state::local::PriceInfo, - crate::agent::solana::oracle::PriceEntry, + crate::agent::state::oracle::PriceEntry, lazy_static::lazy_static, prometheus_client::{ encoding::{ diff --git a/src/agent/services.rs b/src/agent/services.rs new file mode 100644 index 0000000..3e4f829 --- /dev/null +++ b/src/agent/services.rs @@ -0,0 +1,11 @@ +pub mod exporter; +pub mod keypairs; +pub mod notifier; +pub mod oracle; + +pub use { + exporter::exporter, + keypairs::keypairs, + notifier::notifier, + oracle::oracle, +}; diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs new file mode 100644 index 0000000..cdee65d --- /dev/null +++ b/src/agent/services/exporter.rs @@ -0,0 +1,338 @@ +use { + crate::agent::{ + solana::network::{ + self, + Network, + }, + state::{ + exporter::Exporter, + transactions::Transactions, + }, + }, + anyhow::Result, + futures_util::future, + serde::{ + Deserialize, + Serialize, + }, + solana_client::nonblocking::rpc_client::RpcClient, + solana_sdk::commitment_config::CommitmentConfig, + std::{ + sync::Arc, + time::Duration, + }, + tokio::{ + sync::watch, + time::Interval, + }, +}; + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(default)] +pub struct Config { + /// Duration of the interval at which to refresh the cached network state (current slot and blockhash). + /// It is recommended to set this to slightly less than the network's block time, + /// as the slot fetched will be used as the time of the price update. + #[serde(with = "humantime_serde")] + pub refresh_network_state_interval_duration: Duration, + /// Duration of the interval at which to publish updates + #[serde(with = "humantime_serde")] + pub publish_interval_duration: Duration, + /// Age after which a price update is considered stale and not published + #[serde(with = "humantime_serde")] + pub staleness_threshold: Duration, + /// Wait at least this long before publishing an unchanged price + /// state; unchanged price state means only timestamp has changed + /// with other state identical to last published state. + pub unchanged_publish_threshold: Duration, + /// Maximum size of a batch + pub max_batch_size: usize, + /// Capacity of the channel between the Exporter and the Transaction Monitor + pub inflight_transactions_channel_capacity: usize, + /// Configuration for the Transaction Monitor + pub transaction_monitor: transaction_monitor::Config, + /// Number of compute units requested per update_price instruction within the transaction + /// (i.e., requested units equals `n * compute_unit_limit`, where `n` is the number of update_price + /// instructions) + pub compute_unit_limit: u32, + /// Price per compute unit offered for update_price transactions If dynamic compute unit is + /// enabled and this value is set, the actual price per compute unit will be the maximum of the + /// network dynamic price and this value. + pub compute_unit_price_micro_lamports: Option, + /// Enable using dynamic price per compute unit based on the network previous prioritization + /// fees. + pub dynamic_compute_unit_pricing_enabled: bool, + /// Maximum total compute unit fee paid for a single transaction. Defaults to 0.001 SOL. This + /// is a safety measure while using dynamic compute price to prevent the exporter from paying + /// too much for a single transaction + pub maximum_compute_unit_price_micro_lamports: u64, + /// Maximum slot gap between the current slot and the oldest slot amongst all the accounts in + /// the batch. This is used to calculate the dynamic price per compute unit. When the slot gap + /// reaches this number we will use the maximum total_compute_fee for the transaction. + pub maximum_slot_gap_for_dynamic_compute_unit_price: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + refresh_network_state_interval_duration: Duration::from_millis(200), + publish_interval_duration: Duration::from_secs(1), + staleness_threshold: Duration::from_secs(5), + unchanged_publish_threshold: Duration::from_secs(5), + max_batch_size: 12, + inflight_transactions_channel_capacity: 10000, + transaction_monitor: Default::default(), + // The largest transactions without accumulator spend around 38k compute units + // and accumulator cpi costs around 10k compute units. We set the limit to 60k + // to have some buffer. + compute_unit_limit: 60000, + compute_unit_price_micro_lamports: None, + dynamic_compute_unit_pricing_enabled: false, + // Maximum compute unit price (as a cap on the dynamic price) + maximum_compute_unit_price_micro_lamports: 1_000_000, + // A publisher update is not included if it is 25 slots behind the current slot. + // Due to the delay in the network (until a block gets confirmed) and potential + // ws issues we add 15 slots to make sure we do not overpay. + maximum_slot_gap_for_dynamic_compute_unit_price: 40, + } + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct NetworkState { + pub blockhash: solana_sdk::hash::Hash, + pub current_slot: u64, +} + +/// NetworkStateQuerier periodically queries the current state of the network, +/// fetching the blockhash and slot number. +struct NetworkStateQuerier { + /// The RPC client + rpc_client: RpcClient, + + /// The interval with which to query the network state + query_interval: Interval, + + /// Channel the current network state is sent on + network_state_tx: watch::Sender, +} + +impl NetworkStateQuerier { + pub fn new( + rpc_endpoint: &str, + rpc_timeout: Duration, + query_interval: Interval, + network_state_tx: watch::Sender, + ) -> Self { + NetworkStateQuerier { + rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout), + query_interval, + network_state_tx, + } + } + + pub async fn run(&mut self) { + loop { + self.query_interval.tick().await; + if let Err(err) = self.query_network_state().await { + tracing::error!(err = ?err, "Network state query failed"); + } + } + } + + async fn query_network_state(&mut self) -> Result<()> { + // Fetch the blockhash and current slot in parallel + let current_slot_future = self + .rpc_client + .get_slot_with_commitment(CommitmentConfig::confirmed()); + let latest_blockhash_future = self.rpc_client.get_latest_blockhash(); + + let (current_slot_result, latest_blockhash_result) = + future::join(current_slot_future, latest_blockhash_future).await; + + // Send the result on the channel + self.network_state_tx.send(NetworkState { + blockhash: latest_blockhash_result?, + current_slot: current_slot_result?, + })?; + + Ok(()) + } +} + +pub async fn exporter(config: network::Config, network: Network, state: Arc) +where + S: Exporter, + S: Transactions, + S: Send + Sync + 'static, +{ + // Create and spawn the network state querier + let (network_state_tx, network_state_rx) = watch::channel(Default::default()); + let mut network_state_querier = NetworkStateQuerier::new( + &config.rpc_url, + config.rpc_timeout, + tokio::time::interval(config.exporter.refresh_network_state_interval_duration), + network_state_tx, + ); + + tokio::spawn(transaction_monitor::transaction_monitor( + config.clone(), + state.clone(), + )); + tokio::spawn(exporter::exporter(config, network, state, network_state_rx)); + tokio::spawn(async move { network_state_querier.run().await }); +} + +mod exporter { + use { + super::NetworkState, + crate::agent::{ + solana::{ + key_store::KeyStore, + network::{ + Config, + Network, + }, + }, + state::exporter::{ + get_publish_keypair, + publish_batches, + Exporter, + }, + }, + solana_client::nonblocking::rpc_client::RpcClient, + std::sync::Arc, + tokio::sync::watch, + }; + + pub async fn exporter( + config: Config, + network: Network, + state: Arc, + network_state_rx: watch::Receiver, + ) where + S: Exporter, + S: Send + Sync + 'static, + { + let mut publish_interval = tokio::time::interval(config.exporter.publish_interval_duration); + let mut dynamic_compute_unit_price_update_interval = + tokio::time::interval(config.exporter.publish_interval_duration); + + let client = Arc::new(RpcClient::new_with_timeout( + config.rpc_url.to_string(), + config.rpc_timeout, + )); + let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { + tracing::warn!("Key store not available, Exporter won't start."); + return; + }; + + loop { + tokio::select! { + _ = publish_interval.tick() => { + if let Ok(publish_keypair) = get_publish_keypair(&*state, network, key_store.publish_keypair.as_ref()).await { + if let Ok(permissioned_updates) = Exporter::get_permissioned_updates( + &*state, + network, + &publish_keypair, + config.exporter.staleness_threshold, + config.exporter.unchanged_publish_threshold, + ).await { + if let Err(err) = publish_batches( + &*state, + client.clone(), + network, + &network_state_rx, + key_store.accumulator_key, + &publish_keypair, + key_store.program_key, + config.exporter.max_batch_size, + config.exporter.staleness_threshold, + config.exporter.compute_unit_limit, + config.exporter.compute_unit_price_micro_lamports, + config.exporter.maximum_compute_unit_price_micro_lamports, + config.exporter.maximum_slot_gap_for_dynamic_compute_unit_price, + config.exporter.dynamic_compute_unit_pricing_enabled, + permissioned_updates, + ).await { + tracing::error!(err = ?err, "Exporter failed to publish."); + } + } + } + } + _ = dynamic_compute_unit_price_update_interval.tick() => { + if config.exporter.dynamic_compute_unit_pricing_enabled { + if let Ok(publish_keypair) = get_publish_keypair(&*state, network, key_store.publish_keypair.as_ref()).await { + if let Err(err) = Exporter::update_recent_compute_unit_price( + &*state, + network, + &publish_keypair, + &client, + config.exporter.staleness_threshold, + config.exporter.unchanged_publish_threshold, + ).await { + tracing::error!(err = ?err, "Exporter failed to compute unit price."); + } + } + } + } + } + } + } +} + +mod transaction_monitor { + use { + crate::agent::{ + solana::network, + state::transactions::Transactions, + }, + serde::{ + Deserialize, + Serialize, + }, + solana_client::nonblocking::rpc_client::RpcClient, + std::{ + sync::Arc, + time::Duration, + }, + }; + + #[derive(Clone, Serialize, Deserialize, Debug)] + #[serde(default)] + pub struct Config { + /// Duration of the interval with which to poll the status of transactions. + /// It is recommended to set this to a value close to the Exporter's publish_interval. + #[serde(with = "humantime_serde")] + pub poll_interval_duration: Duration, + /// Maximum number of recent transactions to monitor. When this number is exceeded, + /// the oldest transactions are no longer monitored. It is recommended to set this to + /// a value at least as large as (number of products published / number of products in a batch). + pub max_transactions: usize, + } + + impl Default for Config { + fn default() -> Self { + Self { + poll_interval_duration: Duration::from_secs(4), + max_transactions: 100, + } + } + } + + pub async fn transaction_monitor(config: network::Config, state: Arc) + where + S: Transactions, + { + let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout); + let mut poll_interval = + tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration); + + loop { + poll_interval.tick().await; + if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await { + tracing::error!(err = ?err, "Transaction monitor failed."); + } + } + } +} diff --git a/src/agent/services/keypairs.rs b/src/agent/services/keypairs.rs new file mode 100644 index 0000000..0d2863d --- /dev/null +++ b/src/agent/services/keypairs.rs @@ -0,0 +1,229 @@ +//! Keypairs +//! +//! The Keypairs Service allows hotloading keys for the running agent. + +use { + crate::agent::{ + solana::network::Network, + state::keypairs::Keypairs, + }, + anyhow::{ + Context, + Result, + }, + serde::Deserialize, + solana_client::nonblocking::rpc_client::RpcClient, + solana_sdk::{ + commitment_config::CommitmentConfig, + signature::Keypair, + signer::Signer, + }, + std::{ + net::SocketAddr, + sync::Arc, + }, + tokio::task::JoinHandle, + warp::{ + hyper::StatusCode, + reject::Rejection, + reply::{ + self, + WithStatus, + }, + Filter, + }, +}; + +const DEFAULT_MIN_KEYPAIR_BALANCE_SOL: u64 = 1; + +pub fn default_bind_address() -> SocketAddr { + "127.0.0.1:9001" + .parse() + .expect("INTERNAL: Could not build default remote keypair loader bind address") +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(default)] +pub struct Config { + primary_min_keypair_balance_sol: u64, + secondary_min_keypair_balance_sol: u64, + bind_address: SocketAddr, +} + +impl Default for Config { + fn default() -> Self { + Self { + primary_min_keypair_balance_sol: DEFAULT_MIN_KEYPAIR_BALANCE_SOL, + secondary_min_keypair_balance_sol: DEFAULT_MIN_KEYPAIR_BALANCE_SOL, + bind_address: default_bind_address(), + } + } +} + +pub async fn keypairs( + primary_rpc_url: String, + secondary_rpc_url: Option, + config: Config, + state: Arc, +) -> Vec> +where + S: Keypairs, + S: Send + Sync + 'static, +{ + let ip = config.bind_address.ip(); + + if !ip.is_loopback() { + tracing::warn!( + bind_address = ?config.bind_address, + "Remote key loader: bind address is not localhost. Make sure the access on the selected address is secure.", + ); + } + + let primary_upload_route = { + let state = state.clone(); + let rpc_url = primary_rpc_url.clone(); + let min_balance = config.primary_min_keypair_balance_sol; + warp::path!("primary" / "load_keypair") + .and(warp::post()) + .and(warp::body::content_length_limit(1024)) + .and(warp::body::json()) + .and(warp::path::end()) + .and_then(move |kp: Vec| { + let state = state.clone(); + let rpc_url = rpc_url.clone(); + async move { + let response = handle_new_keypair( + state, + Network::Primary, + kp, + min_balance, + rpc_url, + "primary", + ) + .await; + Result::, Rejection>::Ok(response) + } + }) + }; + + let secondary_upload_route = warp::path!("secondary" / "load_keypair") + .and(warp::post()) + .and(warp::body::content_length_limit(1024)) + .and(warp::body::json()) + .and(warp::path::end()) + .and_then(move |kp: Vec| { + let state = state.clone(); + let rpc_url = secondary_rpc_url.clone(); + async move { + if let Some(rpc_url) = rpc_url { + let min_balance = config.secondary_min_keypair_balance_sol; + let response = handle_new_keypair( + state, + Network::Secondary, + kp, + min_balance, + rpc_url, + "secondary", + ) + .await; + Result::, Rejection>::Ok(response) + } else { + Result::, Rejection>::Ok(reply::with_status( + "Secondary network is not active", + StatusCode::SERVICE_UNAVAILABLE, + )) + } + } + }); + + let http_api_jh = { + let (_, serve) = warp::serve(primary_upload_route.or(secondary_upload_route)) + .bind_with_graceful_shutdown(config.bind_address, async { + let _ = crate::agent::EXIT.subscribe().changed().await; + }); + tokio::spawn(serve) + }; + + // WARNING: All jobs spawned here must report their join handles in this vec + vec![http_api_jh] +} + +/// Validate and apply a keypair to the specified mut reference, +/// hiding errors in logs. +/// +/// Returns the appropriate HTTP response depending on checks success. +/// +/// NOTE(2023-03-22): Lifetime bounds are currently necessary +/// because of https://github.com/rust-lang/rust/issues/63033 +async fn handle_new_keypair<'a, 'b: 'a, S>( + state: Arc, + network: Network, + new_keypair_bytes: Vec, + min_keypair_balance_sol: u64, + rpc_url: String, + network_name: &'b str, +) -> WithStatus<&'static str> +where + S: Keypairs, +{ + let mut upload_ok = true; + match Keypair::from_bytes(&new_keypair_bytes) { + Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await { + Ok(()) => { + Keypairs::update_keypair(&*state, network, kp).await; + } + Err(e) => { + tracing::warn!( + network = network_name, + error = e.to_string(), + "Remote keypair loader: Keypair failed validation", + ); + upload_ok = false; + } + }, + Err(e) => { + tracing::warn!( + network = network_name, + error = e.to_string(), + "Remote keypair loader: Keypair failed validation", + ); + upload_ok = false; + } + } + + if upload_ok { + reply::with_status("keypair upload OK", StatusCode::OK) + } else { + reply::with_status( + "Could not upload keypair. See logs for details.", + StatusCode::BAD_REQUEST, + ) + } +} + +/// Validate keypair balance before using it in transactions. +pub async fn validate_keypair( + kp: &Keypair, + min_keypair_balance_sol: u64, + rpc_url: String, +) -> Result<()> { + let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); + + let balance_lamports = c + .get_balance(&kp.pubkey()) + .await + .context("Could not check keypair's balance")?; + + let lamports_in_sol = 1_000_000_000; + + if balance_lamports > min_keypair_balance_sol * lamports_in_sol { + Ok(()) + } else { + Err(anyhow::anyhow!(format!( + "Keypair {} balance of {} SOL below threshold of {} SOL", + kp.pubkey(), + balance_lamports as f64 / lamports_in_sol as f64, + min_keypair_balance_sol + ))) + } +} diff --git a/src/agent/services/notifier.rs b/src/agent/services/notifier.rs new file mode 100644 index 0000000..aecc682 --- /dev/null +++ b/src/agent/services/notifier.rs @@ -0,0 +1,31 @@ +//! Notifier +//! +//! The notifier is responsible for notifying subscribers who have registered +//! for price sched updates. + +use { + crate::agent::state::Prices, + std::sync::Arc, +}; + +pub async fn notifier(state: Arc) +where + S: Prices, +{ + let mut interval = tokio::time::interval(state.notify_interval_duration()); + let mut exit = crate::agent::EXIT.subscribe(); + loop { + Prices::drop_closed_subscriptions(&*state).await; + tokio::select! { + _ = exit.changed() => { + tracing::info!("Shutdown signal received."); + return; + } + _ = interval.tick() => { + if let Err(err) = state.send_notify_price_sched().await { + tracing::error!(err = ?err, "Notifier: failed to send notify price sched."); + } + } + } + } +} diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs new file mode 100644 index 0000000..dd0e87c --- /dev/null +++ b/src/agent/services/oracle.rs @@ -0,0 +1,189 @@ +//! Oracle +//! +//! The Oracle service is respoinsible for reacting to all remote/on-chain events. + +use { + crate::agent::{ + solana::{ + key_store::KeyStore, + network::{ + Config, + Network, + }, + }, + state::oracle::Oracle, + }, + anyhow::Result, + solana_account_decoder::UiAccountEncoding, + solana_client::{ + nonblocking::{ + pubsub_client::PubsubClient, + rpc_client::RpcClient, + }, + rpc_config::{ + RpcAccountInfoConfig, + RpcProgramAccountsConfig, + }, + }, + solana_sdk::{ + account::Account, + commitment_config::CommitmentConfig, + pubkey::Pubkey, + signature::Keypair, + }, + std::{ + sync::Arc, + time::{ + Duration, + Instant, + }, + }, + tokio_stream::StreamExt, +}; + +pub async fn oracle(config: Config, network: Network, state: Arc) +where + S: Oracle, + S: Send + Sync + 'static, +{ + let Ok(key_store) = KeyStore::new(config.key_store.clone()) else { + tracing::warn!("Key store not available, Oracle won't start."); + return; + }; + + tokio::spawn(poller( + config.clone(), + network, + state.clone(), + key_store.mapping_key, + key_store.publish_keypair, + config.oracle.max_lookup_batch_size, + )); + + if config.oracle.subscriber_enabled { + tokio::spawn(async move { + loop { + let current_time = Instant::now(); + if let Err(ref err) = subscriber( + config.clone(), + network, + state.clone(), + key_store.program_key, + ) + .await + { + tracing::error!(err = ?err, "Subscriber exited unexpectedly."); + if current_time.elapsed() < Duration::from_secs(30) { + tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second."); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + } +} + +/// When an account RPC Subscription update is receiveed. +/// +/// We check if the account is one we're aware of and tracking, and if so, spawn +/// a small background task that handles that update. We only do this for price +/// accounts, all other accounts are handled below in the poller. +async fn subscriber( + config: Config, + network: Network, + state: Arc, + program_key: Pubkey, +) -> Result<()> +where + S: Oracle, + S: Send + Sync + 'static, +{ + // Setup PubsubClient to listen for account changes on the Oracle program. + let client = PubsubClient::new(config.wss_url.as_str()).await?; + + let (mut notifier, _unsub) = { + let program_key = program_key; + let commitment = config.oracle.commitment; + let config = RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(CommitmentConfig { commitment }), + encoding: Some(UiAccountEncoding::Base64Zstd), + ..Default::default() + }, + filters: None, + with_context: Some(true), + }; + client.program_subscribe(&program_key, Some(config)).await + }?; + + while let Some(update) = notifier.next().await { + match update.value.account.decode::() { + Some(account) => { + let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?; + let state = state.clone(); + tokio::spawn(async move { + if let Err(err) = + Oracle::handle_price_account_update(&*state, network, &pubkey, &account) + .await + { + tracing::error!(err = ?err, "Failed to handle account update."); + } + }); + } + + None => { + tracing::error!( + update = ?update, + "Failed to decode account from update.", + ); + } + } + } + + tracing::debug!("Subscriber closed connection."); + return Ok(()); +} + +/// On poll lookup all Pyth Mapping/Product/Price accounts and sync. +async fn poller( + config: Config, + network: Network, + state: Arc, + mapping_key: Pubkey, + publish_keypair: Option, + max_lookup_batch_size: usize, +) where + S: Oracle, + S: Send + Sync + 'static, +{ + // Setup an RpcClient for manual polling. + let mut tick = tokio::time::interval(config.oracle.poll_interval_duration); + let client = Arc::new(RpcClient::new_with_timeout_and_commitment( + config.rpc_url, + config.rpc_timeout, + CommitmentConfig { + commitment: config.oracle.commitment, + }, + )); + + loop { + if let Err(err) = async { + tick.tick().await; + tracing::debug!("Polling for updates."); + Oracle::poll_updates( + &*state, + network, + mapping_key, + publish_keypair.as_ref(), + &client, + max_lookup_batch_size, + ) + .await?; + Oracle::sync_global_store(&*state, network).await + } + .await + { + tracing::error!(err = ?err, "Failed to handle poll updates."); + } + } +} diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 5a32ff4..80603a7 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -1,33 +1,24 @@ pub mod exporter; -pub mod oracle; /// This module encapsulates all the interaction with a single Solana network: /// - The Oracle, which reads data from the network /// - The Exporter, which publishes data to the network pub mod network { use { - super::{ - exporter, - key_store::{ + super::key_store::{ + self, + }, + crate::agent::{ + services::exporter, + state::oracle::{ self, - KeyStore, }, - oracle, }, - crate::agent::state::State, - anyhow::Result, serde::{ Deserialize, Serialize, }, - std::{ - sync::Arc, - time::Duration, - }, - tokio::{ - sync::watch, - task::JoinHandle, - }, + std::time::Duration, }; #[derive(Clone, Copy, Serialize, Deserialize, Debug)] @@ -69,46 +60,10 @@ pub mod network { #[serde(default)] pub exporter: exporter::Config, } - - pub fn spawn_network( - config: Config, - network: Network, - state: Arc, - ) -> Result>> { - // Publisher permissions updates between oracle and exporter - let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default()); - - // Spawn the Oracle - let mut jhs = oracle::spawn_oracle( - config.oracle.clone(), - network, - &config.rpc_url, - &config.wss_url, - config.rpc_timeout, - publisher_permissions_tx, - KeyStore::new(config.key_store.clone())?, - state.clone(), - ); - - // Spawn the Exporter - let exporter_jhs = exporter::spawn_exporter( - config.exporter, - network, - &config.rpc_url, - config.rpc_timeout, - publisher_permissions_rx, - KeyStore::new(config.key_store.clone())?, - state, - )?; - - jhs.extend(exporter_jhs); - - Ok(jhs) - } } /// The key_store module is responsible for parsing the pythd key store. -mod key_store { +pub mod key_store { use { anyhow::Result, serde::{ diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 870fa87..036c0ad 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -3,7 +3,6 @@ use { super::{ key_store, network::Network, - oracle::PricePublishingMetadata, }, crate::agent::state::{ global::GlobalStore, @@ -12,6 +11,7 @@ use { LocalStore, PriceInfo, }, + oracle::PricePublishingMetadata, State, }, anyhow::{ diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs deleted file mode 100644 index 148f7d6..0000000 --- a/src/agent/solana/oracle.rs +++ /dev/null @@ -1,914 +0,0 @@ -// This module is responsible for loading the current state of the -// on-chain Oracle program accounts from Solana. -#[allow(deprecated)] -use { - self::subscriber::Subscriber, - super::{ - key_store::KeyStore, - network::Network, - }, - crate::agent::{ - legacy_schedule::LegacySchedule, - market_schedule::MarketSchedule, - state::{ - global::Update, - Prices, - State, - }, - }, - anyhow::{ - anyhow, - Context, - Result, - }, - pyth_sdk_solana::state::{ - load_mapping_account, - load_product_account, - GenericPriceAccount, - PriceComp, - PythnetPriceAccount, - SolanaPriceAccount, - }, - serde::{ - Deserialize, - Serialize, - }, - solana_client::nonblocking::rpc_client::RpcClient, - solana_sdk::{ - account::Account, - commitment_config::{ - CommitmentConfig, - CommitmentLevel, - }, - pubkey::Pubkey, - }, - std::{ - collections::{ - HashMap, - HashSet, - }, - sync::Arc, - time::Duration, - }, - tokio::{ - sync::{ - mpsc, - watch, - }, - task::JoinHandle, - time::Interval, - }, -}; - -/// This shim is used to abstract over SolanaPriceAccount and PythnetPriceAccount so we -/// can iterate over either of these. The API is intended to force users to be aware of -/// the account type they have, and so doesn't provide this abstraction (a good thing) -/// and the agent should implement this in a better way. -/// -/// For now, to implement the abstraction in the smallest way possible we use a shim -/// type that uses the size of the accounts to determine the underlying representation -/// and construct the right one regardless of which network we read. This will only work -/// as long as we don't care about any extended fields. -/// -/// TODO: Refactor the agent's network handling code. -#[derive(Copy, Clone, Debug)] -pub struct PriceEntry { - // We intentionally act as if we have a truncated account where the underlying memory is unavailable. - account: GenericPriceAccount<0, ()>, - pub comp: [PriceComp; 64], -} - -impl From for PriceEntry { - fn from(other: SolanaPriceAccount) -> PriceEntry { - unsafe { - // NOTE: We know the size is 32 because It's a Solana account. This is for tests only. - let comp_mem = std::slice::from_raw_parts(other.comp.as_ptr(), 32); - let account = - *(&other as *const SolanaPriceAccount as *const GenericPriceAccount<0, ()>); - let mut comp = [PriceComp::default(); 64]; - comp[0..32].copy_from_slice(comp_mem); - PriceEntry { account, comp } - } - } -} - -impl PriceEntry { - /// Construct the right underlying GenericPriceAccount based on the account size. - fn load_from_account(acc: &[u8]) -> Option { - unsafe { - let size = match acc.len() { - n if n == std::mem::size_of::() => 32, - n if n == std::mem::size_of::() => 64, - _ => return None, - }; - - // Getting a pointer to avoid copying the account - let account_ptr = &*(acc.as_ptr() as *const GenericPriceAccount<0, ()>); - let comp_mem = std::slice::from_raw_parts(account_ptr.comp.as_ptr(), size); - let mut comp = [PriceComp::default(); 64]; - comp[0..size].copy_from_slice(comp_mem); - Some(Self { - account: *account_ptr, - comp, - }) - } - } -} - -/// Implement `Deref` so we can access the underlying account fields. -impl std::ops::Deref for PriceEntry { - type Target = GenericPriceAccount<0, ()>; - fn deref(&self) -> &Self::Target { - &self.account - } -} - -#[derive(Default, Debug, Clone)] -pub struct PricePublishingMetadata { - pub schedule: MarketSchedule, - pub publish_interval: Option, -} - -#[derive(Default, Debug, Clone)] -pub struct Data { - pub mapping_accounts: HashMap, - pub product_accounts: HashMap, - pub price_accounts: HashMap, - /// publisher => {their permissioned price accounts => price publishing metadata} - pub publisher_permissions: HashMap>, -} - -impl Data { - fn new( - mapping_accounts: HashMap, - product_accounts: HashMap, - price_accounts: HashMap, - publisher_permissions: HashMap>, - ) -> Self { - Data { - mapping_accounts, - product_accounts, - price_accounts, - publisher_permissions, - } - } -} - -pub type MappingAccount = pyth_sdk_solana::state::MappingAccount; -#[derive(Debug, Clone)] -pub struct ProductEntry { - pub account_data: pyth_sdk_solana::state::ProductAccount, - pub schedule: MarketSchedule, - pub price_accounts: Vec, - pub publish_interval: Option, -} - -// Oracle is responsible for fetching Solana account data stored in the Pyth on-chain Oracle. -pub struct Oracle { - /// The Solana account data - data: Data, - - /// Channel on which polled data are received from the Poller - data_rx: mpsc::Receiver, - - /// Channel on which account updates are received from the Subscriber - updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, - - network: Network, - - state: Arc, -} - -#[derive(Clone, Serialize, Deserialize, Debug)] -#[serde(default)] -pub struct Config { - /// The commitment level to use when reading data from the RPC node. - pub commitment: CommitmentLevel, - /// The interval with which to poll account information. - #[serde(with = "humantime_serde")] - pub poll_interval_duration: Duration, - /// Whether subscribing to account updates over websocket is enabled - pub subscriber_enabled: bool, - /// Capacity of the channel over which the Subscriber sends updates to the Oracle - pub updates_channel_capacity: usize, - /// Capacity of the channel over which the Poller sends data to the Oracle - pub data_channel_capacity: usize, - - /// Ask the RPC for up to this many product/price accounts in a - /// single request. Tune this setting if you're experiencing - /// timeouts on data fetching. In order to keep concurrent open - /// socket count at bay, the batches are looked up sequentially, - /// trading off overall time it takes to fetch all symbols. - pub max_lookup_batch_size: usize, -} - -impl Default for Config { - fn default() -> Self { - Self { - commitment: CommitmentLevel::Confirmed, - poll_interval_duration: Duration::from_secs(5), - subscriber_enabled: true, - updates_channel_capacity: 10000, - data_channel_capacity: 10000, - max_lookup_batch_size: 100, - } - } -} - -pub fn spawn_oracle( - config: Config, - network: Network, - rpc_url: &str, - wss_url: &str, - rpc_timeout: Duration, - publisher_permissions_tx: watch::Sender< - HashMap>, - >, - key_store: KeyStore, - state: Arc, -) -> Vec> { - let mut jhs = vec![]; - - // Create and spawn the account subscriber - let (updates_tx, updates_rx) = mpsc::channel(config.updates_channel_capacity); - if config.subscriber_enabled { - let subscriber = Subscriber::new( - wss_url.to_string(), - config.commitment, - key_store.program_key, - updates_tx, - ); - jhs.push(tokio::spawn(async move { subscriber.run().await })); - } - - // Create and spawn the Poller - let (data_tx, data_rx) = mpsc::channel(config.data_channel_capacity); - let mut poller = Poller::new( - data_tx, - publisher_permissions_tx, - rpc_url, - rpc_timeout, - config.commitment, - config.poll_interval_duration, - config.max_lookup_batch_size, - key_store.mapping_key, - ); - jhs.push(tokio::spawn(async move { poller.run().await })); - - // Create and spawn the Oracle - let mut oracle = Oracle::new(data_rx, updates_rx, network, state); - jhs.push(tokio::spawn(async move { oracle.run().await })); - - jhs -} - -impl Oracle { - pub fn new( - data_rx: mpsc::Receiver, - updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>, - network: Network, - state: Arc, - ) -> Self { - Oracle { - data: Default::default(), - data_rx, - updates_rx, - network, - state, - } - } - - pub async fn run(&mut self) { - loop { - if let Err(err) = self.handle_next().await { - tracing::error!(err = ?err, "Oracle failed to handle next update."); - } - } - } - - async fn handle_next(&mut self) -> Result<()> { - tokio::select! { - Some((account_key, account)) = self.updates_rx.recv() => { - self.handle_account_update(&account_key, &account).await - } - Some(data) = self.data_rx.recv() => { - self.handle_data_update(data); - self.send_all_data_to_global_store().await - } - } - } - - fn handle_data_update(&mut self, data: Data) { - // Log new accounts which have been found - let previous_mapping_accounts = self - .data - .mapping_accounts - .keys() - .cloned() - .collect::>(); - tracing::info!( - new = ?data - .mapping_accounts - .keys() - .cloned() - .collect::>().difference(&previous_mapping_accounts), - total = data.mapping_accounts.len(), - "Fetched mapping accounts." - ); - let previous_product_accounts = self - .data - .product_accounts - .keys() - .cloned() - .collect::>(); - tracing::info!( - new = ?data - .product_accounts - .keys() - .cloned() - .collect::>().difference(&previous_product_accounts), - total = data.product_accounts.len(), - "Fetched product accounts.", - ); - let previous_price_accounts = self - .data - .price_accounts - .keys() - .cloned() - .collect::>(); - tracing::info!( - new = ?data - .price_accounts - .keys() - .cloned() - .collect::>().difference(&previous_price_accounts), - total = data.price_accounts.len(), - "Fetched price accounts.", - ); - - let previous_publishers = self - .data - .publisher_permissions - .keys() - .collect::>(); - let new_publishers = data.publisher_permissions.keys().collect::>(); - tracing::info!( - new_publishers = ?new_publishers.difference(&previous_publishers).collect::>(), - total_publishers = new_publishers.len(), - "Updated publisher permissions.", - ); - - // Update the data with the new data structs - self.data = data; - } - - async fn handle_account_update( - &mut self, - account_key: &Pubkey, - account: &Account, - ) -> Result<()> { - tracing::debug!("Handling account update."); - - // We are only interested in price account updates, all other types of updates - // will be fetched using polling. - if !self.data.price_accounts.contains_key(account_key) { - return Ok(()); - } - - self.handle_price_account_update(account_key, account).await - } - - async fn handle_price_account_update( - &mut self, - account_key: &Pubkey, - account: &Account, - ) -> Result<()> { - let price_entry = PriceEntry::load_from_account(&account.data) - .with_context(|| format!("load price account {}", account_key))?; - - tracing::debug!( - pubkey = account_key.to_string(), - price = price_entry.agg.price, - conf = price_entry.agg.conf, - status = ?price_entry.agg.status, - "Observed on-chain price account update.", - ); - - self.data - .price_accounts - .insert(*account_key, price_entry.clone()); - - self.notify_price_account_update(account_key, &price_entry) - .await?; - - Ok(()) - } - - async fn send_all_data_to_global_store(&self) -> Result<()> { - for (product_account_key, product_account) in &self.data.product_accounts { - self.notify_product_account_update(product_account_key, product_account) - .await?; - } - - for (price_account_key, price_account) in &self.data.price_accounts { - self.notify_price_account_update(price_account_key, price_account) - .await?; - } - - Ok(()) - } - - async fn notify_product_account_update( - &self, - account_key: &Pubkey, - account: &ProductEntry, - ) -> Result<()> { - Prices::update_global_price( - &*self.state, - self.network, - &Update::ProductAccountUpdate { - account_key: *account_key, - account: account.clone(), - }, - ) - .await - .map_err(|_| anyhow!("failed to notify product account update")) - } - - async fn notify_price_account_update( - &self, - account_key: &Pubkey, - account: &PriceEntry, - ) -> Result<()> { - Prices::update_global_price( - &*self.state, - self.network, - &Update::PriceAccountUpdate { - account_key: *account_key, - account: account.clone(), - }, - ) - .await - .map_err(|_| anyhow!("failed to notify price account update")) - } -} - -struct Poller { - /// The channel on which to send polled update data - data_tx: mpsc::Sender, - - /// Updates about permissioned price accounts from oracle to exporter - publisher_permissions_tx: - watch::Sender>>, - - /// The RPC client to use to poll data from the RPC node - rpc_client: RpcClient, - - /// The interval with which to poll for data - poll_interval: Interval, - - /// Passed from Oracle config - max_lookup_batch_size: usize, - - mapping_key: Pubkey, -} - -impl Poller { - pub fn new( - data_tx: mpsc::Sender, - publisher_permissions_tx: watch::Sender< - HashMap>, - >, - rpc_url: &str, - rpc_timeout: Duration, - commitment: CommitmentLevel, - poll_interval_duration: Duration, - max_lookup_batch_size: usize, - mapping_key: Pubkey, - ) -> Self { - let rpc_client = RpcClient::new_with_timeout_and_commitment( - rpc_url.to_string(), - rpc_timeout, - CommitmentConfig { commitment }, - ); - let poll_interval = tokio::time::interval(poll_interval_duration); - - Poller { - data_tx, - publisher_permissions_tx, - rpc_client, - poll_interval, - max_lookup_batch_size, - mapping_key, - } - } - - pub async fn run(&mut self) { - loop { - self.poll_interval.tick().await; - tracing::info!("Fetching all pyth account data."); - if let Err(err) = self.poll_and_send().await { - tracing::error!(err = ?err, "Oracle Poll/Send Failed."); - } - } - } - - async fn poll_and_send(&mut self) -> Result<()> { - let fresh_data = self.poll().await?; - - self.publisher_permissions_tx - .send_replace(fresh_data.publisher_permissions.clone()); - - self.data_tx - .send(fresh_data) - .await - .context("failed to send data to oracle")?; - - Ok(()) - } - - async fn poll(&self) -> Result { - let mapping_accounts = self.fetch_mapping_accounts(self.mapping_key).await?; - let (product_accounts, price_accounts) = self - .fetch_product_and_price_accounts(mapping_accounts.values()) - .await?; - - let mut publisher_permissions = HashMap::new(); - - for (price_key, price_entry) in price_accounts.iter() { - for component in price_entry.comp { - if component.publisher == Pubkey::default() { - continue; - } - - let component_pub_entry = publisher_permissions - .entry(component.publisher) - .or_insert(HashMap::new()); - - let publisher_permission = if let Some(prod_entry) = - product_accounts.get(&price_entry.prod) - { - PricePublishingMetadata { - schedule: prod_entry.schedule.clone(), - publish_interval: prod_entry.publish_interval.clone(), - } - } else { - tracing::warn!( - price = price_key.to_string(), - missing_product = price_entry.prod.to_string(), - "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7.", - ); - Default::default() - }; - - component_pub_entry.insert(*price_key, publisher_permission); - } - } - - Ok(Data::new( - mapping_accounts, - product_accounts, - price_accounts, - publisher_permissions, - )) - } - - async fn fetch_mapping_accounts( - &self, - mapping_account_key: Pubkey, - ) -> Result> { - let mut accounts = HashMap::new(); - - let mut account_key = mapping_account_key; - while account_key != Pubkey::default() { - let account = *load_mapping_account( - &self - .rpc_client - .get_account_data(&account_key) - .await - .with_context(|| format!("load mapping account {}", account_key))?, - )?; - accounts.insert(account_key, account); - - account_key = account.next; - } - - Ok(accounts) - } - - async fn fetch_product_and_price_accounts<'a, A>( - &self, - mapping_accounts: A, - ) -> Result<(HashMap, HashMap)> - where - A: IntoIterator, - { - let mut product_keys = vec![]; - - // Get all product keys - for mapping_account in mapping_accounts { - for account_key in mapping_account - .products - .iter() - .filter(|pubkey| **pubkey != Pubkey::default()) - { - product_keys.push(*account_key); - } - } - - let mut product_entries = HashMap::new(); - let mut price_entries = HashMap::new(); - - // Lookup products and their prices using the configured batch size - for product_key_batch in product_keys.as_slice().chunks(self.max_lookup_batch_size) { - let (mut batch_products, mut batch_prices) = self - .fetch_batch_of_product_and_price_accounts(product_key_batch) - .await?; - - product_entries.extend(batch_products.drain()); - price_entries.extend(batch_prices.drain()); - } - - Ok((product_entries, price_entries)) - } - - async fn fetch_batch_of_product_and_price_accounts( - &self, - product_key_batch: &[Pubkey], - ) -> Result<(HashMap, HashMap)> { - let mut product_entries = HashMap::new(); - - let product_keys = product_key_batch; - - // Look up the batch with a single request - let product_accounts = self.rpc_client.get_multiple_accounts(product_keys).await?; - - // Log missing products, fill the product entries with initial values - for (product_key, product_account) in product_keys.iter().zip(product_accounts) { - if let Some(prod_acc) = product_account { - let product = load_product_account(prod_acc.data.as_slice()) - .context(format!("Could not parse product account {}", product_key))?; - - #[allow(deprecated)] - let legacy_schedule: LegacySchedule = if let Some((_wsched_key, wsched_val)) = - product.iter().find(|(k, _v)| *k == "weekly_schedule") - { - wsched_val.parse().unwrap_or_else(|err| { - tracing::warn!( - product_key = product_key.to_string(), - weekly_schedule = wsched_val, - "Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing.", - ); - tracing::debug!(err = ?err, "Parsing error context."); - Default::default() - }) - } else { - Default::default() // No market hours specified, meaning 24/7 publishing - }; - - let market_schedule: Option = if let Some(( - _msched_key, - msched_val, - )) = - product.iter().find(|(k, _v)| *k == "schedule") - { - match msched_val.parse::() { - Ok(schedule) => Some(schedule), - Err(err) => { - tracing::warn!( - product_key = product_key.to_string(), - schedule = msched_val, - "Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule.", - ); - tracing::debug!(err = ?err, "Parsing error context."); - None - } - } - } else { - None - }; - - let publish_interval: Option = if let Some(( - _publish_interval_key, - publish_interval_val, - )) = - product.iter().find(|(k, _v)| *k == "publish_interval") - { - match publish_interval_val.parse::() { - Ok(interval) => Some(Duration::from_secs_f64(interval)), - Err(err) => { - tracing::warn!( - product_key = product_key.to_string(), - publish_interval = publish_interval_val, - "Oracle: Product has publish_interval defined but it could not be parsed. Falling back to None.", - ); - tracing::debug!(err = ?err, "parsing error context"); - None - } - } - } else { - None - }; - - product_entries.insert( - *product_key, - ProductEntry { - account_data: *product, - schedule: market_schedule.unwrap_or_else(|| legacy_schedule.into()), - price_accounts: vec![], - publish_interval, - }, - ); - } else { - tracing::warn!( - product_key = product_key.to_string(), - "Oracle: Could not find product on chain, skipping", - ); - } - } - - let mut price_entries = HashMap::new(); - - // Starting with top-level prices, look up price accounts in - // batches, filling price entries and adding found prices to - // the product entries - let mut todo = product_entries - .values() - .map(|p| p.account_data.px_acc) - .collect::>(); - - while !todo.is_empty() { - let price_accounts = self - .rpc_client - .get_multiple_accounts(todo.as_slice()) - .await?; - - // Any non-zero price.next pubkey will be gathered here and looked up on next iteration - let mut next_todo = vec![]; - - // Process the response of each lookup request. If there's - // a next price, it will be looked up on next iteration, - // as todo gets replaced with next_todo. - for (price_key, price_account) in todo.iter().zip(price_accounts) { - if let Some(price_acc) = price_account { - let price = PriceEntry::load_from_account(&price_acc.data) - .context(format!("Could not parse price account at {}", price_key))?; - - let next_price = price.next; - if let Some(prod) = product_entries.get_mut(&price.prod) { - prod.price_accounts.push(*price_key); - price_entries.insert(*price_key, price); - } else { - tracing::warn!( - missing_product = price.prod.to_string(), - price_key = price_key.to_string(), - "Could not find product entry for price, listed in its prod field, skipping", - ); - - continue; - } - - if next_price != Pubkey::default() { - next_todo.push(next_price); - } - } else { - tracing::warn!( - price_key = price_key.to_string(), - "Could not look up price account on chain, skipping", - ); - continue; - } - } - - todo = next_todo; - } - Ok((product_entries, price_entries)) - } -} - -mod subscriber { - use { - anyhow::{ - anyhow, - Result, - }, - solana_account_decoder::UiAccountEncoding, - solana_client::{ - nonblocking::pubsub_client::PubsubClient, - rpc_config::{ - RpcAccountInfoConfig, - RpcProgramAccountsConfig, - }, - }, - solana_sdk::{ - account::Account, - commitment_config::{ - CommitmentConfig, - CommitmentLevel, - }, - pubkey::Pubkey, - }, - std::time::Duration, - tokio::{ - sync::mpsc, - time::Instant, - }, - }; - - /// Subscriber subscribes to all changes on the given account, and sends those changes - /// on updates_tx. This is a convenience wrapper around the Blockchain Shadow crate. - pub struct Subscriber { - /// WSS RPC endpoint - wss_url: String, - - /// Commitment level used to read account data - commitment: CommitmentLevel, - - /// Public key of the root program account to monitor. Note that all - /// accounts owned by this account are also monitored. - program_key: Pubkey, - - /// Channel on which updates are sent - updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, - } - - impl Subscriber { - pub fn new( - wss_url: String, - commitment: CommitmentLevel, - program_key: Pubkey, - updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, - ) -> Self { - Subscriber { - wss_url, - commitment, - program_key, - updates_tx, - } - } - - pub async fn run(&self) { - loop { - let current_time = Instant::now(); - if let Err(ref err) = self.start().await { - tracing::error!(err = ?err, "Oracle exited unexpectedly."); - if current_time.elapsed() < Duration::from_secs(30) { - tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second."); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - } - - pub async fn start(&self) -> Result<()> { - let client = PubsubClient::new(self.wss_url.as_str()).await?; - - let config = RpcProgramAccountsConfig { - account_config: RpcAccountInfoConfig { - commitment: Some(CommitmentConfig { - commitment: self.commitment, - }), - encoding: Some(UiAccountEncoding::Base64Zstd), - ..Default::default() - }, - filters: None, - with_context: Some(true), - }; - - let (mut notif, _unsub) = client - .program_subscribe(&self.program_key, Some(config)) - .await?; - - tracing::debug!( - program_key = self.program_key.to_string(), - "subscribed to program account updates", - ); - - loop { - match tokio_stream::StreamExt::next(&mut notif).await { - Some(update) => { - let account: Account = match update.value.account.decode() { - Some(account) => account, - None => { - tracing::error!( - update = ?update, - "Failed to decode account from update.", - ); - continue; - } - }; - - self.updates_tx - .send((update.value.pubkey.as_str().try_into()?, account)) - .await - .map_err(|_| anyhow!("failed to send update to oracle"))?; - } - None => { - tracing::debug!("subscriber closed connection"); - return Ok(()); - } - } - } - } - } -} diff --git a/src/agent/state.rs b/src/agent/state.rs index 99f7b6c..1edcbd1 100644 --- a/src/agent/state.rs +++ b/src/agent/state.rs @@ -16,13 +16,13 @@ use { }; pub mod api; +pub mod exporter; pub mod global; pub mod keypairs; pub mod local; -pub use api::{ - notifier, - Prices, -}; +pub mod oracle; +pub mod transactions; +pub use api::Prices; #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(default)] @@ -54,6 +54,15 @@ pub struct State { /// State for Price related functionality. prices: api::PricesState, + + /// State for the Solana-based Oracle functionality. + oracle: oracle::OracleState, + + /// State for the Solana-based Exporter functionality. + exporter: exporter::ExporterState, + + /// State for the Solana transaction monitor + transactions: transactions::TransactionsState, } /// Represents a single Notify Price Sched subscription @@ -73,13 +82,36 @@ struct NotifyPriceSubscription { } impl State { - pub async fn new(config: Config) -> Self { + pub async fn new(config: &crate::agent::config::Config) -> Self { + let registry = &mut *PROMETHEUS_REGISTRY.lock().await; + State { + global_store: global::Store::new(registry), + local_store: local::Store::new(registry), + keypairs: keypairs::KeypairState::default(), + prices: api::PricesState::new(config.state.clone()), + oracle: oracle::OracleState::new(), + exporter: exporter::ExporterState::new(), + transactions: transactions::TransactionsState::new( + config + .primary_network + .exporter + .transaction_monitor + .max_transactions, + ), + } + } + + #[cfg(test)] + pub async fn new_tests(config: Config) -> Self { let registry = &mut *PROMETHEUS_REGISTRY.lock().await; State { global_store: global::Store::new(registry), local_store: local::Store::new(registry), keypairs: keypairs::KeypairState::default(), prices: api::PricesState::new(config), + oracle: oracle::OracleState::new(), + exporter: exporter::ExporterState::new(), + transactions: transactions::TransactionsState::new(100), } } } @@ -92,7 +124,7 @@ mod tests { self, AllAccountsData, }, - notifier, + oracle::ProductEntry, Config, Prices, State, @@ -108,14 +140,12 @@ mod tests { ProductAccountMetadata, PublisherAccount, }, - solana::{ - self, - network::Network, - oracle::PriceEntry, - }, + services::notifier, + solana::network::Network, state::{ global::Update, local::LocalStore, + oracle::PriceEntry, }, }, pyth_sdk::Identifier, @@ -156,7 +186,7 @@ mod tests { let config = Config { notify_price_sched_interval_duration, }; - let state = Arc::new(State::new(config).await); + let state = Arc::new(State::new_tests(config).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier @@ -412,7 +442,7 @@ mod tests { "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t", ) .unwrap(), - solana::oracle::ProductEntry { + ProductEntry { account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 6, @@ -473,7 +503,7 @@ mod tests { "BjHoZWRxo9dgbR1NQhPyTiUs6xFiX6mGS4TMYvy3b2yc", ) .unwrap(), - solana::oracle::ProductEntry { + ProductEntry { account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 5, diff --git a/src/agent/state/api.rs b/src/agent/state/api.rs index dfa6e5a..3ea1308 100644 --- a/src/agent/state/api.rs +++ b/src/agent/state/api.rs @@ -14,11 +14,7 @@ use { PublisherAccount, SubscriptionID, }, - solana::{ - self, - network::Network, - oracle::PriceEntry, - }, + solana::network::Network, }, global::{ AllAccountsData, @@ -29,6 +25,10 @@ use { self, LocalStore, }, + oracle::{ + PriceEntry, + ProductEntry, + }, Config, NotifyPriceSchedSubscription, NotifyPriceSubscription, @@ -46,10 +46,7 @@ use { }, std::{ collections::HashMap, - sync::{ - atomic::AtomicI64, - Arc, - }, + sync::atomic::AtomicI64, time::Duration, }, tokio::sync::{ @@ -71,7 +68,7 @@ fn price_status_to_str(price_status: PriceStatus) -> String { } fn solana_product_account_to_pythd_api_product_account( - product_account: &solana::oracle::ProductEntry, + product_account: &ProductEntry, all_accounts_data: &AllAccountsData, product_account_key: &solana_sdk::pubkey::Pubkey, ) -> ProductAccount { @@ -171,7 +168,6 @@ pub trait Prices { account_pubkey: &solana_sdk::pubkey::Pubkey, notify_price_sched_tx: mpsc::Sender, ) -> SubscriptionID; - fn next_subscription_id(&self) -> SubscriptionID; async fn subscribe_price( &self, account: &solana_sdk::pubkey::Pubkey, @@ -187,7 +183,8 @@ pub trait Prices { status: String, ) -> Result<()>; async fn update_global_price(&self, network: Network, update: &Update) -> Result<()>; - // TODO: implement FromStr method on PriceStatus + fn notify_interval_duration(&self) -> Duration; + fn next_subscription_id(&self) -> SubscriptionID; fn map_status(status: &str) -> Result; } @@ -433,6 +430,10 @@ where } } + fn notify_interval_duration(&self) -> Duration { + self.into().notify_price_sched_interval_duration + } + // TODO: implement FromStr method on PriceStatus fn map_status(status: &str) -> Result { match status { @@ -445,27 +446,3 @@ where } } } - -pub async fn notifier(state: Arc) -where - for<'a> &'a S: Into<&'a PricesState>, - S: Prices, -{ - let prices: &PricesState = (&*state).into(); - let mut interval = tokio::time::interval(prices.notify_price_sched_interval_duration); - let mut exit = crate::agent::EXIT.subscribe(); - loop { - Prices::drop_closed_subscriptions(&*state).await; - tokio::select! { - _ = exit.changed() => { - tracing::info!("Shutdown signal received."); - return; - } - _ = interval.tick() => { - if let Err(err) = state.send_notify_price_sched().await { - tracing::error!(err = ?err, "Notifier: failed to send notify price sched."); - } - } - } - } -} diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs new file mode 100644 index 0000000..0462abb --- /dev/null +++ b/src/agent/state/exporter.rs @@ -0,0 +1,830 @@ +use { + super::{ + local::PriceInfo, + oracle::PricePublishingMetadata, + transactions::Transactions, + State, + }, + crate::agent::{ + services::exporter::NetworkState, + solana::network::Network, + state::{ + global::GlobalStore, + keypairs::Keypairs, + local::LocalStore, + }, + }, + anyhow::{ + anyhow, + Context, + Result, + }, + bincode::Options, + chrono::Utc, + futures_util::future::join_all, + pyth_sdk::Identifier, + pyth_sdk_solana::state::PriceStatus, + serde::Serialize, + solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_config::RpcSendTransactionConfig, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + instruction::{ + AccountMeta, + Instruction, + }, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + sysvar::clock, + transaction::Transaction, + }, + std::{ + collections::{ + BTreeMap, + HashMap, + }, + sync::Arc, + time::Duration, + }, + tokio::sync::{ + watch, + RwLock, + }, +}; + +const PYTH_ORACLE_VERSION: u32 = 2; +const UPDATE_PRICE_NO_FAIL_ON_ERROR: i32 = 13; + +#[repr(C)] +#[derive(Serialize, PartialEq, Debug, Clone)] +struct UpdPriceCmd { + version: u32, + cmd: i32, + status: PriceStatus, + unused_: u32, + price: i64, + conf: u64, + pub_slot: u64, +} + +#[derive(Default)] +pub struct ExporterState { + /// The last state published for each price identifier. Used to + /// rule out stale data and prevent repetitive publishing of + /// unchanged prices. + last_published_state: RwLock>, + + /// Currently known permissioned prices of this publisher along with their market hours + our_prices: RwLock>, + + /// Recent compute unit price in micro lamports (set if dynamic compute unit pricing is enabled) + recent_compute_unit_price_micro_lamports: RwLock>, +} + +impl ExporterState { + pub fn new() -> Self { + Self::default() + } +} + +#[async_trait::async_trait] +pub trait Exporter +where + Self: Keypairs, + Self: LocalStore, + Self: GlobalStore, + Self: Transactions, +{ + async fn record_publish(&self, batch_state: HashMap); + async fn get_permissioned_updates( + &self, + network: Network, + publish_keypair: &Keypair, + staleness_threshold: Duration, + unchanged_publish_threshold: Duration, + ) -> Result>; + async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option; + async fn update_recent_compute_unit_price( + &self, + network: Network, + publish_keypair: &Keypair, + rpc_client: &RpcClient, + staleness_threshold: Duration, + unchanged_publish_threshold: Duration, + ) -> Result<()>; + async fn update_permissions( + &self, + network: Network, + publish_keypair: Option<&Keypair>, + publisher_permissions: HashMap>, + ) -> Result<()>; +} + +/// Allow downcasting State into ExporterState for functions that depend on the `Exporter` service. +impl<'a> From<&'a State> for &'a ExporterState { + fn from(state: &'a State) -> &'a ExporterState { + &state.exporter + } +} + +#[async_trait::async_trait] +impl Exporter for T +where + for<'a> &'a T: Into<&'a ExporterState>, + T: Sync + Send + 'static, + T: Keypairs, + T: LocalStore, + T: GlobalStore, + T: Transactions, +{ + async fn record_publish(&self, batch_state: HashMap) { + self.into() + .last_published_state + .write() + .await + .extend(batch_state); + } + + async fn get_permissioned_updates( + &self, + network: Network, + publish_keypair: &Keypair, + staleness_threshold: Duration, + unchanged_publish_threshold: Duration, + ) -> Result> { + let local_store_contents = LocalStore::get_all_price_infos(self).await; + let now = Utc::now().naive_utc(); + + { + let keys = self.into().our_prices.read().await; + let keys = keys.keys(); + tracing::debug!( + our_prices = ?keys, + publish_pubkey = publish_keypair.pubkey().to_string(), + "Exporter: filtering prices permissioned to us", + ); + } + + let last_published_state = self.into().last_published_state.read().await; + let our_prices = self.into().our_prices.read().await; + + // Filter the contents to only include information we haven't already sent, + // and to ignore stale information. + Ok(local_store_contents + .into_iter() + .filter(|(_identifier, info)| { + // Filter out timestamps that are old + now < info.timestamp + staleness_threshold + }) + .filter(|(identifier, info)| { + // Filter out unchanged price data if the max delay wasn't reached + if let Some(last_info) = last_published_state.get(identifier) { + if info.timestamp > last_info.timestamp + unchanged_publish_threshold { + true // max delay since last published state reached, we publish anyway + } else { + !last_info.cmp_no_timestamp(info) // Filter out if data is unchanged + } + } else { + true // No prior data found, letting the price through + } + }) + .filter(|(id, _data)| { + let key_from_id = Pubkey::from((*id).clone().to_bytes()); + if let Some(publisher_permission) = our_prices.get(&key_from_id) { + let now_utc = Utc::now(); + let ret = publisher_permission.schedule.can_publish_at(&now_utc); + + if !ret { + tracing::debug!( + price_account = key_from_id.to_string(), + schedule = ?publisher_permission.schedule, + utc_time = now_utc.format("%c").to_string(), + "Exporter: Attempted to publish price outside market hours", + ); + } + + ret + } else { + // Note: This message is not an error. Some + // publishers have different permissions on + // primary/secondary networks + tracing::debug!( + unpermissioned_price_account = key_from_id.to_string(), + permissioned_accounts = ?self.into().our_prices, + "Exporter: Attempted to publish a price without permission, skipping", + ); + false + } + }) + .filter(|(id, info)| { + // Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval + let last_info = match last_published_state.get(id) { + Some(last_info) => last_info, + None => { + // No prior data found, letting the price through + return true; + } + }; + + let key_from_id = Pubkey::from((*id).clone().to_bytes()); + let publisher_metadata = match our_prices.get(&key_from_id) { + Some(metadata) => metadata, + None => { + // Should never happen since we have filtered out the price above + return false; + } + }; + + if let Some(publish_interval) = publisher_metadata.publish_interval { + if info.timestamp < last_info.timestamp + publish_interval { + // Updating the price too soon after the last update, skipping + return false; + } + } + true + }) + .collect::>()) + } + + async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option { + *self + .into() + .recent_compute_unit_price_micro_lamports + .read() + .await + } + + async fn update_recent_compute_unit_price( + &self, + network: Network, + publish_keypair: &Keypair, + rpc_client: &RpcClient, + staleness_threshold: Duration, + unchanged_publish_threshold: Duration, + ) -> Result<()> { + let permissioned_updates = self + .get_permissioned_updates( + network, + publish_keypair, + staleness_threshold, + unchanged_publish_threshold, + ) + .await?; + let price_accounts = permissioned_updates + .iter() + .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + .collect::>(); + + *self + .into() + .recent_compute_unit_price_micro_lamports + .write() + .await = + estimate_compute_unit_price_micro_lamports(rpc_client, &price_accounts).await?; + + Ok(()) + } + + async fn update_permissions( + &self, + network: Network, + publish_keypair: Option<&Keypair>, + publisher_permissions: HashMap>, + ) -> Result<()> { + let publish_keypair = get_publish_keypair(self, network, publish_keypair).await?; + *self.into().our_prices.write().await = publisher_permissions + .get(&publish_keypair.pubkey()) + .cloned() + .unwrap_or_else(|| { + tracing::warn!( + publish_pubkey = &publish_keypair.pubkey().to_string(), + "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup.", + ); + HashMap::new() + }); + + Ok(()) + } +} + +pub async fn get_publish_keypair( + state: &S, + network: Network, + publish_keypair: Option<&Keypair>, +) -> Result +where + S: Exporter, + S: Keypairs, +{ + if let Some(kp) = publish_keypair.as_ref() { + // It's impossible to sanely return a &Keypair in the + // other if branch, so we clone the reference. + Ok(Keypair::from_bytes(&kp.to_bytes()) + .context("INTERNAL: Could not convert keypair to bytes and back")?) + } else { + // Request the keypair from remote keypair loader. Doing + // this here guarantees that the up to date loaded keypair + // is being used. + // + // Currently, we're guaranteed not to clog memory or block + // the keypair loader under the following assumptions: + // - The Exporter publishing loop waits for a publish + // attempt to finish before beginning the next + // one. Currently realized in run() + // - The Remote Key Loader does not read channels for + // keypairs it does not have. Currently expressed in + // handle_key_requests() in remote_keypair_loader.rs + + tracing::debug!("Exporter: Publish keypair is None, requesting remote loaded key"); + let kp = Keypairs::request_keypair(state, network).await?; + tracing::debug!("Exporter: Keypair received"); + Ok(kp) + } +} + +async fn estimate_compute_unit_price_micro_lamports( + rpc_client: &RpcClient, + price_accounts: &[Pubkey], +) -> Result> +where + for<'a> &'a S: Into<&'a ExporterState>, + S: Exporter, +{ + let mut slot_compute_fee: BTreeMap = BTreeMap::new(); + + // Maximum allowed number of accounts is 128. So we need to chunk the requests + let prioritization_fees_batches = futures_util::future::join_all( + price_accounts + .chunks(128) + .map(|price_accounts| rpc_client.get_recent_prioritization_fees(price_accounts)), + ) + .await + .into_iter() + .collect::, _>>() + .context("Failed to get recent prioritization fees")?; + + prioritization_fees_batches + .iter() + .for_each(|prioritization_fees| { + prioritization_fees.iter().for_each(|fee| { + // Get the maximum prioritaztion fee over all fees retrieved for this slot + let prioritization_fee = slot_compute_fee + .get(&fee.slot) + .map_or(fee.prioritization_fee, |other| { + fee.prioritization_fee.max(*other) + }); + slot_compute_fee.insert(fee.slot, prioritization_fee); + }) + }); + + let mut prioritization_fees = slot_compute_fee + .iter() + .rev() + .take(20) // Only take the last 20 slot priority fees + .map(|(_, fee)| *fee) + .collect::>(); + + prioritization_fees.sort(); + + let median_priority_fee = prioritization_fees + .get(prioritization_fees.len() / 2) + .cloned(); + + Ok(median_priority_fee) +} + +/// Publishes any price updates in the local store that we haven't sent to this network. +/// +/// The strategy used to do this is as follows: +/// - Fetch all the price updates currently present in the local store +/// - Filter out price updates we have previously attempted to publish, or which are +/// too old to publish. +/// - Collect the price updates into batches. +/// - Publish all the batches, staggering them evenly over the interval at which this method is called. +/// +/// This design is intended to: +/// - Decouple the rate at which the local store is updated and the rate at which we publish transactions. +/// A user sending an unusually high rate of price updates shouldn't be reflected in the transaction rate. +/// - Degrade gracefully if the blockchain RPC node exhibits poor performance. If the RPC node takes a long +/// time to respond, no internal queues grow unboundedly. At any single point in time there are at most +/// (n / batch_size) requests in flight. +pub async fn publish_batches( + state: &S, + client: Arc, + network: Network, + network_state_rx: &watch::Receiver, + accumulator_key: Option, + publish_keypair: &Keypair, + program_key: Pubkey, + max_batch_size: usize, + staleness_threshold: Duration, + compute_unit_limit: u32, + compute_unit_price_micro_lamports: Option, + maximum_compute_unit_price_micro_lamports: u64, + maximum_slot_gap_for_dynamic_compute_unit_price: u64, + dynamic_compute_unit_pricing_enabled: bool, + permissioned_updates: Vec<(pyth_sdk::Identifier, PriceInfo)>, +) -> Result<()> +where + S: Sync + Send + 'static, + S: Keypairs, + S: LocalStore, + S: GlobalStore, + S: Transactions, + S: Exporter, +{ + if permissioned_updates.is_empty() { + return Ok(()); + } + + // Split the updates up into batches + let batches = permissioned_updates.chunks(max_batch_size); + + let mut batch_state = HashMap::new(); + let mut batch_futures = vec![]; + + let network_state = network_state_rx.borrow().clone(); + for batch in batches { + batch_futures.push(publish_batch( + state, + client.clone(), + network, + network_state, + accumulator_key, + publish_keypair, + program_key, + batch, + staleness_threshold, + compute_unit_limit, + compute_unit_price_micro_lamports, + maximum_compute_unit_price_micro_lamports, + maximum_slot_gap_for_dynamic_compute_unit_price, + dynamic_compute_unit_pricing_enabled, + )); + + for (identifier, info) in batch { + batch_state.insert(*identifier, (*info).clone()); + } + } + + // Wait for all the update requests to complete. Note that this doesn't wait for the + // transactions themselves to be processed or confirmed, just the RPC requests to return. + join_all(batch_futures) + .await + .into_iter() + .collect::>>()?; + + Exporter::record_publish(state, batch_state).await; + Ok(()) +} + +async fn publish_batch( + state: &S, + client: Arc, + network: Network, + network_state: NetworkState, + accumulator_key: Option, + publish_keypair: &Keypair, + program_key: Pubkey, + batch: &[(Identifier, PriceInfo)], + staleness_threshold: Duration, + compute_unit_limit: u32, + compute_unit_price_micro_lamports_opt: Option, + maximum_compute_unit_price_micro_lamports: u64, + maximum_slot_gap_for_dynamic_compute_unit_price: u64, + dynamic_compute_unit_pricing_enabled: bool, +) -> Result<()> +where + S: Sync + Send + 'static, + S: Keypairs, + S: LocalStore, + S: GlobalStore, + S: Transactions, + S: Exporter, +{ + let mut instructions = Vec::new(); + + // Refresh the data in the batch + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; + let refreshed_batch = batch.iter().map(|(identifier, _)| { + ( + identifier, + local_store_contents + .get(identifier) + .ok_or_else(|| anyhow!("price identifier not found in local store")) + .with_context(|| identifier.to_string()), + ) + }); + let price_accounts = refreshed_batch + .clone() + .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + .collect::>(); + + for (identifier, price_info_result) in refreshed_batch { + let price_info = price_info_result?; + let now = Utc::now().naive_utc(); + + let stale_price = now > price_info.timestamp + staleness_threshold; + if stale_price { + continue; + } + + let instruction = if let Some(accumulator_program_key) = accumulator_key { + create_instruction_with_accumulator( + publish_keypair.pubkey(), + program_key, + Pubkey::from(identifier.to_bytes()), + price_info, + network_state.current_slot, + accumulator_program_key, + )? + } else { + create_instruction_without_accumulator( + publish_keypair.pubkey(), + program_key, + Pubkey::from(identifier.to_bytes()), + price_info, + network_state.current_slot, + )? + }; + + instructions.push(instruction); + } + + // Pay priority fees, if configured + let total_compute_limit: u32 = compute_unit_limit * instructions.len() as u32; + + instructions.push(ComputeBudgetInstruction::set_compute_unit_limit( + total_compute_limit, + )); + + // Calculate the compute unit price in micro lamports + let mut compute_unit_price_micro_lamports = None; + + // If the unit price value is set, use it as the minimum price + if let Some(price) = compute_unit_price_micro_lamports_opt { + compute_unit_price_micro_lamports = Some(price); + } + + // If dynamic compute unit pricing is enabled, we use the following two methods to calculate an + // estimate of the price: + // - We exponentially increase price based on the price staleness (slot gap between the + // current slot and the oldest slot amongst all the accounts in this batch). + // - We use the network recent prioritization fees to get the minimum unit price + // that landed a transaction using Pyth price accounts (permissioned to this publisher) + // as writable. We take the median over the last 20 slots and divide it by two to make + // sure that it decays over time. The API doesn't return the priority fees for the Pyth + // price reads and so, this reflects the unit price that publishers have paid in the + // pverious slots. + // + // The two methods above combined act like massively increasing the price when they cannot + // land transactions on-chain that decays over time. The decaying behaviour is important to + // keep the uptime high during congestion whereas without it we would publish price after a + // large gap and then we can publish it again after the next large gap. + if dynamic_compute_unit_pricing_enabled { + // Use the estimated previous price if it is higher + // than the current price. + let recent_compute_unit_price_micro_lamports = + Exporter::get_recent_compute_unit_price_micro_lamports(state).await; + + if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports { + // Get the estimated compute unit price and wrap it so it stays below the maximum + // total compute unit fee. We additionally divide such price by 2 to create an + // exponential decay. This will make sure that a spike doesn't get propagated + // forever. + let estimated_price = estimated_recent_price >> 1; + + compute_unit_price_micro_lamports = compute_unit_price_micro_lamports + .map(|price| price.max(estimated_price)) + .or(Some(estimated_price)); + } + + // Use exponentially higher price if this publisher hasn't published in a while for the accounts + // in this batch. This will use the maximum total compute unit fee if the publisher + // hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots. + let result = GlobalStore::price_accounts( + &*state, + network, + price_accounts.clone().into_iter().collect(), + ) + .await?; + + // Calculate the maximum slot difference between the publisher latest slot and + // current slot amongst all the accounts. Here, the aggregate slot is + // used instead of the publishers latest update to avoid overpaying. + let oldest_slot = result + .values() + .filter(|account| account.min_pub != 255) // Only consider live price accounts + .flat_map(|account| { + account + .comp + .iter() + .find(|c| c.publisher == publish_keypair.pubkey()) + .map(|c| c.latest.pub_slot.max(account.agg.pub_slot)) + }) + .min(); + + if let Some(oldest_slot) = oldest_slot { + let slot_gap = network_state.current_slot.saturating_sub(oldest_slot); + + // Set the dynamic price exponentially based on the slot gap. If the max slot gap is + // 25, on this number (or more) the maximum unit price is paid, and then on slot 24 it + // is half of that and gets halved each lower slot. Given that we have max total + // compute price of 10**12 and 250k compute units in one tx (12 updates) these are the + // estimated prices based on slot gaps: + // 25 (or more): 4_000_000 + // 20 : 125_000 + // 18 : 31_250 + // 15 : 3_906 + // 13 : 976 + // 10 : 122 + let exponential_price = maximum_compute_unit_price_micro_lamports + >> maximum_slot_gap_for_dynamic_compute_unit_price.saturating_sub(slot_gap); + + compute_unit_price_micro_lamports = compute_unit_price_micro_lamports + .map(|price| price.max(exponential_price)) + .or(Some(exponential_price)); + } + } + + if let Some(mut compute_unit_price_micro_lamports) = compute_unit_price_micro_lamports { + compute_unit_price_micro_lamports = + compute_unit_price_micro_lamports.min(maximum_compute_unit_price_micro_lamports); + + tracing::debug!( + unit_price = compute_unit_price_micro_lamports, + "setting compute unit price", + ); + instructions.push(ComputeBudgetInstruction::set_compute_unit_price( + compute_unit_price_micro_lamports, + )); + } + + let transaction = Transaction::new_signed_with_payer( + &instructions, + Some(&publish_keypair.pubkey()), + &vec![&publish_keypair], + network_state.blockhash, + ); + + let signature = match client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + tracing::error!(err = ?err, "Exporter: failed to send transaction."); + return Ok(()); + } + }; + + tracing::debug!( + signature = signature.to_string(), + instructions = instructions.len(), + price_accounts = ?price_accounts, + "Sent upd_price transaction.", + ); + + Transactions::add_transaction(&*state, signature).await; + + Ok(()) +} + +fn create_instruction_without_accumulator( + publish_pubkey: Pubkey, + program_key: Pubkey, + price_id: Pubkey, + price_info: &PriceInfo, + current_slot: u64, +) -> Result { + Ok(Instruction { + program_id: program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: price_id, + is_signer: false, + is_writable: true, + }, + AccountMeta { + pubkey: clock::id(), + is_signer: false, + is_writable: false, + }, + ], + data: bincode::DefaultOptions::new() + .with_little_endian() + .with_fixint_encoding() + .serialize( + &(UpdPriceCmd { + version: PYTH_ORACLE_VERSION, + cmd: UPDATE_PRICE_NO_FAIL_ON_ERROR, + status: price_info.status, + unused_: 0, + price: price_info.price, + conf: price_info.conf, + pub_slot: current_slot, + }), + )?, + }) +} + +fn create_instruction_with_accumulator( + publish_pubkey: Pubkey, + program_key: Pubkey, + price_id: Pubkey, + price_info: &PriceInfo, + current_slot: u64, + accumulator_program_key: Pubkey, +) -> Result { + let (whitelist_pubkey, _whitelist_bump) = Pubkey::find_program_address( + &["message".as_bytes(), "whitelist".as_bytes()], + &accumulator_program_key, + ); + + let (oracle_auth_pda, _) = Pubkey::find_program_address( + &[b"upd_price_write", &accumulator_program_key.to_bytes()], + &program_key, + ); + + let (accumulator_data_pubkey, _accumulator_data_pubkey) = Pubkey::find_program_address( + &[ + &oracle_auth_pda.to_bytes(), + "message".as_bytes(), + &price_id.to_bytes(), + ], + &accumulator_program_key, + ); + + Ok(Instruction { + program_id: program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: price_id, + is_signer: false, + is_writable: true, + }, + AccountMeta { + pubkey: clock::id(), + is_signer: false, + is_writable: false, + }, + // accumulator program key + AccountMeta { + pubkey: accumulator_program_key, + is_signer: false, + is_writable: false, + }, + // whitelist + AccountMeta { + pubkey: whitelist_pubkey, + is_signer: false, + is_writable: false, + }, + // oracle_auth_pda + AccountMeta { + pubkey: oracle_auth_pda, + is_signer: false, + is_writable: false, + }, + // accumulator_data + AccountMeta { + pubkey: accumulator_data_pubkey, + is_signer: false, + is_writable: true, + }, + ], + data: bincode::DefaultOptions::new() + .with_little_endian() + .with_fixint_encoding() + .serialize( + &(UpdPriceCmd { + version: PYTH_ORACLE_VERSION, + cmd: UPDATE_PRICE_NO_FAIL_ON_ERROR, + status: price_info.status, + unused_: 0, + price: price_info.price, + conf: price_info.conf, + pub_slot: current_slot, + }), + )?, + }) +} diff --git a/src/agent/state/global.rs b/src/agent/state/global.rs index 36033a7..1e41019 100644 --- a/src/agent/state/global.rs +++ b/src/agent/state/global.rs @@ -2,20 +2,19 @@ // on-chain aggregation contracts, across both the primary and secondary networks. // This enables this data to be easily queried by other components. use { - super::State, + super::{ + oracle::{ + PriceEntry, + ProductEntry, + }, + State, + }, crate::agent::{ metrics::{ PriceGlobalMetrics, ProductGlobalMetrics, }, - solana::{ - network::Network, - oracle::{ - self, - PriceEntry, - ProductEntry, - }, - }, + solana::network::Network, }, anyhow::{ anyhow, @@ -35,8 +34,8 @@ use { /// from the primary network. #[derive(Debug, Clone, Default)] pub struct AllAccountsData { - pub product_accounts: HashMap, - pub price_accounts: HashMap, + pub product_accounts: HashMap, + pub price_accounts: HashMap, } /// AllAccountsMetadata contains the metadata for all the price and product accounts. @@ -57,8 +56,8 @@ pub struct ProductAccountMetadata { pub price_accounts: Vec, } -impl From for ProductAccountMetadata { - fn from(product_account: oracle::ProductEntry) -> Self { +impl From for ProductAccountMetadata { + fn from(product_account: ProductEntry) -> Self { ProductAccountMetadata { attr_dict: product_account .account_data @@ -77,8 +76,8 @@ pub struct PriceAccountMetadata { pub expo: i32, } -impl From for PriceAccountMetadata { - fn from(price_account: oracle::PriceEntry) -> Self { +impl From for PriceAccountMetadata { + fn from(price_account: PriceEntry) -> Self { PriceAccountMetadata { expo: price_account.expo, } diff --git a/src/agent/state/keypairs.rs b/src/agent/state/keypairs.rs index a93eca5..5b57962 100644 --- a/src/agent/state/keypairs.rs +++ b/src/agent/state/keypairs.rs @@ -5,64 +5,11 @@ use { super::State, crate::agent::solana::network::Network, - anyhow::{ - Context, - Result, - }, - serde::Deserialize, - solana_client::nonblocking::rpc_client::RpcClient, - solana_sdk::{ - commitment_config::CommitmentConfig, - signature::Keypair, - signer::Signer, - }, - std::{ - net::SocketAddr, - sync::Arc, - }, - tokio::{ - sync::RwLock, - task::JoinHandle, - }, - warp::{ - hyper::StatusCode, - reply::{ - self, - WithStatus, - }, - Filter, - Rejection, - }, + anyhow::Result, + solana_sdk::signature::Keypair, + tokio::sync::RwLock, }; -pub fn default_min_keypair_balance_sol() -> u64 { - 1 -} - -pub fn default_bind_address() -> SocketAddr { - "127.0.0.1:9001" - .parse() - .expect("INTERNAL: Could not build default remote keypair loader bind address") -} - -#[derive(Clone, Debug, Deserialize)] -#[serde(default)] -pub struct Config { - primary_min_keypair_balance_sol: u64, - secondary_min_keypair_balance_sol: u64, - bind_address: SocketAddr, -} - -impl Default for Config { - fn default() -> Self { - Self { - primary_min_keypair_balance_sol: default_min_keypair_balance_sol(), - secondary_min_keypair_balance_sol: default_min_keypair_balance_sol(), - bind_address: default_bind_address(), - } - } -} - #[derive(Default)] pub struct KeypairState { primary_current_keypair: RwLock>, @@ -111,172 +58,3 @@ where } = Some(new_keypair); } } - -pub async fn spawn( - primary_rpc_url: String, - secondary_rpc_url: Option, - config: Config, - state: Arc, -) -> Vec> -where - S: Keypairs, - S: Send + Sync + 'static, - for<'a> &'a S: Into<&'a KeypairState>, -{ - let ip = config.bind_address.ip(); - - if !ip.is_loopback() { - tracing::warn!( - bind_address = ?config.bind_address, - "Remote key loader: bind address is not localhost. Make sure the access on the selected address is secure.", - ); - } - - let primary_upload_route = { - let state = state.clone(); - let rpc_url = primary_rpc_url.clone(); - let min_balance = config.primary_min_keypair_balance_sol; - warp::path!("primary" / "load_keypair") - .and(warp::post()) - .and(warp::body::content_length_limit(1024)) - .and(warp::body::json()) - .and(warp::path::end()) - .and_then(move |kp: Vec| { - let state = state.clone(); - let rpc_url = rpc_url.clone(); - async move { - let response = handle_new_keypair( - state, - Network::Primary, - kp, - min_balance, - rpc_url, - "primary", - ) - .await; - Result::, Rejection>::Ok(response) - } - }) - }; - - let secondary_upload_route = warp::path!("secondary" / "load_keypair") - .and(warp::post()) - .and(warp::body::content_length_limit(1024)) - .and(warp::body::json()) - .and(warp::path::end()) - .and_then(move |kp: Vec| { - let state = state.clone(); - let rpc_url = secondary_rpc_url.clone(); - async move { - if let Some(rpc_url) = rpc_url { - let min_balance = config.secondary_min_keypair_balance_sol; - let response = handle_new_keypair( - state, - Network::Secondary, - kp, - min_balance, - rpc_url, - "secondary", - ) - .await; - Result::, Rejection>::Ok(response) - } else { - Result::, Rejection>::Ok(reply::with_status( - "Secondary network is not active", - StatusCode::SERVICE_UNAVAILABLE, - )) - } - } - }); - - let http_api_jh = { - let (_, serve) = warp::serve(primary_upload_route.or(secondary_upload_route)) - .bind_with_graceful_shutdown(config.bind_address, async { - let _ = crate::agent::EXIT.subscribe().changed().await; - }); - tokio::spawn(serve) - }; - - // WARNING: All jobs spawned here must report their join handles in this vec - vec![http_api_jh] -} - -/// Validate and apply a keypair to the specified mut reference, -/// hiding errors in logs. -/// -/// Returns the appropriate HTTP response depending on checks success. -/// -/// NOTE(2023-03-22): Lifetime bounds are currently necessary -/// because of https://github.com/rust-lang/rust/issues/63033 -async fn handle_new_keypair<'a, 'b: 'a, S>( - state: Arc, - network: Network, - new_keypair_bytes: Vec, - min_keypair_balance_sol: u64, - rpc_url: String, - network_name: &'b str, -) -> WithStatus<&'static str> -where - S: Keypairs, -{ - let mut upload_ok = true; - match Keypair::from_bytes(&new_keypair_bytes) { - Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await { - Ok(()) => { - Keypairs::update_keypair(&*state, network, kp).await; - } - Err(e) => { - tracing::warn!( - network = network_name, - error = e.to_string(), - "Remote keypair loader: Keypair failed validation", - ); - upload_ok = false; - } - }, - Err(e) => { - tracing::warn!( - network = network_name, - error = e.to_string(), - "Remote keypair loader: Keypair failed validation", - ); - upload_ok = false; - } - } - - if upload_ok { - reply::with_status("keypair upload OK", StatusCode::OK) - } else { - reply::with_status( - "Could not upload keypair. See logs for details.", - StatusCode::BAD_REQUEST, - ) - } -} - -/// Validate keypair balance before using it in transactions. -pub async fn validate_keypair( - kp: &Keypair, - min_keypair_balance_sol: u64, - rpc_url: String, -) -> Result<()> { - let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - - let balance_lamports = c - .get_balance(&kp.pubkey()) - .await - .context("Could not check keypair's balance")?; - - let lamports_in_sol = 1_000_000_000; - - if balance_lamports > min_keypair_balance_sol * lamports_in_sol { - Ok(()) - } else { - Err(anyhow::anyhow!(format!( - "Keypair {} balance of {} SOL below threshold of {} SOL", - kp.pubkey(), - balance_lamports as f64 / lamports_in_sol as f64, - min_keypair_balance_sol - ))) - } -} diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs new file mode 100644 index 0000000..c73cedd --- /dev/null +++ b/src/agent/state/oracle.rs @@ -0,0 +1,618 @@ +use { + super::{ + super::solana::network::Network, + exporter::Exporter, + }, + crate::agent::{ + legacy_schedule::LegacySchedule, + market_schedule::MarketSchedule, + state::{ + global::Update, + Prices, + State, + }, + }, + anyhow::{ + anyhow, + Context, + Result, + }, + pyth_sdk_solana::state::{ + load_mapping_account, + load_product_account, + GenericPriceAccount, + MappingAccount, + PriceComp, + PythnetPriceAccount, + SolanaPriceAccount, + }, + serde::{ + Deserialize, + Serialize, + }, + solana_client::nonblocking::rpc_client::RpcClient, + solana_sdk::{ + account::Account, + commitment_config::CommitmentLevel, + pubkey::Pubkey, + signature::Keypair, + }, + std::{ + collections::{ + HashMap, + HashSet, + }, + time::Duration, + }, + tokio::sync::RwLock, +}; + +#[derive(Debug, Clone)] +pub struct ProductEntry { + pub account_data: pyth_sdk_solana::state::ProductAccount, + pub schedule: MarketSchedule, + pub price_accounts: Vec, + pub publish_interval: Option, +} + +#[derive(Default, Debug, Clone)] +pub struct PricePublishingMetadata { + pub schedule: MarketSchedule, + pub publish_interval: Option, +} + +/// This shim is used to abstract over SolanaPriceAccount and PythnetPriceAccount so we +/// can iterate over either of these. The API is intended to force users to be aware of +/// the account type they have, and so doesn't provide this abstraction (a good thing) +/// and the agent should implement this in a better way. +/// +/// For now, to implement the abstraction in the smallest way possible we use a shim +/// type that uses the size of the accounts to determine the underlying representation +/// and construct the right one regardless of which network we read. This will only work +/// as long as we don't care about any extended fields. +/// +/// TODO: Refactor the agent's network handling code. +#[derive(Copy, Clone, Debug)] +pub struct PriceEntry { + // We intentionally act as if we have a truncated account where the underlying memory is unavailable. + account: GenericPriceAccount<0, ()>, + pub comp: [PriceComp; 64], +} + +impl From for PriceEntry { + fn from(other: SolanaPriceAccount) -> PriceEntry { + unsafe { + // NOTE: We know the size is 32 because It's a Solana account. This is for tests only. + let comp_mem = std::slice::from_raw_parts(other.comp.as_ptr(), 32); + let account = + *(&other as *const SolanaPriceAccount as *const GenericPriceAccount<0, ()>); + let mut comp = [PriceComp::default(); 64]; + comp[0..32].copy_from_slice(comp_mem); + PriceEntry { account, comp } + } + } +} + +impl PriceEntry { + /// Construct the right underlying GenericPriceAccount based on the account size. + pub fn load_from_account(acc: &[u8]) -> Option { + unsafe { + let size = match acc.len() { + n if n == std::mem::size_of::() => 32, + n if n == std::mem::size_of::() => 64, + _ => return None, + }; + + // Getting a pointer to avoid copying the account + let account_ptr = &*(acc.as_ptr() as *const GenericPriceAccount<0, ()>); + let comp_mem = std::slice::from_raw_parts(account_ptr.comp.as_ptr(), size); + let mut comp = [PriceComp::default(); 64]; + comp[0..size].copy_from_slice(comp_mem); + Some(Self { + account: *account_ptr, + comp, + }) + } + } +} + +/// Implement `Deref` so we can access the underlying account fields. +impl std::ops::Deref for PriceEntry { + type Target = GenericPriceAccount<0, ()>; + fn deref(&self) -> &Self::Target { + &self.account + } +} + +#[derive(Default, Debug, Clone)] +pub struct Data { + pub mapping_accounts: HashMap, + pub product_accounts: HashMap, + pub price_accounts: HashMap, + /// publisher => {their permissioned price accounts => price publishing metadata} + pub publisher_permissions: HashMap>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(default)] +pub struct Config { + /// The commitment level to use when reading data from the RPC node. + pub commitment: CommitmentLevel, + /// The interval with which to poll account information. + #[serde(with = "humantime_serde")] + pub poll_interval_duration: Duration, + /// Whether subscribing to account updates over websocket is enabled + pub subscriber_enabled: bool, + /// Capacity of the channel over which the Subscriber sends updates to the Oracle + pub updates_channel_capacity: usize, + /// Capacity of the channel over which the Poller sends data to the Oracle + pub data_channel_capacity: usize, + + /// Ask the RPC for up to this many product/price accounts in a + /// single request. Tune this setting if you're experiencing + /// timeouts on data fetching. In order to keep concurrent open + /// socket count at bay, the batches are looked up sequentially, + /// trading off overall time it takes to fetch all symbols. + pub max_lookup_batch_size: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + commitment: CommitmentLevel::Confirmed, + poll_interval_duration: Duration::from_secs(5), + subscriber_enabled: true, + updates_channel_capacity: 10000, + data_channel_capacity: 10000, + max_lookup_batch_size: 100, + } + } +} + +pub struct OracleState { + data: RwLock, +} + +impl OracleState { + pub fn new() -> Self { + Self { + data: Default::default(), + } + } +} + +#[async_trait::async_trait] +pub trait Oracle { + async fn sync_global_store(&self, network: Network) -> Result<()>; + async fn poll_updates( + &self, + network: Network, + mapping_key: Pubkey, + publish_keypair: Option<&Keypair>, + rpc_client: &RpcClient, + max_lookup_batch_size: usize, + ) -> Result<()>; + async fn handle_price_account_update( + &self, + network: Network, + account_key: &Pubkey, + account: &Account, + ) -> Result<()>; +} + +// Allow downcasting State into Keypairs for functions that depend on the `Keypairs` service. +impl<'a> From<&'a State> for &'a OracleState { + fn from(state: &'a State) -> &'a OracleState { + &state.oracle + } +} + +#[async_trait::async_trait] +impl Oracle for T +where + for<'a> &'a T: Into<&'a OracleState>, + T: Send + Sync + 'static, + T: Prices, + T: Exporter, +{ + async fn handle_price_account_update( + &self, + network: Network, + account_key: &Pubkey, + account: &Account, + ) -> Result<()> { + tracing::debug!("Handling account update."); + + let mut data = self.into().data.write().await; + + // We are only interested in price account updates, all other types of updates + // will be fetched using polling. + if !data.price_accounts.contains_key(account_key) { + return Ok(()); + } + + let price_entry = PriceEntry::load_from_account(&account.data) + .with_context(|| format!("load price account {}", account_key))?; + + tracing::debug!( + pubkey = account_key.to_string(), + price = price_entry.agg.price, + conf = price_entry.agg.conf, + status = ?price_entry.agg.status, + "Observed on-chain price account update.", + ); + + data.price_accounts + .insert(*account_key, price_entry.clone()); + + Prices::update_global_price( + self, + network, + &Update::PriceAccountUpdate { + account_key: *account_key, + account: price_entry, + }, + ) + .await?; + + Ok(()) + } + + /// Poll target Solana based chain for Pyth related accounts. + async fn poll_updates( + &self, + network: Network, + mapping_key: Pubkey, + publish_keypair: Option<&Keypair>, + rpc_client: &RpcClient, + max_lookup_batch_size: usize, + ) -> Result<()> { + let mut publisher_permissions = HashMap::new(); + let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?; + let (product_accounts, price_accounts) = fetch_product_and_price_accounts( + rpc_client, + max_lookup_batch_size, + mapping_accounts.values(), + ) + .await?; + + for (price_key, price_entry) in price_accounts.iter() { + for component in price_entry.comp { + if component.publisher == Pubkey::default() { + continue; + } + + let component_pub_entry = publisher_permissions + .entry(component.publisher) + .or_insert(HashMap::new()); + + let publisher_permission = if let Some(prod_entry) = + product_accounts.get(&price_entry.prod) + { + PricePublishingMetadata { + schedule: prod_entry.schedule.clone(), + publish_interval: prod_entry.publish_interval.clone(), + } + } else { + tracing::warn!( + price = price_key.to_string(), + missing_product = price_entry.prod.to_string(), + "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7.", + ); + Default::default() + }; + + component_pub_entry.insert(*price_key, publisher_permission); + } + } + + let new_data = Data { + mapping_accounts, + product_accounts, + price_accounts, + publisher_permissions, + }; + + let mut data = self.into().data.write().await; + log_data_diff(&data, &new_data); + *data = new_data; + + Exporter::update_permissions( + self, + network, + publish_keypair, + data.publisher_permissions.clone(), + ) + .await?; + + Ok(()) + } + + /// Sync Product/Price Accounts found by polling to the Global Store. + async fn sync_global_store(&self, network: Network) -> Result<()> { + for (product_account_key, product_account) in + &self.into().data.read().await.product_accounts + { + Prices::update_global_price( + self, + network, + &Update::ProductAccountUpdate { + account_key: *product_account_key, + account: product_account.clone(), + }, + ) + .await + .map_err(|_| anyhow!("failed to notify product account update"))?; + } + + for (price_account_key, price_account) in &self.into().data.read().await.price_accounts { + Prices::update_global_price( + self, + network, + &Update::PriceAccountUpdate { + account_key: *price_account_key, + account: price_account.clone(), + }, + ) + .await + .map_err(|_| anyhow!("failed to notify price account update"))?; + } + + Ok(()) + } +} + +async fn fetch_mapping_accounts( + rpc_client: &RpcClient, + mapping_account_key: Pubkey, +) -> Result> { + let mut accounts = HashMap::new(); + let mut account_key = mapping_account_key; + while account_key != Pubkey::default() { + let account = *load_mapping_account( + &rpc_client + .get_account_data(&account_key) + .await + .with_context(|| format!("load mapping account {}", account_key))?, + )?; + accounts.insert(account_key, account); + account_key = account.next; + } + Ok(accounts) +} + +async fn fetch_product_and_price_accounts<'a, A>( + rpc_client: &RpcClient, + max_lookup_batch_size: usize, + mapping_accounts: A, +) -> Result<(HashMap, HashMap)> +where + A: IntoIterator, +{ + let mut product_keys = vec![]; + + // Get all product keys + for mapping_account in mapping_accounts { + for account_key in mapping_account + .products + .iter() + .filter(|pubkey| **pubkey != Pubkey::default()) + { + product_keys.push(*account_key); + } + } + + let mut product_entries = HashMap::new(); + let mut price_entries = HashMap::new(); + + // Lookup products and their prices using the configured batch size + for product_key_batch in product_keys.as_slice().chunks(max_lookup_batch_size) { + let (mut batch_products, mut batch_prices) = + fetch_batch_of_product_and_price_accounts(&rpc_client, product_key_batch).await?; + + product_entries.extend(batch_products.drain()); + price_entries.extend(batch_prices.drain()); + } + + Ok((product_entries, price_entries)) +} + +async fn fetch_batch_of_product_and_price_accounts( + rpc_client: &RpcClient, + product_key_batch: &[Pubkey], +) -> Result<(HashMap, HashMap)> { + let mut product_entries = HashMap::new(); + + let product_keys = product_key_batch; + + // Look up the batch with a single request + let product_accounts = rpc_client.get_multiple_accounts(product_keys).await?; + + // Log missing products, fill the product entries with initial values + for (product_key, product_account) in product_keys.iter().zip(product_accounts) { + if let Some(prod_acc) = product_account { + let product = load_product_account(prod_acc.data.as_slice()) + .context(format!("Could not parse product account {}", product_key))?; + + #[allow(deprecated)] + let legacy_schedule: LegacySchedule = if let Some((_wsched_key, wsched_val)) = + product.iter().find(|(k, _v)| *k == "weekly_schedule") + { + wsched_val.parse().unwrap_or_else(|err| { + tracing::warn!( + product_key = product_key.to_string(), + weekly_schedule = wsched_val, + "Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing.", + ); + tracing::debug!(err = ?err, "Parsing error context."); + Default::default() + }) + } else { + Default::default() // No market hours specified, meaning 24/7 publishing + }; + + let market_schedule: Option = if let Some((_msched_key, msched_val)) = + product.iter().find(|(k, _v)| *k == "schedule") + { + match msched_val.parse::() { + Ok(schedule) => Some(schedule), + Err(err) => { + tracing::warn!( + product_key = product_key.to_string(), + schedule = msched_val, + "Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule.", + ); + tracing::debug!(err = ?err, "Parsing error context."); + None + } + } + } else { + None + }; + + let publish_interval: Option = if let Some(( + _publish_interval_key, + publish_interval_val, + )) = + product.iter().find(|(k, _v)| *k == "publish_interval") + { + match publish_interval_val.parse::() { + Ok(interval) => Some(Duration::from_secs_f64(interval)), + Err(err) => { + tracing::warn!( + product_key = product_key.to_string(), + publish_interval = publish_interval_val, + "Oracle: Product has publish_interval defined but it could not be parsed. Falling back to None.", + ); + tracing::debug!(err = ?err, "parsing error context"); + None + } + } + } else { + None + }; + + product_entries.insert( + *product_key, + ProductEntry { + account_data: *product, + schedule: market_schedule.unwrap_or_else(|| legacy_schedule.into()), + price_accounts: vec![], + publish_interval, + }, + ); + } else { + tracing::warn!( + product_key = product_key.to_string(), + "Oracle: Could not find product on chain, skipping", + ); + } + } + + let mut price_entries = HashMap::new(); + + // Starting with top-level prices, look up price accounts in + // batches, filling price entries and adding found prices to + // the product entries + let mut todo = product_entries + .values() + .map(|p| p.account_data.px_acc) + .collect::>(); + + while !todo.is_empty() { + let price_accounts = rpc_client.get_multiple_accounts(todo.as_slice()).await?; + + // Any non-zero price.next pubkey will be gathered here and looked up on next iteration + let mut next_todo = vec![]; + + // Process the response of each lookup request. If there's + // a next price, it will be looked up on next iteration, + // as todo gets replaced with next_todo. + for (price_key, price_account) in todo.iter().zip(price_accounts) { + if let Some(price_acc) = price_account { + let price = PriceEntry::load_from_account(&price_acc.data) + .context(format!("Could not parse price account at {}", price_key))?; + + let next_price = price.next; + if let Some(prod) = product_entries.get_mut(&price.prod) { + prod.price_accounts.push(*price_key); + price_entries.insert(*price_key, price); + } else { + tracing::warn!( + missing_product = price.prod.to_string(), + price_key = price_key.to_string(), + "Could not find product entry for price, listed in its prod field, skipping", + ); + + continue; + } + + if next_price != Pubkey::default() { + next_todo.push(next_price); + } + } else { + tracing::warn!( + price_key = price_key.to_string(), + "Could not look up price account on chain, skipping", + ); + continue; + } + } + + todo = next_todo; + } + Ok((product_entries, price_entries)) +} + +fn log_data_diff(data: &Data, new_data: &Data) { + // Log new accounts which have been found + let previous_mapping_accounts = data + .mapping_accounts + .keys() + .cloned() + .collect::>(); + tracing::info!( + new = ?new_data + .mapping_accounts + .keys() + .cloned() + .collect::>().difference(&previous_mapping_accounts), + total = data.mapping_accounts.len(), + "Fetched mapping accounts." + ); + let previous_product_accounts = data + .product_accounts + .keys() + .cloned() + .collect::>(); + tracing::info!( + new = ?new_data + .product_accounts + .keys() + .cloned() + .collect::>().difference(&previous_product_accounts), + total = data.product_accounts.len(), + "Fetched product accounts.", + ); + let previous_price_accounts = data.price_accounts.keys().cloned().collect::>(); + tracing::info!( + new = ?new_data + .price_accounts + .keys() + .cloned() + .collect::>().difference(&previous_price_accounts), + total = data.price_accounts.len(), + "Fetched price accounts.", + ); + + let previous_publishers = data.publisher_permissions.keys().collect::>(); + let new_publishers = new_data + .publisher_permissions + .keys() + .collect::>(); + tracing::info!( + new_publishers = ?new_publishers.difference(&previous_publishers).collect::>(), + total_publishers = new_publishers.len(), + "Updated publisher permissions.", + ); +} diff --git a/src/agent/state/transactions.rs b/src/agent/state/transactions.rs new file mode 100644 index 0000000..22505e7 --- /dev/null +++ b/src/agent/state/transactions.rs @@ -0,0 +1,111 @@ +use { + super::State, + anyhow::Result, + solana_client::nonblocking::rpc_client::RpcClient, + solana_sdk::{ + commitment_config::CommitmentConfig, + signature::Signature, + }, + std::collections::VecDeque, + tokio::sync::RwLock, +}; + +#[derive(Default)] +pub struct TransactionsState { + sent_transactions: RwLock>, + max_transactions: usize, +} + +impl TransactionsState { + pub fn new(max_transactions: usize) -> Self { + Self { + sent_transactions: Default::default(), + max_transactions, + } + } +} + +#[async_trait::async_trait] +pub trait Transactions { + async fn add_transaction(&self, signature: Signature); + async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()>; +} + +/// Allow downcasting State into TransactionsState for functions that depend on the `Transactions` service. +impl<'a> From<&'a State> for &'a TransactionsState { + fn from(state: &'a State) -> &'a TransactionsState { + &state.transactions + } +} + +#[async_trait::async_trait] +impl Transactions for T +where + for<'a> &'a T: Into<&'a TransactionsState>, + T: Sync + Send + 'static, +{ + async fn add_transaction(&self, signature: Signature) { + tracing::debug!( + signature = signature.to_string(), + "Monitoring new transaction.", + ); + + // Add the new transaction to the list + let mut txs = self.into().sent_transactions.write().await; + txs.push_back(signature); + + // Pop off the oldest transaction if necessary + if txs.len() > self.into().max_transactions { + txs.pop_front(); + } + } + + async fn poll_transactions_status(&self, rpc: &RpcClient) -> Result<()> { + let mut txs = self.into().sent_transactions.write().await; + if txs.is_empty() { + return Ok(()); + } + + let signatures_contiguous = txs.make_contiguous(); + + // Poll the status of each transaction, in a single RPC request + let statuses = rpc + .get_signature_statuses(signatures_contiguous) + .await? + .value; + + tracing::debug!( + statuses = ?statuses, + "Processing Signature Statuses", + ); + + // Determine the percentage of the recently sent transactions that have successfully been committed + // TODO: expose as metric + let confirmed = statuses + .into_iter() + .zip(signatures_contiguous) + .map(|(status, sig)| status.map(|some_status| (some_status, sig))) // Collate Some() statuses with their tx signatures before flatten() + .flatten() + .filter(|(status, sig)| { + if let Some(err) = status.err.as_ref() { + tracing::warn!( + error = err.to_string(), + tx_signature = sig.to_string(), + "TX status has err value", + ); + } + + status.satisfies_commitment(CommitmentConfig::confirmed()) + }) + .count(); + + let percentage_confirmed = ((confirmed as f64) / (txs.len() as f64)) * 100.0; + + tracing::info!( + percentage_confirmed = format!("{:.}", percentage_confirmed), + "monitoring transaction hit rate", + ); + + Ok(()) + } +}