diff --git a/Cargo.lock b/Cargo.lock index b7331d3..d9aaec2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3452,7 +3452,7 @@ dependencies = [ [[package]] name = "pyth-price-publisher" version = "0.1.0" -source = "git+https://github.com/pyth-network/pyth-crosschain?branch=add-publisher-program#e487dfd7523422169011fd8344863d389ea04427" +source = "git+https://github.com/pyth-network/pyth-crosschain?rev=4df7172b#4df7172b85f9d1b65b117c58652e945a219f8b82" dependencies = [ "bytemuck", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 39b3edb..293a7ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ tracing-opentelemetry = "0.24.0" opentelemetry = "0.23.0" opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]} opentelemetry-otlp = { version = "0.16.0" } -pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } +pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", rev = "4df7172b" } bytemuck = "1.13.0" [dev-dependencies] diff --git a/src/agent/services/exporter.rs b/src/agent/services/exporter.rs index 3631cf8..559eaa9 100644 --- a/src/agent/services/exporter.rs +++ b/src/agent/services/exporter.rs @@ -270,6 +270,7 @@ mod exporter { &publish_keypair, key_store.oracle_program_key, key_store.publish_program_key, + key_store.publisher_buffer_key, config.exporter.max_batch_size, config.exporter.staleness_threshold, config.exporter.compute_unit_limit, diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 3ea5858..ebbf4c3 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -103,6 +103,13 @@ pub mod key_store { default )] pub publish_program_key: Option, + /// The public key of the publisher's buffer for the Publish program + #[serde( + serialize_with = "opt_pubkey_string_ser", + deserialize_with = "opt_pubkey_string_de", + default + )] + pub publisher_buffer_key: Option, /// The public key of the root mapping account #[serde( serialize_with = "pubkey_string_ser", @@ -122,15 +129,18 @@ pub mod key_store { /// The keypair used to publish price updates. When None, /// publishing will not start until a new keypair is supplied /// via the remote loading endpoint - pub publish_keypair: Option, + pub publish_keypair: Option, /// Public key of the Oracle program - pub oracle_program_key: Pubkey, + pub oracle_program_key: Pubkey, /// Public key of the Publish program - pub publish_program_key: Option, + pub publish_program_key: Option, + /// Public key of the publisher's buffer for the publish program + pub publisher_buffer_key: Option, + /// Public key of the root mapping account - pub mapping_key: Pubkey, + pub mapping_key: Pubkey, /// Public key of the accumulator program (if provided) - pub accumulator_key: Option, + pub accumulator_key: Option, } impl KeyStore { @@ -151,6 +161,7 @@ pub mod key_store { publish_keypair, oracle_program_key: config.oracle_program_key, publish_program_key: config.publish_program_key, + publisher_buffer_key: config.publisher_buffer_key, mapping_key: config.mapping_key, accumulator_key: config.accumulator_key, }) diff --git a/src/agent/state/exporter.rs b/src/agent/state/exporter.rs index aa8ca85..514e9f6 100644 --- a/src/agent/state/exporter.rs +++ b/src/agent/state/exporter.rs @@ -15,15 +15,13 @@ use { }, }, anyhow::{ - anyhow, - Context, - Result, + anyhow, bail, Context, Result }, bincode::Options, - bytemuck::cast_slice, + bytemuck::{bytes_of, cast_slice}, chrono::Utc, futures_util::future::join_all, - pyth_price_publisher::accounts::publisher_prices::PublisherPrice, + pyth_price_publisher::accounts::buffer::BufferedPrice, pyth_sdk::Identifier, pyth_sdk_solana::state::PriceStatus, serde::Serialize, @@ -451,6 +449,7 @@ pub async fn publish_batches( publish_keypair: &Keypair, oracle_program_key: Pubkey, publish_program_key: Option, + publisher_buffer_key: Option, max_batch_size: usize, staleness_threshold: Duration, compute_unit_limit: u32, @@ -489,6 +488,7 @@ where publish_keypair, oracle_program_key, publish_program_key, + publisher_buffer_key, batch, staleness_threshold, compute_unit_limit, @@ -533,6 +533,7 @@ async fn publish_batch( publish_keypair: &Keypair, oracle_program_key: Pubkey, publish_program_key: Option, + publisher_buffer_key: Option, batch: &[PermissionedUpdate], staleness_threshold: Duration, compute_unit_limit: u32, @@ -576,35 +577,36 @@ where } if let Some(publish_program_key) = publish_program_key { - let (instruction, unsupported_updates) = create_instruction_with_publish_program( + let instruction = create_instruction_with_publish_program( publish_keypair.pubkey(), publish_program_key, + publisher_buffer_key.context("must specify publisher_buffer_key if publish_program_key is specified")?, updates, )?; - updates = unsupported_updates; - instructions.push(instruction); - } - for update in updates { - let instruction = if let Some(accumulator_program_key) = accumulator_key { - create_instruction_with_accumulator( - publish_keypair.pubkey(), - oracle_program_key, - Pubkey::from(update.feed_id.to_bytes()), - &update.info, - network_state.current_slot, - accumulator_program_key, - )? - } else { - create_instruction_without_accumulator( - publish_keypair.pubkey(), - oracle_program_key, - Pubkey::from(update.feed_id.to_bytes()), - &update.info, - network_state.current_slot, - )? - }; - instructions.push(instruction); + } else { + for update in updates { + let instruction = if let Some(accumulator_program_key) = accumulator_key { + create_instruction_with_accumulator( + publish_keypair.pubkey(), + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, + network_state.current_slot, + accumulator_program_key, + )? + } else { + create_instruction_without_accumulator( + publish_keypair.pubkey(), + oracle_program_key, + Pubkey::from(update.feed_id.to_bytes()), + &update.info, + network_state.current_slot, + )? + }; + + instructions.push(instruction); + } } // Pay priority fees, if configured @@ -800,28 +802,31 @@ fn create_instruction_without_accumulator( fn create_instruction_with_publish_program( publish_pubkey: Pubkey, publish_program_key: Pubkey, + publisher_buffer_key: Pubkey, prices: Vec, -) -> Result<(Instruction, Vec)> { - let mut unsupported_updates = Vec::new(); - let (buffer_key, _buffer_bump) = Pubkey::find_program_address( - &["BUFFER".as_bytes(), &publish_pubkey.to_bytes()], +) -> Result { + use pyth_price_publisher::instruction::{Instruction as PublishInstruction, SubmitPricesArgsHeader, PUBLISHER_CONFIG_SEED}; + let (publisher_config_key, publisher_config_bump) = Pubkey::find_program_address( + &[PUBLISHER_CONFIG_SEED.as_bytes(), &publish_pubkey.to_bytes()], &publish_program_key, ); let mut values = Vec::new(); for update in prices { if update.feed_index == 0 { - unsupported_updates.push(update); - } else { - values.push(PublisherPrice::new( - update.feed_index, - (update.info.status as u8).into(), - update.info.price, - update.info.conf, - )?); + bail!("no feed index for feed {:?}", update.feed_id); } + values.push(BufferedPrice::new( + update.feed_index, + (update.info.status as u8).into(), + update.info.price, + update.info.conf, + )?); } - let mut data = vec![1]; + let mut data = vec![PublishInstruction::SubmitPrices as u8]; + data.extend_from_slice(bytes_of(&SubmitPricesArgsHeader { + publisher_config_bump, + })); data.extend(cast_slice(&values)); let instruction = Instruction { @@ -833,14 +838,19 @@ fn create_instruction_with_publish_program( is_writable: true, }, AccountMeta { - pubkey: buffer_key, + pubkey: publisher_config_key, + is_signer: false, + is_writable: false, + }, + AccountMeta { + pubkey: publisher_buffer_key, is_signer: false, is_writable: true, }, ], data, }; - Ok((instruction, unsupported_updates)) + Ok(instruction) } fn create_instruction_with_accumulator(