Skip to content

Commit

Permalink
feat: fetch publisher buffer key
Browse files Browse the repository at this point in the history
  • Loading branch information
Riateche committed Sep 5, 2024
1 parent 0045bdc commit 801b902
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 24 deletions.
3 changes: 2 additions & 1 deletion src/agent/services/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ mod exporter {
config.exporter.staleness_threshold,
config.exporter.unchanged_publish_threshold,
).await {
let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
if let Err(err) = publish_batches(
state.clone(),
client.clone(),
Expand All @@ -270,7 +271,7 @@ mod exporter {
&publish_keypair,
key_store.oracle_program_key,
key_store.publish_program_key,
key_store.publisher_buffer_key,
publisher_buffer_key,
config.exporter.max_batch_size,
config.exporter.staleness_threshold,
config.exporter.compute_unit_limit,
Expand Down
3 changes: 3 additions & 0 deletions src/agent/services/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ where
state.clone(),
key_store.mapping_key,
key_store.publish_keypair,
key_store.publish_program_key,
config.oracle.max_lookup_batch_size,
)));

Expand Down Expand Up @@ -159,6 +160,7 @@ async fn poller<S>(
state: Arc<S>,
mapping_key: Pubkey,
publish_keypair: Option<Keypair>,
publish_program_key: Option<Pubkey>,
max_lookup_batch_size: usize,
) where
S: Oracle,
Expand All @@ -183,6 +185,7 @@ async fn poller<S>(
network,
mapping_key,
publish_keypair.as_ref(),
publish_program_key,
&client,
max_lookup_batch_size,
)
Expand Down
21 changes: 5 additions & 16 deletions src/agent/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ pub mod key_store {
default
)]
pub publish_program_key: Option<Pubkey>,
/// The public key of the publisher's buffer for the Publish program
#[serde(
serialize_with = "opt_pubkey_string_ser",
deserialize_with = "opt_pubkey_string_de",
default
)]
pub publisher_buffer_key: Option<Pubkey>,
/// The public key of the root mapping account
#[serde(
serialize_with = "pubkey_string_ser",
Expand All @@ -129,18 +122,15 @@ pub mod key_store {
/// The keypair used to publish price updates. When None,
/// publishing will not start until a new keypair is supplied
/// via the remote loading endpoint
pub publish_keypair: Option<Keypair>,
pub publish_keypair: Option<Keypair>,
/// Public key of the Oracle program
pub oracle_program_key: Pubkey,
pub oracle_program_key: Pubkey,
/// Public key of the Publish program
pub publish_program_key: Option<Pubkey>,
/// Public key of the publisher's buffer for the publish program
pub publisher_buffer_key: Option<Pubkey>,

pub publish_program_key: Option<Pubkey>,
/// Public key of the root mapping account
pub mapping_key: Pubkey,
pub mapping_key: Pubkey,
/// Public key of the accumulator program (if provided)
pub accumulator_key: Option<Pubkey>,
pub accumulator_key: Option<Pubkey>,
}

impl KeyStore {
Expand All @@ -161,7 +151,6 @@ pub mod key_store {
publish_keypair,
oracle_program_key: config.oracle_program_key,
publish_program_key: config.publish_program_key,
publisher_buffer_key: config.publisher_buffer_key,
mapping_key: config.mapping_key,
accumulator_key: config.accumulator_key,
})
Expand Down
33 changes: 27 additions & 6 deletions src/agent/state/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ use {
},
},
anyhow::{
anyhow, bail, Context, Result
anyhow,
bail,
Context,
Result,
},
bincode::Options,
bytemuck::{bytes_of, cast_slice},
bytemuck::{
bytes_of,
cast_slice,
},
chrono::Utc,
futures_util::future::join_all,
pyth_price_publisher::accounts::buffer::BufferedPrice,
Expand Down Expand Up @@ -81,6 +87,8 @@ pub struct ExporterState {
/// Currently known permissioned prices of this publisher along with their market hours
our_prices: RwLock<HashMap<Pubkey, PricePublishingMetadata>>,

publisher_buffer_key: RwLock<Option<Pubkey>>,

/// Recent compute unit price in micro lamports (set if dynamic compute unit pricing is enabled)
recent_compute_unit_price_micro_lamports: RwLock<Option<u64>>,
}
Expand All @@ -106,6 +114,7 @@ where
staleness_threshold: Duration,
unchanged_publish_threshold: Duration,
) -> Result<Vec<PermissionedUpdate>>;
async fn get_publisher_buffer_key(&self) -> Option<Pubkey>;
async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option<u64>;
async fn update_recent_compute_unit_price(
&self,
Expand All @@ -114,11 +123,12 @@ where
staleness_threshold: Duration,
unchanged_publish_threshold: Duration,
) -> Result<()>;
async fn update_permissions(
async fn update_on_chain_state(
&self,
network: Network,
publish_keypair: Option<&Keypair>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
publisher_buffer_key: Option<Pubkey>,
) -> Result<()>;
}

Expand Down Expand Up @@ -267,6 +277,10 @@ where
.collect::<Vec<_>>())
}

async fn get_publisher_buffer_key(&self) -> Option<Pubkey> {
*self.into().publisher_buffer_key.read().await
}

