diff --git a/Cargo.lock b/Cargo.lock index 5919b0f..344eb17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,11 +3400,12 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.4" +version = "2.11.0" dependencies = [ "anyhow", "async-trait", "bincode", + "bytemuck", "chrono", "chrono-tz", "clap 4.5.4", @@ -3423,6 +3424,7 @@ dependencies = [ "portpicker", "prometheus-client", "proptest", + "pyth-price-store", "pyth-sdk", "pyth-sdk-solana", "rand 0.8.5", @@ -3447,6 +3449,16 @@ dependencies = [ "winnow 0.6.5", ] +[[package]] +name = "pyth-price-store" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbb76932606741e8b690d51d3e78df620df4a13474a2ca5f53ded5f70522cdc" +dependencies = [ + "bytemuck", + "thiserror", +] + [[package]] name = "pyth-sdk" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index cb8cb1d..568cbd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.4" +version = "2.11.0" edition = "2021" [[bin]] @@ -56,6 +56,8 @@ tracing-opentelemetry = "0.24.0" opentelemetry = "0.23.0" opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} opentelemetry-otlp = { version = "0.16.0" } +pyth-price-store = "0.1.0" +bytemuck = "1.13.0" [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index b65d580..9825bdd 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -262,6 +262,7 @@ mod exporter { config.exporter.staleness_threshold, config.exporter.unchanged_publish_threshold, ).await { + let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await; if let Err(err) = publish_batches( state.clone(), client.clone(), @@ -269,7 +270,9 @@ mod exporter { &network_state_rx, key_store.accumulator_key, &publish_keypair, - key_store.program_key, + key_store.oracle_program_key, + key_store.publish_program_key, + publisher_buffer_key, config.exporter.max_batch_size, config.exporter.staleness_threshold, config.exporter.compute_unit_limit, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index a9d5237..7de2194 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -62,6 +62,7 @@ where state.clone(), key_store.mapping_key, key_store.publish_keypair, + key_store.publish_program_key, config.oracle.max_lookup_batch_size, ))); @@ -73,7 +74,7 @@ where config.clone(), network, state.clone(), - key_store.program_key, + key_store.oracle_program_key, ) .await { @@ -159,6 +160,7 @@ async fn poller( state: Arc, mapping_key: Pubkey, publish_keypair: Option, + publish_program_key: Option, max_lookup_batch_size: usize, ) where S: Oracle, @@ -183,6 +185,7 @@ async fn poller( network, mapping_key, publish_keypair.as_ref(), + publish_program_key, &client, max_lookup_batch_size, ) diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 71e9eb7..3ea5858 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -92,9 +92,17 @@ pub mod key_store { /// The public key of the Oracle program #[serde( serialize_with = "pubkey_string_ser", - deserialize_with = "pubkey_string_de" + deserialize_with = "pubkey_string_de", + alias = "program_key" // for compatibility + )] + pub oracle_program_key: Pubkey, + /// The public key of the Publish program + #[serde( + serialize_with = "opt_pubkey_string_ser", + deserialize_with = "opt_pubkey_string_de", + default )] - pub program_key: Pubkey, + pub publish_program_key: Option, /// The public key of the root mapping account #[serde( serialize_with = "pubkey_string_ser", @@ -114,13 +122,15 @@ pub mod key_store { /// The keypair used to publish price updates. When None, /// publishing will not start until a new keypair is supplied /// via the remote loading endpoint - pub publish_keypair: Option, + pub publish_keypair: Option, /// Public key of the Oracle program - pub program_key: Pubkey, + pub oracle_program_key: Pubkey, + /// Public key of the Publish program + pub publish_program_key: Option, /// Public key of the root mapping account - pub mapping_key: Pubkey, + pub mapping_key: Pubkey, /// Public key of the accumulator program (if provided) - pub accumulator_key: Option, + pub accumulator_key: Option, } impl KeyStore { @@ -139,7 +149,8 @@ pub mod key_store { Ok(KeyStore { publish_keypair, - program_key: config.program_key, + oracle_program_key: config.oracle_program_key, + publish_program_key: config.publish_program_key, mapping_key: config.mapping_key, accumulator_key: config.accumulator_key, }) diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 2bceded..199b814 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -16,12 +16,18 @@ use { }, anyhow::{ anyhow, + bail, Context, Result, }, bincode::Options, + bytemuck::{ + bytes_of, + cast_slice, + }, chrono::Utc, futures_util::future::join_all, + pyth_price_store::accounts::buffer::BufferedPrice, pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, @@ -81,6 +87,8 @@ pub struct ExporterState { /// Currently known permissioned prices of this publisher along with their market hours our_prices: RwLock>, + publisher_buffer_key: RwLock>, + /// Recent compute unit price in micro lamports (set if dynamic compute unit pricing is enabled) recent_compute_unit_price_micro_lamports: RwLock>, } @@ -105,7 +113,8 @@ where publish_keypair: &Keypair, staleness_threshold: Duration, unchanged_publish_threshold: Duration, - ) -> Result>; + ) -> Result>; + async fn get_publisher_buffer_key(&self) -> Option; async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option; async fn update_recent_compute_unit_price( &self, @@ -114,11 +123,12 @@ where staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result<()>; - async fn update_permissions( + async fn update_on_chain_state( &self, network: Network, publish_keypair: Option<&Keypair>, publisher_permissions: HashMap>, + publisher_buffer_key: Option, ) -> Result<()>; } @@ -129,6 +139,13 @@ impl<'a> From<&'a State> for &'a ExporterState { } } +#[derive(Debug, Clone)] +pub struct PermissionedUpdate { + pub feed_id: pyth_sdk::Identifier, + pub feed_index: u32, + pub info: PriceInfo, +} + #[async_trait::async_trait] impl Exporter for T where @@ -160,7 +177,7 @@ where publish_keypair: &Keypair, staleness_threshold: Duration, unchanged_publish_threshold: Duration, - ) -> Result> { + ) -> Result> { let local_store_contents = LocalStore::get_all_price_infos(self).await; let now = Utc::now().naive_utc(); @@ -197,22 +214,27 @@ where true // No prior data found, letting the price through } }) - .filter(|(id, _data)| { - let key_from_id = Pubkey::from((*id).clone().to_bytes()); + .filter_map(|(feed_id, info)| { + let key_from_id = Pubkey::from(feed_id.clone().to_bytes()); if let Some(publisher_permission) = our_prices.get(&key_from_id) { let now_utc = Utc::now(); - let ret = publisher_permission.schedule.can_publish_at(&now_utc); - - if !ret { + let can_publish = publisher_permission.schedule.can_publish_at(&now_utc); + + if can_publish { + Some(PermissionedUpdate { + feed_id, + feed_index: publisher_permission.feed_index, + info, + }) + } else { tracing::debug!( price_account = key_from_id.to_string(), schedule = ?publisher_permission.schedule, utc_time = now_utc.format("%c").to_string(), "Exporter: Attempted to publish price outside market hours", ); + None } - - ret } else { // Note: This message is not an error. Some // publishers have different permissions on @@ -222,12 +244,12 @@ where permissioned_accounts = ?self.into().our_prices, "Exporter: Attempted to publish a price without permission, skipping", ); - false + None } }) - .filter(|(id, info)| { + .filter(|update| { // Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval - let last_info = match last_published_state.get(id) { + let last_info = match last_published_state.get(&update.feed_id) { Some(last_info) => last_info, None => { // No prior data found, letting the price through @@ -235,7 +257,7 @@ where } }; - let key_from_id = Pubkey::from((*id).clone().to_bytes()); + let key_from_id = Pubkey::from((update.feed_id).clone().to_bytes()); let publisher_metadata = match our_prices.get(&key_from_id) { Some(metadata) => metadata, None => { @@ -245,7 +267,7 @@ where }; if let Some(publish_interval) = publisher_metadata.publish_interval { - if info.timestamp < last_info.timestamp + publish_interval { + if update.info.timestamp < last_info.timestamp + publish_interval { // Updating the price too soon after the last update, skipping return false; } @@ -255,6 +277,10 @@ where .collect::>()) } + async fn get_publisher_buffer_key(&self) -> Option { + *self.into().publisher_buffer_key.read().await + } + async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option { *self .into() @@ -287,7 +313,7 @@ where .await?; let price_accounts = permissioned_updates .iter() - .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + .map(|update| Pubkey::from(update.feed_id.to_bytes())) .collect::>(); *self @@ -301,11 +327,12 @@ where } #[instrument(skip(self, publish_keypair, publisher_permissions))] - async fn update_permissions( + async fn update_on_chain_state( &self, network: Network, publish_keypair: Option<&Keypair>, publisher_permissions: HashMap>, + publisher_buffer_key: Option, ) -> Result<()> { let publish_keypair = get_publish_keypair(self, network, publish_keypair).await?; *self.into().our_prices.write().await = publisher_permissions @@ -318,6 +345,7 @@ where ); HashMap::new() }); + *self.into().publisher_buffer_key.write().await = publisher_buffer_key; Ok(()) } @@ -435,7 +463,9 @@ pub async fn publish_batches( network_state_rx: &watch::Receiver, accumulator_key: Option, publish_keypair: &Keypair, - program_key: Pubkey, + oracle_program_key: Pubkey, + publish_program_key: Option, + publisher_buffer_key: Option, max_batch_size: usize, staleness_threshold: Duration, compute_unit_limit: u32, @@ -443,7 +473,7 @@ pub async fn publish_batches( maximum_compute_unit_price_micro_lamports: u64, maximum_slot_gap_for_dynamic_compute_unit_price: u64, dynamic_compute_unit_pricing_enabled: bool, - permissioned_updates: Vec<(pyth_sdk::Identifier, PriceInfo)>, + permissioned_updates: Vec, ) -> Result<()> where S: Sync + Send + 'static, @@ -472,7 +502,9 @@ where network_state, accumulator_key, publish_keypair, - program_key, + oracle_program_key, + publish_program_key, + publisher_buffer_key, batch, staleness_threshold, compute_unit_limit, @@ -482,8 +514,8 @@ where dynamic_compute_unit_pricing_enabled, )); - for (identifier, info) in batch { - batch_state.insert(*identifier, (*info).clone()); + for update in batch { + batch_state.insert(update.feed_id, update.info.clone()); } } @@ -505,7 +537,7 @@ where blockhash = network_state.blockhash.to_string(), current_slot = network_state.current_slot, staleness_threshold = staleness_threshold.as_millis(), - batch = ?batch.iter().map(|(identifier, _)| identifier.to_string()).collect::>(), + batch = ?batch.iter().map(|update| update.feed_id.to_string()).collect::>(), ) )] async fn publish_batch( @@ -515,8 +547,10 @@ async fn publish_batch( network_state: NetworkState, accumulator_key: Option, publish_keypair: &Keypair, - program_key: Pubkey, - batch: &[(Identifier, PriceInfo)], + oracle_program_key: Pubkey, + publish_program_key: Option, + publisher_buffer_key: Option, + batch: &[PermissionedUpdate], staleness_threshold: Duration, compute_unit_limit: u32, compute_unit_price_micro_lamports_opt: Option, @@ -534,51 +568,62 @@ where { let mut instructions = Vec::new(); - // Refresh the data in the batch - let local_store_contents = LocalStore::get_all_price_infos(&*state).await; - let refreshed_batch = batch.iter().map(|(identifier, _)| { - ( - identifier, - local_store_contents - .get(identifier) - .ok_or_else(|| anyhow!("price identifier not found in local store")) - .with_context(|| identifier.to_string()), - ) - }); - let price_accounts = refreshed_batch - .clone() - .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + let price_accounts = batch + .iter() + .map(|update| Pubkey::from(update.feed_id.to_bytes())) .collect::>(); - for (identifier, price_info_result) in refreshed_batch { - let price_info = price_info_result?; - let now = Utc::now().naive_utc(); - - let stale_price = now > price_info.timestamp + staleness_threshold; + let now = Utc::now().naive_utc(); + let mut updates = Vec::new(); + // Refresh the data in the batch + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; + for update in batch { + let mut update = update.clone(); + update.info = local_store_contents + .get(&update.feed_id) + .ok_or_else(|| anyhow!("price identifier not found in local store")) + .with_context(|| update.feed_id.to_string())? + .clone(); + + let stale_price = now > update.info.timestamp + staleness_threshold; if stale_price { continue; } + updates.push(update); + } - let instruction = if let Some(accumulator_program_key) = accumulator_key { - create_instruction_with_accumulator( - publish_keypair.pubkey(), - program_key, - Pubkey::from(identifier.to_bytes()), - price_info, - network_state.current_slot, - accumulator_program_key, - )? - } else { - create_instruction_without_accumulator( - publish_keypair.pubkey(), - program_key, - Pubkey::from(identifier.to_bytes()), - price_info, - network_state.current_slot, - )? - }; - + if let Some(publish_program_key) = publish_program_key { + let instruction = create_instruction_with_publish_program( + publish_keypair.pubkey(), + publish_program_key, + publisher_buffer_key + .context("must specify publisher_buffer_key if publish_program_key is specified")?, + updates, + )?; instructions.push(instruction); + } else { + for update in updates { + let instruction = if let Some(accumulator_program_key) = accumulator_key { + create_instruction_with_accumulator( + publish_keypair.pubkey(), + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, + network_state.current_slot, + accumulator_program_key, + )? + } else { + create_instruction_without_accumulator( + publish_keypair.pubkey(), + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, + network_state.current_slot, + )? + }; + + instructions.push(instruction); + } } // Pay priority fees, if configured @@ -730,13 +775,13 @@ where fn create_instruction_without_accumulator( publish_pubkey: Pubkey, - program_key: Pubkey, + oracle_program_key: Pubkey, price_id: Pubkey, price_info: &PriceInfo, current_slot: u64, ) -> Result { Ok(Instruction { - program_id: program_key, + program_id: oracle_program_key, accounts: vec![ AccountMeta { pubkey: publish_pubkey, @@ -771,9 +816,67 @@ fn create_instruction_without_accumulator( }) } +fn create_instruction_with_publish_program( + publish_pubkey: Pubkey, + publish_program_key: Pubkey, + publisher_buffer_key: Pubkey, + prices: Vec, +) -> Result { + use pyth_price_store::instruction::{ + Instruction as PublishInstruction, + SubmitPricesArgsHeader, + PUBLISHER_CONFIG_SEED, + }; + let (publisher_config_key, publisher_config_bump) = Pubkey::find_program_address( + &[PUBLISHER_CONFIG_SEED.as_bytes(), &publish_pubkey.to_bytes()], + &publish_program_key, + ); + + let mut values = Vec::new(); + for update in prices { + if update.feed_index == 0 { + bail!("no feed index for feed {:?}", update.feed_id); + } + values.push(BufferedPrice::new( + update.feed_index, + (update.info.status as u8).into(), + update.info.price, + update.info.conf, + )?); + } + let mut data = vec![PublishInstruction::SubmitPrices as u8]; + data.extend_from_slice(bytes_of(&SubmitPricesArgsHeader { + publisher_config_bump, + })); + data.extend(cast_slice(&values)); + + let instruction = Instruction { + program_id: publish_program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: publisher_config_key, + is_signer: false, + is_writable: false, + }, + AccountMeta { + pubkey: publisher_buffer_key, + is_signer: false, + is_writable: true, + }, + ], + data, + }; + Ok(instruction) +} + fn create_instruction_with_accumulator( publish_pubkey: Pubkey, - program_key: Pubkey, + oracle_program_key: Pubkey, price_id: Pubkey, price_info: &PriceInfo, current_slot: u64, @@ -786,7 +889,7 @@ fn create_instruction_with_accumulator( let (oracle_auth_pda, _) = Pubkey::find_program_address( &[b"upd_price_write", &accumulator_program_key.to_bytes()], - &program_key, + &oracle_program_key, ); let (accumulator_data_pubkey, _accumulator_data_pubkey) = Pubkey::find_program_address( @@ -799,7 +902,7 @@ fn create_instruction_with_accumulator( ); Ok(Instruction { - program_id: program_key, + program_id: oracle_program_key, accounts: vec![ AccountMeta { pubkey: publish_pubkey, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index eba3b96..c09a2ee 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -18,6 +18,7 @@ use { Context, Result, }, + pyth_price_store::instruction::PUBLISHER_CONFIG_SEED, pyth_sdk_solana::state::{ load_mapping_account, load_product_account, @@ -37,6 +38,7 @@ use { commitment_config::CommitmentLevel, pubkey::Pubkey, signature::Keypair, + signer::Signer, }, std::{ collections::{ @@ -61,6 +63,7 @@ pub struct ProductEntry { pub struct PricePublishingMetadata { pub schedule: MarketSchedule, pub publish_interval: Option, + pub feed_index: u32, } /// This shim is used to abstract over SolanaPriceAccount and PythnetPriceAccount so we @@ -134,6 +137,7 @@ pub struct Data { pub price_accounts: HashMap, /// publisher => {their permissioned price accounts => price publishing metadata} pub publisher_permissions: HashMap>, + pub publisher_buffer_key: Option, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -192,6 +196,7 @@ pub trait Oracle { network: Network, mapping_key: Pubkey, publish_keypair: Option<&Keypair>, + publish_program_key: Option, rpc_client: &RpcClient, max_lookup_batch_size: usize, ) -> Result<()>; @@ -266,6 +271,7 @@ where network: Network, mapping_key: Pubkey, publish_keypair: Option<&Keypair>, + publish_program_key: Option, rpc_client: &RpcClient, max_lookup_batch_size: usize, ) -> Result<()> { @@ -294,6 +300,8 @@ where PricePublishingMetadata { schedule: prod_entry.schedule.clone(), publish_interval: prod_entry.publish_interval, + // TODO: update sdk + feed_index: price_entry.drv4, } } else { tracing::warn!( @@ -308,22 +316,44 @@ where } } + let mut publisher_buffer_key = None; + if let (Some(publish_program_key), Some(publish_keypair)) = + (publish_program_key, publish_keypair) + { + match fetch_publisher_buffer_key( + rpc_client, + publish_program_key, + publish_keypair.pubkey(), + ) + .await + { + Ok(r) => { + publisher_buffer_key = Some(r); + } + Err(err) => { + tracing::warn!("failed to fetch publisher buffer key: {:?}", err); + } + } + } + let new_data = Data { mapping_accounts, product_accounts, price_accounts, publisher_permissions, + publisher_buffer_key, }; let mut data = self.into().data.write().await; log_data_diff(&data, &new_data); *data = new_data; - Exporter::update_permissions( + Exporter::update_on_chain_state( self, network, publish_keypair, data.publisher_permissions.clone(), + data.publisher_buffer_key, ) .await?; @@ -364,6 +394,23 @@ where } } +async fn fetch_publisher_buffer_key( + rpc_client: &RpcClient, + publish_program_key: Pubkey, + publisher_pubkey: Pubkey, +) -> Result { + let (publisher_config_key, _bump) = Pubkey::find_program_address( + &[ + PUBLISHER_CONFIG_SEED.as_bytes(), + &publisher_pubkey.to_bytes(), + ], + &publish_program_key, + ); + let data = rpc_client.get_account_data(&publisher_config_key).await?; + let config = pyth_price_store::accounts::publisher_config::read(&data)?; + Ok(config.buffer_account.into()) +} + #[instrument(skip(rpc_client))] async fn fetch_mapping_accounts( rpc_client: &RpcClient,