From 30fb3eb864b9c6d25990e6394bcdc4b61a1f3bbe Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Mon, 2 Sep 2024 12:21:51 +0100 Subject: [PATCH] feat: support batch publish --- Cargo.lock | 11 ++ Cargo.toml | 2 + src/agent/services/exporter.rs | 3 +- src/agent/services/oracle.rs | 2 +- src/agent/solana.rs | 25 +++-- src/agent/state/exporter.rs | 178 +++++++++++++++++++++++---------- src/agent/state/oracle.rs | 3 + 7 files changed, 162 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5919b0f..b7331d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3405,6 +3405,7 @@ dependencies = [ "anyhow", "async-trait", "bincode", + "bytemuck", "chrono", "chrono-tz", "clap 4.5.4", @@ -3423,6 +3424,7 @@ dependencies = [ "portpicker", "prometheus-client", "proptest", + "pyth-price-publisher", "pyth-sdk", "pyth-sdk-solana", "rand 0.8.5", @@ -3447,6 +3449,15 @@ dependencies = [ "winnow 0.6.5", ] +[[package]] +name = "pyth-price-publisher" +version = "0.1.0" +source = "git+https://github.com/pyth-network/pyth-crosschain?branch=add-publisher-program#e487dfd7523422169011fd8344863d389ea04427" +dependencies = [ + "bytemuck", + "thiserror", +] + [[package]] name = "pyth-sdk" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index cb8cb1d..39b3edb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,8 @@ tracing-opentelemetry = "0.24.0" opentelemetry = "0.23.0" opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} opentelemetry-otlp = { version = "0.16.0" } +pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } +bytemuck = "1.13.0" [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 1e6aadd..3631cf8 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -268,7 +268,8 @@ mod exporter { &network_state_rx, key_store.accumulator_key, &publish_keypair, - key_store.program_key, + key_store.oracle_program_key, + key_store.publish_program_key, config.exporter.max_batch_size, config.exporter.staleness_threshold, config.exporter.compute_unit_limit, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index a9d5237..3876d30 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -73,7 +73,7 @@ where config.clone(), network, state.clone(), - key_store.program_key, + key_store.oracle_program_key, ) .await { diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 71e9eb7..3ea5858 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -92,9 +92,17 @@ pub mod key_store { /// The public key of the Oracle program #[serde( serialize_with = "pubkey_string_ser", - deserialize_with = "pubkey_string_de" + deserialize_with = "pubkey_string_de", + alias = "program_key" // for compatibility + )] + pub oracle_program_key: Pubkey, + /// The public key of the Publish program + #[serde( + serialize_with = "opt_pubkey_string_ser", + deserialize_with = "opt_pubkey_string_de", + default )] - pub program_key: Pubkey, + pub publish_program_key: Option, /// The public key of the root mapping account #[serde( serialize_with = "pubkey_string_ser", @@ -114,13 +122,15 @@ pub mod key_store { /// The keypair used to publish price updates. When None, /// publishing will not start until a new keypair is supplied /// via the remote loading endpoint - pub publish_keypair: Option, + pub publish_keypair: Option, /// Public key of the Oracle program - pub program_key: Pubkey, + pub oracle_program_key: Pubkey, + /// Public key of the Publish program + pub publish_program_key: Option, /// Public key of the root mapping account - pub mapping_key: Pubkey, + pub mapping_key: Pubkey, /// Public key of the accumulator program (if provided) - pub accumulator_key: Option, + pub accumulator_key: Option, } impl KeyStore { @@ -139,7 +149,8 @@ pub mod key_store { Ok(KeyStore { publish_keypair, - program_key: config.program_key, + oracle_program_key: config.oracle_program_key, + publish_program_key: config.publish_program_key, mapping_key: config.mapping_key, accumulator_key: config.accumulator_key, }) diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 2bceded..aa8ca85 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -20,8 +20,10 @@ use { Result, }, bincode::Options, + bytemuck::cast_slice, chrono::Utc, futures_util::future::join_all, + pyth_price_publisher::accounts::publisher_prices::PublisherPrice, pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, @@ -105,7 +107,7 @@ where publish_keypair: &Keypair, staleness_threshold: Duration, unchanged_publish_threshold: Duration, - ) -> Result>; + ) -> Result>; async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option; async fn update_recent_compute_unit_price( &self, @@ -129,6 +131,13 @@ impl<'a> From<&'a State> for &'a ExporterState { } } +#[derive(Debug, Clone)] +pub struct PermissionedUpdate { + pub feed_id: pyth_sdk::Identifier, + pub feed_index: u32, + pub info: PriceInfo, +} + #[async_trait::async_trait] impl Exporter for T where @@ -160,7 +169,7 @@ where publish_keypair: &Keypair, staleness_threshold: Duration, unchanged_publish_threshold: Duration, - ) -> Result> { + ) -> Result> { let local_store_contents = LocalStore::get_all_price_infos(self).await; let now = Utc::now().naive_utc(); @@ -197,22 +206,27 @@ where true // No prior data found, letting the price through } }) - .filter(|(id, _data)| { - let key_from_id = Pubkey::from((*id).clone().to_bytes()); + .filter_map(|(feed_id, info)| { + let key_from_id = Pubkey::from(feed_id.clone().to_bytes()); if let Some(publisher_permission) = our_prices.get(&key_from_id) { let now_utc = Utc::now(); - let ret = publisher_permission.schedule.can_publish_at(&now_utc); - - if !ret { + let can_publish = publisher_permission.schedule.can_publish_at(&now_utc); + + if can_publish { + Some(PermissionedUpdate { + feed_id, + feed_index: publisher_permission.feed_index, + info, + }) + } else { tracing::debug!( price_account = key_from_id.to_string(), schedule = ?publisher_permission.schedule, utc_time = now_utc.format("%c").to_string(), "Exporter: Attempted to publish price outside market hours", ); + None } - - ret } else { // Note: This message is not an error. Some // publishers have different permissions on @@ -222,12 +236,12 @@ where permissioned_accounts = ?self.into().our_prices, "Exporter: Attempted to publish a price without permission, skipping", ); - false + None } }) - .filter(|(id, info)| { + .filter(|update| { // Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval - let last_info = match last_published_state.get(id) { + let last_info = match last_published_state.get(&update.feed_id) { Some(last_info) => last_info, None => { // No prior data found, letting the price through @@ -235,7 +249,7 @@ where } }; - let key_from_id = Pubkey::from((*id).clone().to_bytes()); + let key_from_id = Pubkey::from((update.feed_id).clone().to_bytes()); let publisher_metadata = match our_prices.get(&key_from_id) { Some(metadata) => metadata, None => { @@ -245,7 +259,7 @@ where }; if let Some(publish_interval) = publisher_metadata.publish_interval { - if info.timestamp < last_info.timestamp + publish_interval { + if update.info.timestamp < last_info.timestamp + publish_interval { // Updating the price too soon after the last update, skipping return false; } @@ -287,7 +301,7 @@ where .await?; let price_accounts = permissioned_updates .iter() - .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + .map(|update| Pubkey::from(update.feed_id.to_bytes())) .collect::>(); *self @@ -435,7 +449,8 @@ pub async fn publish_batches( network_state_rx: &watch::Receiver, accumulator_key: Option, publish_keypair: &Keypair, - program_key: Pubkey, + oracle_program_key: Pubkey, + publish_program_key: Option, max_batch_size: usize, staleness_threshold: Duration, compute_unit_limit: u32, @@ -443,7 +458,7 @@ pub async fn publish_batches( maximum_compute_unit_price_micro_lamports: u64, maximum_slot_gap_for_dynamic_compute_unit_price: u64, dynamic_compute_unit_pricing_enabled: bool, - permissioned_updates: Vec<(pyth_sdk::Identifier, PriceInfo)>, + permissioned_updates: Vec, ) -> Result<()> where S: Sync + Send + 'static, @@ -472,7 +487,8 @@ where network_state, accumulator_key, publish_keypair, - program_key, + oracle_program_key, + publish_program_key, batch, staleness_threshold, compute_unit_limit, @@ -482,8 +498,8 @@ where dynamic_compute_unit_pricing_enabled, )); - for (identifier, info) in batch { - batch_state.insert(*identifier, (*info).clone()); + for update in batch { + batch_state.insert(update.feed_id, update.info.clone()); } } @@ -505,7 +521,7 @@ where blockhash = network_state.blockhash.to_string(), current_slot = network_state.current_slot, staleness_threshold = staleness_threshold.as_millis(), - batch = ?batch.iter().map(|(identifier, _)| identifier.to_string()).collect::>(), + batch = ?batch.iter().map(|update| update.feed_id.to_string()).collect::>(), ) )] async fn publish_batch( @@ -515,8 +531,9 @@ async fn publish_batch( network_state: NetworkState, accumulator_key: Option, publish_keypair: &Keypair, - program_key: Pubkey, - batch: &[(Identifier, PriceInfo)], + oracle_program_key: Pubkey, + publish_program_key: Option, + batch: &[PermissionedUpdate], staleness_threshold: Duration, compute_unit_limit: u32, compute_unit_price_micro_lamports_opt: Option, @@ -534,46 +551,55 @@ where { let mut instructions = Vec::new(); - // Refresh the data in the batch - let local_store_contents = LocalStore::get_all_price_infos(&*state).await; - let refreshed_batch = batch.iter().map(|(identifier, _)| { - ( - identifier, - local_store_contents - .get(identifier) - .ok_or_else(|| anyhow!("price identifier not found in local store")) - .with_context(|| identifier.to_string()), - ) - }); - let price_accounts = refreshed_batch - .clone() - .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + let price_accounts = batch + .iter() + .map(|update| Pubkey::from(update.feed_id.to_bytes())) .collect::>(); - for (identifier, price_info_result) in refreshed_batch { - let price_info = price_info_result?; - let now = Utc::now().naive_utc(); - - let stale_price = now > price_info.timestamp + staleness_threshold; + let now = Utc::now().naive_utc(); + let mut updates = Vec::new(); + // Refresh the data in the batch + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; + for update in batch { + let mut update = update.clone(); + update.info = local_store_contents + .get(&update.feed_id) + .ok_or_else(|| anyhow!("price identifier not found in local store")) + .with_context(|| update.feed_id.to_string())? + .clone(); + + let stale_price = now > update.info.timestamp + staleness_threshold; if stale_price { continue; } + updates.push(update); + } + if let Some(publish_program_key) = publish_program_key { + let (instruction, unsupported_updates) = create_instruction_with_publish_program( + publish_keypair.pubkey(), + publish_program_key, + updates, + )?; + updates = unsupported_updates; + instructions.push(instruction); + } + for update in updates { let instruction = if let Some(accumulator_program_key) = accumulator_key { create_instruction_with_accumulator( publish_keypair.pubkey(), - program_key, - Pubkey::from(identifier.to_bytes()), - price_info, + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, network_state.current_slot, accumulator_program_key, )? } else { create_instruction_without_accumulator( publish_keypair.pubkey(), - program_key, - Pubkey::from(identifier.to_bytes()), - price_info, + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, network_state.current_slot, )? }; @@ -730,13 +756,13 @@ where fn create_instruction_without_accumulator( publish_pubkey: Pubkey, - program_key: Pubkey, + oracle_program_key: Pubkey, price_id: Pubkey, price_info: &PriceInfo, current_slot: u64, ) -> Result { Ok(Instruction { - program_id: program_key, + program_id: oracle_program_key, accounts: vec![ AccountMeta { pubkey: publish_pubkey, @@ -771,9 +797,55 @@ fn create_instruction_without_accumulator( }) } +fn create_instruction_with_publish_program( + publish_pubkey: Pubkey, + publish_program_key: Pubkey, + prices: Vec, +) -> Result<(Instruction, Vec)> { + let mut unsupported_updates = Vec::new(); + let (buffer_key, _buffer_bump) = Pubkey::find_program_address( + &["BUFFER".as_bytes(), &publish_pubkey.to_bytes()], + &publish_program_key, + ); + + let mut values = Vec::new(); + for update in prices { + if update.feed_index == 0 { + unsupported_updates.push(update); + } else { + values.push(PublisherPrice::new( + update.feed_index, + (update.info.status as u8).into(), + update.info.price, + update.info.conf, + )?); + } + } + let mut data = vec![1]; + data.extend(cast_slice(&values)); + + let instruction = Instruction { + program_id: publish_program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: buffer_key, + is_signer: false, + is_writable: true, + }, + ], + data, + }; + Ok((instruction, unsupported_updates)) +} + fn create_instruction_with_accumulator( publish_pubkey: Pubkey, - program_key: Pubkey, + oracle_program_key: Pubkey, price_id: Pubkey, price_info: &PriceInfo, current_slot: u64, @@ -786,7 +858,7 @@ fn create_instruction_with_accumulator( let (oracle_auth_pda, _) = Pubkey::find_program_address( &[b"upd_price_write", &accumulator_program_key.to_bytes()], - &program_key, + &oracle_program_key, ); let (accumulator_data_pubkey, _accumulator_data_pubkey) = Pubkey::find_program_address( @@ -799,7 +871,7 @@ fn create_instruction_with_accumulator( ); Ok(Instruction { - program_id: program_key, + program_id: oracle_program_key, accounts: vec![ AccountMeta { pubkey: publish_pubkey, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index eba3b96..39b04b4 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -61,6 +61,7 @@ pub struct ProductEntry { pub struct PricePublishingMetadata { pub schedule: MarketSchedule, pub publish_interval: Option, + pub feed_index: u32, } /// This shim is used to abstract over SolanaPriceAccount and PythnetPriceAccount so we @@ -294,6 +295,8 @@ where PricePublishingMetadata { schedule: prod_entry.schedule.clone(), publish_interval: prod_entry.publish_interval, + // TODO: update sdk + feed_index: price_entry.drv4, } } else { tracing::warn!(