From 30fb3eb864b9c6d25990e6394bcdc4b61a1f3bbe Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Mon, 2 Sep 2024 12:21:51 +0100 Subject: [PATCH 1/5] feat: support batch publish --- Cargo.lock | 11 ++ Cargo.toml | 2 + src/agent/services/exporter.rs | 3 +- src/agent/services/oracle.rs | 2 +- src/agent/solana.rs | 25 +++-- src/agent/state/exporter.rs | 178 +++++++++++++++++++++++---------- src/agent/state/oracle.rs | 3 + 7 files changed, 162 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5919b0f..b7331d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3405,6 +3405,7 @@ dependencies = [ "anyhow", "async-trait", "bincode", + "bytemuck", "chrono", "chrono-tz", "clap 4.5.4", @@ -3423,6 +3424,7 @@ dependencies = [ "portpicker", "prometheus-client", "proptest", + "pyth-price-publisher", "pyth-sdk", "pyth-sdk-solana", "rand 0.8.5", @@ -3447,6 +3449,15 @@ dependencies = [ "winnow 0.6.5", ] +[[package]] +name = "pyth-price-publisher" +version = "0.1.0" +source = "git+https://github.com/pyth-network/pyth-crosschain?branch=add-publisher-program#e487dfd7523422169011fd8344863d389ea04427" +dependencies = [ + "bytemuck", + "thiserror", +] + [[package]] name = "pyth-sdk" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index cb8cb1d..39b3edb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,8 @@ tracing-opentelemetry = "0.24.0" opentelemetry = "0.23.0" opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} opentelemetry-otlp = { version = "0.16.0" } +pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } +bytemuck = "1.13.0" [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 1e6aadd..3631cf8 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -268,7 +268,8 @@ mod exporter { &network_state_rx, key_store.accumulator_key, &publish_keypair, - key_store.program_key, + key_store.oracle_program_key, + key_store.publish_program_key, config.exporter.max_batch_size, config.exporter.staleness_threshold, config.exporter.compute_unit_limit, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index a9d5237..3876d30 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -73,7 +73,7 @@ where config.clone(), network, state.clone(), - key_store.program_key, + key_store.oracle_program_key, ) .await { diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 71e9eb7..3ea5858 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -92,9 +92,17 @@ pub mod key_store { /// The public key of the Oracle program #[serde( serialize_with = "pubkey_string_ser", - deserialize_with = "pubkey_string_de" + deserialize_with = "pubkey_string_de", + alias = "program_key" // for compatibility + )] + pub oracle_program_key: Pubkey, + /// The public key of the Publish program + #[serde( + serialize_with = "opt_pubkey_string_ser", + deserialize_with = "opt_pubkey_string_de", + default )] - pub program_key: Pubkey, + pub publish_program_key: Option, /// The public key of the root mapping account #[serde( serialize_with = "pubkey_string_ser", @@ -114,13 +122,15 @@ pub mod key_store { /// The keypair used to publish price updates. 
When None, /// publishing will not start until a new keypair is supplied /// via the remote loading endpoint - pub publish_keypair: Option, + pub publish_keypair: Option, /// Public key of the Oracle program - pub program_key: Pubkey, + pub oracle_program_key: Pubkey, + /// Public key of the Publish program + pub publish_program_key: Option, /// Public key of the root mapping account - pub mapping_key: Pubkey, + pub mapping_key: Pubkey, /// Public key of the accumulator program (if provided) - pub accumulator_key: Option, + pub accumulator_key: Option, } impl KeyStore { @@ -139,7 +149,8 @@ pub mod key_store { Ok(KeyStore { publish_keypair, - program_key: config.program_key, + oracle_program_key: config.oracle_program_key, + publish_program_key: config.publish_program_key, mapping_key: config.mapping_key, accumulator_key: config.accumulator_key, }) diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 2bceded..aa8ca85 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -20,8 +20,10 @@ use { Result, }, bincode::Options, + bytemuck::cast_slice, chrono::Utc, futures_util::future::join_all, + pyth_price_publisher::accounts::publisher_prices::PublisherPrice, pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, @@ -105,7 +107,7 @@ where publish_keypair: &Keypair, staleness_threshold: Duration, unchanged_publish_threshold: Duration, - ) -> Result>; + ) -> Result>; async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option; async fn update_recent_compute_unit_price( &self, @@ -129,6 +131,13 @@ impl<'a> From<&'a State> for &'a ExporterState { } } +#[derive(Debug, Clone)] +pub struct PermissionedUpdate { + pub feed_id: pyth_sdk::Identifier, + pub feed_index: u32, + pub info: PriceInfo, +} + #[async_trait::async_trait] impl Exporter for T where @@ -160,7 +169,7 @@ where publish_keypair: &Keypair, staleness_threshold: Duration, unchanged_publish_threshold: Duration, - ) -> Result> { + ) -> Result> { let local_store_contents = LocalStore::get_all_price_infos(self).await; let now = Utc::now().naive_utc(); @@ -197,22 +206,27 @@ where true // No prior data found, letting the price through } }) - .filter(|(id, _data)| { - let key_from_id = Pubkey::from((*id).clone().to_bytes()); + .filter_map(|(feed_id, info)| { + let key_from_id = Pubkey::from(feed_id.clone().to_bytes()); if let Some(publisher_permission) = our_prices.get(&key_from_id) { let now_utc = Utc::now(); - let ret = publisher_permission.schedule.can_publish_at(&now_utc); - - if !ret { + let can_publish = publisher_permission.schedule.can_publish_at(&now_utc); + + if can_publish { + Some(PermissionedUpdate { + feed_id, + feed_index: publisher_permission.feed_index, + info, + }) + } else { tracing::debug!( price_account = key_from_id.to_string(), schedule = ?publisher_permission.schedule, utc_time = now_utc.format("%c").to_string(), "Exporter: Attempted to publish price outside market hours", ); + None } - - ret } else { // Note: This message is not an error. 
Some // publishers have different permissions on @@ -222,12 +236,12 @@ where permissioned_accounts = ?self.into().our_prices, "Exporter: Attempted to publish a price without permission, skipping", ); - false + None } }) - .filter(|(id, info)| { + .filter(|update| { // Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval - let last_info = match last_published_state.get(id) { + let last_info = match last_published_state.get(&update.feed_id) { Some(last_info) => last_info, None => { // No prior data found, letting the price through @@ -235,7 +249,7 @@ where } }; - let key_from_id = Pubkey::from((*id).clone().to_bytes()); + let key_from_id = Pubkey::from((update.feed_id).clone().to_bytes()); let publisher_metadata = match our_prices.get(&key_from_id) { Some(metadata) => metadata, None => { @@ -245,7 +259,7 @@ where }; if let Some(publish_interval) = publisher_metadata.publish_interval { - if info.timestamp < last_info.timestamp + publish_interval { + if update.info.timestamp < last_info.timestamp + publish_interval { // Updating the price too soon after the last update, skipping return false; } @@ -287,7 +301,7 @@ where .await?; let price_accounts = permissioned_updates .iter() - .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + .map(|update| Pubkey::from(update.feed_id.to_bytes())) .collect::>(); *self @@ -435,7 +449,8 @@ pub async fn publish_batches( network_state_rx: &watch::Receiver, accumulator_key: Option, publish_keypair: &Keypair, - program_key: Pubkey, + oracle_program_key: Pubkey, + publish_program_key: Option, max_batch_size: usize, staleness_threshold: Duration, compute_unit_limit: u32, @@ -443,7 +458,7 @@ pub async fn publish_batches( maximum_compute_unit_price_micro_lamports: u64, maximum_slot_gap_for_dynamic_compute_unit_price: u64, dynamic_compute_unit_pricing_enabled: bool, - permissioned_updates: Vec<(pyth_sdk::Identifier, PriceInfo)>, + permissioned_updates: Vec, ) -> Result<()> where S: Sync + Send + 'static, @@ -472,7 +487,8 @@ where network_state, accumulator_key, publish_keypair, - program_key, + oracle_program_key, + publish_program_key, batch, staleness_threshold, compute_unit_limit, @@ -482,8 +498,8 @@ where dynamic_compute_unit_pricing_enabled, )); - for (identifier, info) in batch { - batch_state.insert(*identifier, (*info).clone()); + for update in batch { + batch_state.insert(update.feed_id, update.info.clone()); } } @@ -505,7 +521,7 @@ where blockhash = network_state.blockhash.to_string(), current_slot = network_state.current_slot, staleness_threshold = staleness_threshold.as_millis(), - batch = ?batch.iter().map(|(identifier, _)| identifier.to_string()).collect::>(), + batch = ?batch.iter().map(|update| update.feed_id.to_string()).collect::>(), ) )] async fn publish_batch( @@ -515,8 +531,9 @@ async fn publish_batch( network_state: NetworkState, accumulator_key: Option, publish_keypair: &Keypair, - program_key: Pubkey, - batch: &[(Identifier, PriceInfo)], + oracle_program_key: Pubkey, + publish_program_key: Option, + batch: &[PermissionedUpdate], staleness_threshold: Duration, compute_unit_limit: u32, compute_unit_price_micro_lamports_opt: Option, @@ -534,46 +551,55 @@ where { let mut instructions = Vec::new(); - // Refresh the data in the batch - let local_store_contents = LocalStore::get_all_price_infos(&*state).await; - let refreshed_batch = batch.iter().map(|(identifier, _)| { - ( - identifier, - local_store_contents - .get(identifier) - .ok_or_else(|| anyhow!("price identifier not found in 
local store")) - .with_context(|| identifier.to_string()), - ) - }); - let price_accounts = refreshed_batch - .clone() - .map(|(identifier, _)| Pubkey::from(identifier.to_bytes())) + let price_accounts = batch + .iter() + .map(|update| Pubkey::from(update.feed_id.to_bytes())) .collect::>(); - for (identifier, price_info_result) in refreshed_batch { - let price_info = price_info_result?; - let now = Utc::now().naive_utc(); - - let stale_price = now > price_info.timestamp + staleness_threshold; + let now = Utc::now().naive_utc(); + let mut updates = Vec::new(); + // Refresh the data in the batch + let local_store_contents = LocalStore::get_all_price_infos(&*state).await; + for update in batch { + let mut update = update.clone(); + update.info = local_store_contents + .get(&update.feed_id) + .ok_or_else(|| anyhow!("price identifier not found in local store")) + .with_context(|| update.feed_id.to_string())? + .clone(); + + let stale_price = now > update.info.timestamp + staleness_threshold; if stale_price { continue; } + updates.push(update); + } + if let Some(publish_program_key) = publish_program_key { + let (instruction, unsupported_updates) = create_instruction_with_publish_program( + publish_keypair.pubkey(), + publish_program_key, + updates, + )?; + updates = unsupported_updates; + instructions.push(instruction); + } + for update in updates { let instruction = if let Some(accumulator_program_key) = accumulator_key { create_instruction_with_accumulator( publish_keypair.pubkey(), - program_key, - Pubkey::from(identifier.to_bytes()), - price_info, + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, network_state.current_slot, accumulator_program_key, )? } else { create_instruction_without_accumulator( publish_keypair.pubkey(), - program_key, - Pubkey::from(identifier.to_bytes()), - price_info, + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, network_state.current_slot, )? 
}; @@ -730,13 +756,13 @@ where fn create_instruction_without_accumulator( publish_pubkey: Pubkey, - program_key: Pubkey, + oracle_program_key: Pubkey, price_id: Pubkey, price_info: &PriceInfo, current_slot: u64, ) -> Result { Ok(Instruction { - program_id: program_key, + program_id: oracle_program_key, accounts: vec![ AccountMeta { pubkey: publish_pubkey, @@ -771,9 +797,55 @@ fn create_instruction_without_accumulator( }) } +fn create_instruction_with_publish_program( + publish_pubkey: Pubkey, + publish_program_key: Pubkey, + prices: Vec, +) -> Result<(Instruction, Vec)> { + let mut unsupported_updates = Vec::new(); + let (buffer_key, _buffer_bump) = Pubkey::find_program_address( + &["BUFFER".as_bytes(), &publish_pubkey.to_bytes()], + &publish_program_key, + ); + + let mut values = Vec::new(); + for update in prices { + if update.feed_index == 0 { + unsupported_updates.push(update); + } else { + values.push(PublisherPrice::new( + update.feed_index, + (update.info.status as u8).into(), + update.info.price, + update.info.conf, + )?); + } + } + let mut data = vec![1]; + data.extend(cast_slice(&values)); + + let instruction = Instruction { + program_id: publish_program_key, + accounts: vec![ + AccountMeta { + pubkey: publish_pubkey, + is_signer: true, + is_writable: true, + }, + AccountMeta { + pubkey: buffer_key, + is_signer: false, + is_writable: true, + }, + ], + data, + }; + Ok((instruction, unsupported_updates)) +} + fn create_instruction_with_accumulator( publish_pubkey: Pubkey, - program_key: Pubkey, + oracle_program_key: Pubkey, price_id: Pubkey, price_info: &PriceInfo, current_slot: u64, @@ -786,7 +858,7 @@ fn create_instruction_with_accumulator( let (oracle_auth_pda, _) = Pubkey::find_program_address( &[b"upd_price_write", &accumulator_program_key.to_bytes()], - &program_key, + &oracle_program_key, ); let (accumulator_data_pubkey, _accumulator_data_pubkey) = Pubkey::find_program_address( @@ -799,7 +871,7 @@ fn create_instruction_with_accumulator( ); Ok(Instruction { - program_id: program_key, + program_id: oracle_program_key, accounts: vec![ AccountMeta { pubkey: publish_pubkey, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index eba3b96..39b04b4 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -61,6 +61,7 @@ pub struct ProductEntry { pub struct PricePublishingMetadata { pub schedule: MarketSchedule, pub publish_interval: Option, + pub feed_index: u32, } /// This shim is used to abstract over SolanaPriceAccount and PythnetPriceAccount so we @@ -294,6 +295,8 @@ where PricePublishingMetadata { schedule: prod_entry.schedule.clone(), publish_interval: prod_entry.publish_interval, + // TODO: update sdk + feed_index: price_entry.drv4, } } else { tracing::warn!( From 0045bdc761a566424d503a3f36fbcf9d81784d05 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 5 Sep 2024 11:27:53 +0100 Subject: [PATCH 2/5] feat: update to new batch publish instruction format --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/agent/services/exporter.rs | 1 + src/agent/solana.rs | 21 ++++++-- src/agent/state/exporter.rs | 98 +++++++++++++++++++--------------- 5 files changed, 73 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7331d3..d9aaec2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3452,7 +3452,7 @@ dependencies = [ [[package]] name = "pyth-price-publisher" version = "0.1.0" -source = "git+https://github.com/pyth-network/pyth-crosschain?branch=add-publisher-program#e487dfd7523422169011fd8344863d389ea04427" +source = 
"git+https://github.com/pyth-network/pyth-crosschain?rev=4df7172b#4df7172b85f9d1b65b117c58652e945a219f8b82" dependencies = [ "bytemuck", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 39b3edb..293a7ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ tracing-opentelemetry = "0.24.0" opentelemetry = "0.23.0" opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} opentelemetry-otlp = { version = "0.16.0" } -pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } +pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", rev = "4df7172b" } bytemuck = "1.13.0" [dev-dependencies] diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 3631cf8..559eaa9 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -270,6 +270,7 @@ mod exporter { &publish_keypair, key_store.oracle_program_key, key_store.publish_program_key, + key_store.publisher_buffer_key, config.exporter.max_batch_size, config.exporter.staleness_threshold, config.exporter.compute_unit_limit, diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 3ea5858..ebbf4c3 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -103,6 +103,13 @@ pub mod key_store { default )] pub publish_program_key: Option, + /// The public key of the publisher's buffer for the Publish program + #[serde( + serialize_with = "opt_pubkey_string_ser", + deserialize_with = "opt_pubkey_string_de", + default + )] + pub publisher_buffer_key: Option, /// The public key of the root mapping account #[serde( serialize_with = "pubkey_string_ser", @@ -122,15 +129,18 @@ pub mod key_store { /// The keypair used to publish price updates. When None, /// publishing will not start until a new keypair is supplied /// via the remote loading endpoint - pub publish_keypair: Option, + pub publish_keypair: Option, /// Public key of the Oracle program - pub oracle_program_key: Pubkey, + pub oracle_program_key: Pubkey, /// Public key of the Publish program - pub publish_program_key: Option, + pub publish_program_key: Option, + /// Public key of the publisher's buffer for the publish program + pub publisher_buffer_key: Option, + /// Public key of the root mapping account - pub mapping_key: Pubkey, + pub mapping_key: Pubkey, /// Public key of the accumulator program (if provided) - pub accumulator_key: Option, + pub accumulator_key: Option, } impl KeyStore { @@ -151,6 +161,7 @@ pub mod key_store { publish_keypair, oracle_program_key: config.oracle_program_key, publish_program_key: config.publish_program_key, + publisher_buffer_key: config.publisher_buffer_key, mapping_key: config.mapping_key, accumulator_key: config.accumulator_key, }) diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index aa8ca85..514e9f6 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -15,15 +15,13 @@ use { }, }, anyhow::{ - anyhow, - Context, - Result, + anyhow, bail, Context, Result }, bincode::Options, - bytemuck::cast_slice, + bytemuck::{bytes_of, cast_slice}, chrono::Utc, futures_util::future::join_all, - pyth_price_publisher::accounts::publisher_prices::PublisherPrice, + pyth_price_publisher::accounts::buffer::BufferedPrice, pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, @@ -451,6 +449,7 @@ pub async fn publish_batches( publish_keypair: &Keypair, oracle_program_key: Pubkey, publish_program_key: Option, + publisher_buffer_key: Option, max_batch_size: usize, 
staleness_threshold: Duration, compute_unit_limit: u32, @@ -489,6 +488,7 @@ where publish_keypair, oracle_program_key, publish_program_key, + publisher_buffer_key, batch, staleness_threshold, compute_unit_limit, @@ -533,6 +533,7 @@ async fn publish_batch( publish_keypair: &Keypair, oracle_program_key: Pubkey, publish_program_key: Option, + publisher_buffer_key: Option, batch: &[PermissionedUpdate], staleness_threshold: Duration, compute_unit_limit: u32, @@ -576,35 +577,36 @@ where } if let Some(publish_program_key) = publish_program_key { - let (instruction, unsupported_updates) = create_instruction_with_publish_program( + let instruction = create_instruction_with_publish_program( publish_keypair.pubkey(), publish_program_key, + publisher_buffer_key.context("must specify publisher_buffer_key if publish_program_key is specified")?, updates, )?; - updates = unsupported_updates; - instructions.push(instruction); - } - for update in updates { - let instruction = if let Some(accumulator_program_key) = accumulator_key { - create_instruction_with_accumulator( - publish_keypair.pubkey(), - oracle_program_key, - Pubkey::from(update.feed_id.to_bytes()), - &update.info, - network_state.current_slot, - accumulator_program_key, - )? - } else { - create_instruction_without_accumulator( - publish_keypair.pubkey(), - oracle_program_key, - Pubkey::from(update.feed_id.to_bytes()), - &update.info, - network_state.current_slot, - )? - }; - instructions.push(instruction); + } else { + for update in updates { + let instruction = if let Some(accumulator_program_key) = accumulator_key { + create_instruction_with_accumulator( + publish_keypair.pubkey(), + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, + network_state.current_slot, + accumulator_program_key, + )? + } else { + create_instruction_without_accumulator( + publish_keypair.pubkey(), + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, + network_state.current_slot, + )? 
+ }; + + instructions.push(instruction); + } } // Pay priority fees, if configured @@ -800,28 +802,31 @@ fn create_instruction_without_accumulator( fn create_instruction_with_publish_program( publish_pubkey: Pubkey, publish_program_key: Pubkey, + publisher_buffer_key: Pubkey, prices: Vec, -) -> Result<(Instruction, Vec)> { - let mut unsupported_updates = Vec::new(); - let (buffer_key, _buffer_bump) = Pubkey::find_program_address( - &["BUFFER".as_bytes(), &publish_pubkey.to_bytes()], +) -> Result { + use pyth_price_publisher::instruction::{Instruction as PublishInstruction, SubmitPricesArgsHeader, PUBLISHER_CONFIG_SEED}; + let (publisher_config_key, publisher_config_bump) = Pubkey::find_program_address( + &[PUBLISHER_CONFIG_SEED.as_bytes(), &publish_pubkey.to_bytes()], &publish_program_key, ); let mut values = Vec::new(); for update in prices { if update.feed_index == 0 { - unsupported_updates.push(update); - } else { - values.push(PublisherPrice::new( - update.feed_index, - (update.info.status as u8).into(), - update.info.price, - update.info.conf, - )?); + bail!("no feed index for feed {:?}", update.feed_id); } + values.push(BufferedPrice::new( + update.feed_index, + (update.info.status as u8).into(), + update.info.price, + update.info.conf, + )?); } - let mut data = vec![1]; + let mut data = vec![PublishInstruction::SubmitPrices as u8]; + data.extend_from_slice(bytes_of(&SubmitPricesArgsHeader { + publisher_config_bump, + })); data.extend(cast_slice(&values)); let instruction = Instruction { @@ -833,14 +838,19 @@ fn create_instruction_with_publish_program( is_writable: true, }, AccountMeta { - pubkey: buffer_key, + pubkey: publisher_config_key, + is_signer: false, + is_writable: false, + }, + AccountMeta { + pubkey: publisher_buffer_key, is_signer: false, is_writable: true, }, ], data, }; - Ok((instruction, unsupported_updates)) + Ok(instruction) } fn create_instruction_with_accumulator( From 801b902bfaec1e88772dad3446eb777f1ec93176 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Thu, 5 Sep 2024 12:16:38 +0100 Subject: [PATCH 3/5] feat: fetch publisher buffer key --- src/agent/services/exporter.rs | 3 ++- src/agent/services/oracle.rs | 3 +++ src/agent/solana.rs | 21 ++++------------ src/agent/state/exporter.rs | 33 +++++++++++++++++++----- src/agent/state/oracle.rs | 46 +++++++++++++++++++++++++++++++++- 5 files changed, 82 insertions(+), 24 deletions(-) diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 559eaa9..ac8832e 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -261,6 +261,7 @@ mod exporter { config.exporter.staleness_threshold, config.exporter.unchanged_publish_threshold, ).await { + let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await; if let Err(err) = publish_batches( state.clone(), client.clone(), @@ -270,7 +271,7 @@ mod exporter { &publish_keypair, key_store.oracle_program_key, key_store.publish_program_key, - key_store.publisher_buffer_key, + publisher_buffer_key, config.exporter.max_batch_size, config.exporter.staleness_threshold, config.exporter.compute_unit_limit, diff --git a/src/agent/services/oracle.rs b/src/agent/services/oracle.rs index 3876d30..7de2194 100644 --- a/src/agent/services/oracle.rs +++ b/src/agent/services/oracle.rs @@ -62,6 +62,7 @@ where state.clone(), key_store.mapping_key, key_store.publish_keypair, + key_store.publish_program_key, config.oracle.max_lookup_batch_size, ))); @@ -159,6 +160,7 @@ async fn poller( state: Arc, mapping_key: Pubkey, 
publish_keypair: Option, + publish_program_key: Option, max_lookup_batch_size: usize, ) where S: Oracle, @@ -183,6 +185,7 @@ async fn poller( network, mapping_key, publish_keypair.as_ref(), + publish_program_key, &client, max_lookup_batch_size, ) diff --git a/src/agent/solana.rs b/src/agent/solana.rs index ebbf4c3..3ea5858 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -103,13 +103,6 @@ pub mod key_store { default )] pub publish_program_key: Option, - /// The public key of the publisher's buffer for the Publish program - #[serde( - serialize_with = "opt_pubkey_string_ser", - deserialize_with = "opt_pubkey_string_de", - default - )] - pub publisher_buffer_key: Option, /// The public key of the root mapping account #[serde( serialize_with = "pubkey_string_ser", @@ -129,18 +122,15 @@ pub mod key_store { /// The keypair used to publish price updates. When None, /// publishing will not start until a new keypair is supplied /// via the remote loading endpoint - pub publish_keypair: Option, + pub publish_keypair: Option, /// Public key of the Oracle program - pub oracle_program_key: Pubkey, + pub oracle_program_key: Pubkey, /// Public key of the Publish program - pub publish_program_key: Option, - /// Public key of the publisher's buffer for the publish program - pub publisher_buffer_key: Option, - + pub publish_program_key: Option, /// Public key of the root mapping account - pub mapping_key: Pubkey, + pub mapping_key: Pubkey, /// Public key of the accumulator program (if provided) - pub accumulator_key: Option, + pub accumulator_key: Option, } impl KeyStore { @@ -161,7 +151,6 @@ pub mod key_store { publish_keypair, oracle_program_key: config.oracle_program_key, publish_program_key: config.publish_program_key, - publisher_buffer_key: config.publisher_buffer_key, mapping_key: config.mapping_key, accumulator_key: config.accumulator_key, }) diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 514e9f6..05a9673 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -15,10 +15,16 @@ use { }, }, anyhow::{ - anyhow, bail, Context, Result + anyhow, + bail, + Context, + Result, }, bincode::Options, - bytemuck::{bytes_of, cast_slice}, + bytemuck::{ + bytes_of, + cast_slice, + }, chrono::Utc, futures_util::future::join_all, pyth_price_publisher::accounts::buffer::BufferedPrice, @@ -81,6 +87,8 @@ pub struct ExporterState { /// Currently known permissioned prices of this publisher along with their market hours our_prices: RwLock>, + publisher_buffer_key: RwLock>, + /// Recent compute unit price in micro lamports (set if dynamic compute unit pricing is enabled) recent_compute_unit_price_micro_lamports: RwLock>, } @@ -106,6 +114,7 @@ where staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result>; + async fn get_publisher_buffer_key(&self) -> Option; async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option; async fn update_recent_compute_unit_price( &self, @@ -114,11 +123,12 @@ where staleness_threshold: Duration, unchanged_publish_threshold: Duration, ) -> Result<()>; - async fn update_permissions( + async fn update_on_chain_state( &self, network: Network, publish_keypair: Option<&Keypair>, publisher_permissions: HashMap>, + publisher_buffer_key: Option, ) -> Result<()>; } @@ -267,6 +277,10 @@ where .collect::>()) } + async fn get_publisher_buffer_key(&self) -> Option { + *self.into().publisher_buffer_key.read().await + } + async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option { *self .into() 
@@ -313,11 +327,12 @@ where } #[instrument(skip(self, publish_keypair, publisher_permissions))] - async fn update_permissions( + async fn update_on_chain_state( &self, network: Network, publish_keypair: Option<&Keypair>, publisher_permissions: HashMap>, + publisher_buffer_key: Option, ) -> Result<()> { let publish_keypair = get_publish_keypair(self, network, publish_keypair).await?; *self.into().our_prices.write().await = publisher_permissions @@ -330,6 +345,7 @@ where ); HashMap::new() }); + *self.into().publisher_buffer_key.write().await = publisher_buffer_key; Ok(()) } @@ -580,7 +596,8 @@ where let instruction = create_instruction_with_publish_program( publish_keypair.pubkey(), publish_program_key, - publisher_buffer_key.context("must specify publisher_buffer_key if publish_program_key is specified")?, + publisher_buffer_key + .context("must specify publisher_buffer_key if publish_program_key is specified")?, updates, )?; instructions.push(instruction); @@ -805,7 +822,11 @@ fn create_instruction_with_publish_program( publisher_buffer_key: Pubkey, prices: Vec, ) -> Result { - use pyth_price_publisher::instruction::{Instruction as PublishInstruction, SubmitPricesArgsHeader, PUBLISHER_CONFIG_SEED}; + use pyth_price_publisher::instruction::{ + Instruction as PublishInstruction, + SubmitPricesArgsHeader, + PUBLISHER_CONFIG_SEED, + }; let (publisher_config_key, publisher_config_bump) = Pubkey::find_program_address( &[PUBLISHER_CONFIG_SEED.as_bytes(), &publish_pubkey.to_bytes()], &publish_program_key, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index 39b04b4..e343658 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -18,6 +18,7 @@ use { Context, Result, }, + pyth_price_publisher::instruction::PUBLISHER_CONFIG_SEED, pyth_sdk_solana::state::{ load_mapping_account, load_product_account, @@ -37,6 +38,7 @@ use { commitment_config::CommitmentLevel, pubkey::Pubkey, signature::Keypair, + signer::Signer, }, std::{ collections::{ @@ -135,6 +137,7 @@ pub struct Data { pub price_accounts: HashMap, /// publisher => {their permissioned price accounts => price publishing metadata} pub publisher_permissions: HashMap>, + pub publisher_buffer_key: Option, } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -193,6 +196,7 @@ pub trait Oracle { network: Network, mapping_key: Pubkey, publish_keypair: Option<&Keypair>, + publish_program_key: Option, rpc_client: &RpcClient, max_lookup_batch_size: usize, ) -> Result<()>; @@ -267,6 +271,7 @@ where network: Network, mapping_key: Pubkey, publish_keypair: Option<&Keypair>, + publish_program_key: Option, rpc_client: &RpcClient, max_lookup_batch_size: usize, ) -> Result<()> { @@ -311,22 +316,44 @@ where } } + let mut publisher_buffer_key = None; + if let (Some(publish_program_key), Some(publish_keypair)) = + (publish_program_key, publish_keypair) + { + match fetch_publisher_buffer_key( + rpc_client, + publish_program_key, + publish_keypair.pubkey(), + ) + .await + { + Ok(r) => { + publisher_buffer_key = Some(r); + } + Err(err) => { + tracing::warn!("failed to fetch publisher buffer key: {:?}", err); + } + } + } + let new_data = Data { mapping_accounts, product_accounts, price_accounts, publisher_permissions, + publisher_buffer_key, }; let mut data = self.into().data.write().await; log_data_diff(&data, &new_data); *data = new_data; - Exporter::update_permissions( + Exporter::update_on_chain_state( self, network, publish_keypair, data.publisher_permissions.clone(), + data.publisher_buffer_key, ) .await?; @@ -367,6 +394,23 @@ 
where } } +async fn fetch_publisher_buffer_key( + rpc_client: &RpcClient, + publish_program_key: Pubkey, + publisher_pubkey: Pubkey, +) -> Result { + let (publisher_config_key, _bump) = Pubkey::find_program_address( + &[ + PUBLISHER_CONFIG_SEED.as_bytes(), + &publisher_pubkey.to_bytes(), + ], + &publish_program_key, + ); + let data = rpc_client.get_account_data(&publisher_config_key).await?; + let config = pyth_price_publisher::accounts::publisher_config::read(&data)?; + Ok(config.buffer_account.into()) +} + #[instrument(skip(rpc_client))] async fn fetch_mapping_accounts( rpc_client: &RpcClient, From 7fea315abe80655bb8135e8fdbf0b582a874c207 Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Fri, 6 Sep 2024 11:15:41 +0100 Subject: [PATCH 4/5] chore: update publisher program dependency --- Cargo.lock | 7 ++++--- Cargo.toml | 2 +- src/agent/state/exporter.rs | 4 ++-- src/agent/state/oracle.rs | 4 ++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9aaec2..ed27413 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3424,7 +3424,7 @@ dependencies = [ "portpicker", "prometheus-client", "proptest", - "pyth-price-publisher", + "pyth-price-store", "pyth-sdk", "pyth-sdk-solana", "rand 0.8.5", @@ -3450,9 +3450,10 @@ dependencies = [ ] [[package]] -name = "pyth-price-publisher" +name = "pyth-price-store" version = "0.1.0" -source = "git+https://github.com/pyth-network/pyth-crosschain?rev=4df7172b#4df7172b85f9d1b65b117c58652e945a219f8b82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbb76932606741e8b690d51d3e78df620df4a13474a2ca5f53ded5f70522cdc" dependencies = [ "bytemuck", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 293a7ef..d518c36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ tracing-opentelemetry = "0.24.0" opentelemetry = "0.23.0" opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} opentelemetry-otlp = { version = "0.16.0" } -pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", rev = "4df7172b" } +pyth-price-store = "0.1.0" bytemuck = "1.13.0" [dev-dependencies] diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index 05a9673..199b814 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -27,7 +27,7 @@ use { }, chrono::Utc, futures_util::future::join_all, - pyth_price_publisher::accounts::buffer::BufferedPrice, + pyth_price_store::accounts::buffer::BufferedPrice, pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, @@ -822,7 +822,7 @@ fn create_instruction_with_publish_program( publisher_buffer_key: Pubkey, prices: Vec, ) -> Result { - use pyth_price_publisher::instruction::{ + use pyth_price_store::instruction::{ Instruction as PublishInstruction, SubmitPricesArgsHeader, PUBLISHER_CONFIG_SEED, diff --git a/src/agent/state/oracle.rs b/src/agent/state/oracle.rs index e343658..c09a2ee 100644 --- a/src/agent/state/oracle.rs +++ b/src/agent/state/oracle.rs @@ -18,7 +18,7 @@ use { Context, Result, }, - pyth_price_publisher::instruction::PUBLISHER_CONFIG_SEED, + pyth_price_store::instruction::PUBLISHER_CONFIG_SEED, pyth_sdk_solana::state::{ load_mapping_account, load_product_account, @@ -407,7 +407,7 @@ async fn fetch_publisher_buffer_key( &publish_program_key, ); let data = rpc_client.get_account_data(&publisher_config_key).await?; - let config = pyth_price_publisher::accounts::publisher_config::read(&data)?; + let config = pyth_price_store::accounts::publisher_config::read(&data)?; 
Ok(config.buffer_account.into()) } From 53b334463cc667ba935dd13626346d243062a11c Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Tue, 10 Sep 2024 16:43:34 +0100 Subject: [PATCH 5/5] chore: bump agent version to 2.11.0 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed27413..344eb17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.4" +version = "2.11.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index d518c36..568cbd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.4" +version = "2.11.0" edition = "2021" [[bin]]
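
Note (reviewer example, not part of the patch series): after PATCH 3/5 the publisher's buffer account is no longer configured by hand; the agent resolves it on chain from the publisher config PDA. The sketch below mirrors fetch_publisher_buffer_key above, using the same pyth-price-store 0.1.0 and solana-sdk/solana-client calls the agent already depends on. The function names and the standalone packaging are illustrative only.

    use anyhow::Result;
    use pyth_price_store::instruction::PUBLISHER_CONFIG_SEED;
    use solana_client::nonblocking::rpc_client::RpcClient;
    use solana_sdk::pubkey::Pubkey;

    /// Derive the publisher-config PDA for `publisher` under the publish (price store)
    /// program, using the same seeds as create_instruction_with_publish_program.
    fn publisher_config_address(publish_program_key: &Pubkey, publisher: &Pubkey) -> (Pubkey, u8) {
        Pubkey::find_program_address(
            &[PUBLISHER_CONFIG_SEED.as_bytes(), &publisher.to_bytes()],
            publish_program_key,
        )
    }

    /// Look up the publisher's buffer account by reading the on-chain publisher config,
    /// mirroring fetch_publisher_buffer_key in src/agent/state/oracle.rs.
    async fn lookup_publisher_buffer_key(
        rpc_client: &RpcClient,
        publish_program_key: Pubkey,
        publisher: Pubkey,
    ) -> Result<Pubkey> {
        let (publisher_config_key, _bump) = publisher_config_address(&publish_program_key, &publisher);
        let data = rpc_client.get_account_data(&publisher_config_key).await?;
        let config = pyth_price_store::accounts::publisher_config::read(&data)?;
        Ok(config.buffer_account.into())
    }

With this in place, the key store only needs oracle_program_key (the old program_key name is still accepted via the serde alias) plus an optional publish_program_key; when the latter is set, the exporter batches all indexed feeds into a single SubmitPrices instruction against the derived buffer account instead of one upd_price instruction per feed.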