async fn get_recent_compute_unit_price_micro_lamports(&self) -> Option<u64> {
*self
.into()
Expand Down Expand Up @@ -313,11 +327,12 @@ where
}

#[instrument(skip(self, publish_keypair, publisher_permissions))]
async fn update_permissions(
async fn update_on_chain_state(
&self,
network: Network,
publish_keypair: Option<&Keypair>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
publisher_buffer_key: Option<Pubkey>,
) -> Result<()> {
let publish_keypair = get_publish_keypair(self, network, publish_keypair).await?;
*self.into().our_prices.write().await = publisher_permissions
Expand All @@ -330,6 +345,7 @@ where
);
HashMap::new()
});
*self.into().publisher_buffer_key.write().await = publisher_buffer_key;

Ok(())
}
Expand Down Expand Up @@ -580,7 +596,8 @@ where
let instruction = create_instruction_with_publish_program(
publish_keypair.pubkey(),
publish_program_key,
publisher_buffer_key.context("must specify publisher_buffer_key if publish_program_key is specified")?,
publisher_buffer_key
.context("must specify publisher_buffer_key if publish_program_key is specified")?,
updates,
)?;
instructions.push(instruction);
Expand Down Expand Up @@ -805,7 +822,11 @@ fn create_instruction_with_publish_program(
publisher_buffer_key: Pubkey,
prices: Vec<PermissionedUpdate>,
) -> Result<Instruction> {
use pyth_price_publisher::instruction::{Instruction as PublishInstruction, SubmitPricesArgsHeader, PUBLISHER_CONFIG_SEED};
use pyth_price_publisher::instruction::{
Instruction as PublishInstruction,
SubmitPricesArgsHeader,
PUBLISHER_CONFIG_SEED,
};
let (publisher_config_key, publisher_config_bump) = Pubkey::find_program_address(
&[PUBLISHER_CONFIG_SEED.as_bytes(), &publish_pubkey.to_bytes()],
&publish_program_key,
Expand Down
46 changes: 45 additions & 1 deletion src/agent/state/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
Context,
Result,
},
pyth_price_publisher::instruction::PUBLISHER_CONFIG_SEED,
pyth_sdk_solana::state::{
load_mapping_account,
load_product_account,
Expand All @@ -37,6 +38,7 @@ use {
commitment_config::CommitmentLevel,
pubkey::Pubkey,
signature::Keypair,
signer::Signer,
},
std::{
collections::{
Expand Down Expand Up @@ -135,6 +137,7 @@ pub struct Data {
pub price_accounts: HashMap<Pubkey, PriceEntry>,
/// publisher => {their permissioned price accounts => price publishing metadata}
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
pub publisher_buffer_key: Option<Pubkey>,
}

#[derive(Clone, Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -193,6 +196,7 @@ pub trait Oracle {
network: Network,
mapping_key: Pubkey,
publish_keypair: Option<&Keypair>,
publish_program_key: Option<Pubkey>,
rpc_client: &RpcClient,
max_lookup_batch_size: usize,
) -> Result<()>;
Expand Down Expand Up @@ -267,6 +271,7 @@ where
network: Network,
mapping_key: Pubkey,
publish_keypair: Option<&Keypair>,
publish_program_key: Option<Pubkey>,
rpc_client: &RpcClient,
max_lookup_batch_size: usize,
) -> Result<()> {
Expand Down Expand Up @@ -311,22 +316,44 @@ where
}
}

let mut publisher_buffer_key = None;
if let (Some(publish_program_key), Some(publish_keypair)) =
(publish_program_key, publish_keypair)
{
match fetch_publisher_buffer_key(
rpc_client,
publish_program_key,
publish_keypair.pubkey(),
)
.await
{
Ok(r) => {
publisher_buffer_key = Some(r);
}
Err(err) => {
tracing::warn!("failed to fetch publisher buffer key: {:?}", err);
}
}
}

let new_data = Data {
mapping_accounts,
product_accounts,
price_accounts,
publisher_permissions,
publisher_buffer_key,
};

let mut data = self.into().data.write().await;
log_data_diff(&data, &new_data);
*data = new_data;

Exporter::update_permissions(
Exporter::update_on_chain_state(
self,
network,
publish_keypair,
data.publisher_permissions.clone(),
data.publisher_buffer_key,
)
.await?;

Expand Down Expand Up @@ -367,6 +394,23 @@ where
}
}

async fn fetch_publisher_buffer_key(
rpc_client: &RpcClient,
publish_program_key: Pubkey,
publisher_pubkey: Pubkey,
) -> Result<Pubkey> {
let (publisher_config_key, _bump) = Pubkey::find_program_address(
&[
PUBLISHER_CONFIG_SEED.as_bytes(),
&publisher_pubkey.to_bytes(),
],
&publish_program_key,
);
let data = rpc_client.get_account_data(&publisher_config_key).await?;
let config = pyth_price_publisher::accounts::publisher_config::read(&data)?;
Ok(config.buffer_account.into())
}

#[instrument(skip(rpc_client))]
async fn fetch_mapping_accounts(
rpc_client: &RpcClient,
Expand Down

0 comments on commit 801b902

Please sign in to comment.