Skip to content

Commit

Permalink
add more traces to oracle state
Browse files Browse the repository at this point in the history
  • Loading branch information
cctdaniel committed Jul 12, 2024
1 parent a62a623 commit cb0fb12
Showing 1 changed file with 51 additions and 54 deletions.
105 changes: 51 additions & 54 deletions src/agent/state/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,23 @@
#[allow(deprecated)]
use crate::agent::legacy_schedule::LegacySchedule;
use {
super::{
super::solana::network::Network,
exporter::Exporter,
},
super::{super::solana::network::Network, exporter::Exporter},
crate::agent::{
market_schedule::MarketSchedule,
state::{
global::Update,
Prices,
State,
},
},
anyhow::{
anyhow,
Context,
Result,
state::{global::Update, Prices, State},
},
anyhow::{anyhow, Context, Result},
pyth_sdk_solana::state::{
load_mapping_account,
load_product_account,
GenericPriceAccount,
MappingAccount,
PriceComp,
PythnetPriceAccount,
SolanaPriceAccount,
},
serde::{
Deserialize,
Serialize,
load_mapping_account, load_product_account, GenericPriceAccount, MappingAccount, PriceComp,
PythnetPriceAccount, SolanaPriceAccount,
},
serde::{Deserialize, Serialize},
solana_client::nonblocking::rpc_client::RpcClient,
solana_sdk::{
account::Account,
commitment_config::CommitmentLevel,
pubkey::Pubkey,
signature::Keypair,
account::Account, commitment_config::CommitmentLevel, pubkey::Pubkey, signature::Keypair,
},
std::{
collections::{
HashMap,
HashSet,
},
collections::{HashMap, HashSet},
time::Duration,
},
tokio::sync::RwLock,
Expand All @@ -51,15 +26,15 @@ use {

#[derive(Debug, Clone)]
pub struct ProductEntry {
pub account_data: pyth_sdk_solana::state::ProductAccount,
pub schedule: MarketSchedule,
pub price_accounts: Vec<Pubkey>,
pub account_data: pyth_sdk_solana::state::ProductAccount,
pub schedule: MarketSchedule,
pub price_accounts: Vec<Pubkey>,
pub publish_interval: Option<Duration>,
}

#[derive(Default, Debug, Clone)]
pub struct PricePublishingMetadata {
pub schedule: MarketSchedule,
pub schedule: MarketSchedule,
pub publish_interval: Option<Duration>,
}

Expand All @@ -77,7 +52,7 @@ pub struct PricePublishingMetadata {
#[derive(Copy, Clone, Debug)]
pub struct PriceEntry {
// We intentionally act as if we have a truncated account where the underlying memory is unavailable.
account: GenericPriceAccount<0, ()>,
account: GenericPriceAccount<0, ()>,
pub comp: [PriceComp; 64],
}

Expand Down Expand Up @@ -129,9 +104,9 @@ impl std::ops::Deref for PriceEntry {

#[derive(Default, Debug, Clone)]
pub struct Data {
pub mapping_accounts: HashMap<Pubkey, MappingAccount>,
pub product_accounts: HashMap<Pubkey, ProductEntry>,
pub price_accounts: HashMap<Pubkey, PriceEntry>,
pub mapping_accounts: HashMap<Pubkey, MappingAccount>,
pub product_accounts: HashMap<Pubkey, ProductEntry>,
pub price_accounts: HashMap<Pubkey, PriceEntry>,
/// publisher => {their permissioned price accounts => price publishing metadata}
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
}
Expand All @@ -140,16 +115,16 @@ pub struct Data {
#[serde(default)]
pub struct Config {
/// The commitment level to use when reading data from the RPC node.
pub commitment: CommitmentLevel,
pub commitment: CommitmentLevel,
/// The interval with which to poll account information.
#[serde(with = "humantime_serde")]
pub poll_interval_duration: Duration,
pub poll_interval_duration: Duration,
/// Whether subscribing to account updates over websocket is enabled
pub subscriber_enabled: bool,
pub subscriber_enabled: bool,
/// Capacity of the channel over which the Subscriber sends updates to the Oracle
pub updates_channel_capacity: usize,
/// Capacity of the channel over which the Poller sends data to the Oracle
pub data_channel_capacity: usize,
pub data_channel_capacity: usize,

/// Ask the RPC for up to this many product/price accounts in a
/// single request. Tune this setting if you're experiencing
Expand All @@ -162,12 +137,12 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
commitment: CommitmentLevel::Confirmed,
poll_interval_duration: Duration::from_secs(5),
subscriber_enabled: true,
commitment: CommitmentLevel::Confirmed,
poll_interval_duration: Duration::from_secs(5),
subscriber_enabled: true,
updates_channel_capacity: 10000,
data_channel_capacity: 10000,
max_lookup_batch_size: 100,
data_channel_capacity: 10000,
max_lookup_batch_size: 100,
}
}
}
Expand All @@ -178,6 +153,7 @@ pub struct OracleState {

impl OracleState {
pub fn new() -> Self {
tracing::info!("Initializing OracleState");
Self {
data: Default::default(),
}
Expand Down Expand Up @@ -232,6 +208,7 @@ where
// We are only interested in price account updates, all other types of updates
// will be fetched using polling.
if !data.price_accounts.contains_key(account_key) {
tracing::info!("Account key not found in price accounts, skipping update.");
return Ok(());
}

Expand All @@ -247,13 +224,14 @@ where
);

data.price_accounts.insert(*account_key, price_entry);
tracing::info!("Updated price account for key: {}", account_key);

Prices::update_global_price(
self,
network,
&Update::PriceAccountUpdate {
account_key: *account_key,
account: price_entry,
account: price_entry,
},
)
.await?;
Expand All @@ -271,14 +249,17 @@ where
rpc_client: &RpcClient,
max_lookup_batch_size: usize,
) -> Result<()> {
tracing::info!("Polling updates for network: {:?}", network);
let mut publisher_permissions = HashMap::new();
let mapping_accounts = fetch_mapping_accounts(rpc_client, mapping_key).await?;
tracing::info!("Fetched mapping accounts.");
let (product_accounts, price_accounts) = fetch_product_and_price_accounts(
rpc_client,
max_lookup_batch_size,
mapping_accounts.values(),
)
.await?;
tracing::info!("Fetched product and price accounts.");

for (price_key, price_entry) in price_accounts.iter() {
for component in price_entry.comp {
Expand All @@ -294,7 +275,7 @@ where
product_accounts.get(&price_entry.prod)
{
PricePublishingMetadata {
schedule: prod_entry.schedule.clone(),
schedule: prod_entry.schedule.clone(),
publish_interval: prod_entry.publish_interval,
}
} else {
Expand All @@ -320,6 +301,7 @@ where
let mut data = self.into().data.write().await;
log_data_diff(&data, &new_data);
*data = new_data;
tracing::info!("Updated OracleState data.");

Exporter::update_permissions(
self,
Expand All @@ -335,6 +317,7 @@ where
/// Sync Product/Price Accounts found by polling to the Global Store.
#[instrument(skip(self))]
async fn sync_global_store(&self, network: Network) -> Result<()> {
tracing::info!("Syncing global store for network: {:?}", network);
for (product_account_key, product_account) in
&self.into().data.read().await.product_accounts
{
Expand All @@ -343,7 +326,7 @@ where
network,
&Update::ProductAccountUpdate {
account_key: *product_account_key,
account: product_account.clone(),
account: product_account.clone(),
},
)
.await
Expand All @@ -356,13 +339,14 @@ where
network,
&Update::PriceAccountUpdate {
account_key: *price_account_key,
account: *price_account,
account: *price_account,
},
)
.await
.map_err(|_| anyhow!("failed to notify price account update"))?;
}

tracing::info!("Global store sync completed.");
Ok(())
}
}
Expand All @@ -372,6 +356,10 @@ async fn fetch_mapping_accounts(
rpc_client: &RpcClient,
mapping_account_key: Pubkey,
) -> Result<HashMap<Pubkey, MappingAccount>> {
tracing::info!(
"Fetching mapping accounts starting from key: {}",
mapping_account_key
);
let mut accounts = HashMap::new();
let mut account_key = mapping_account_key;
while account_key != Pubkey::default() {
Expand All @@ -384,6 +372,7 @@ async fn fetch_mapping_accounts(
accounts.insert(account_key, account);
account_key = account.next;
}
tracing::info!("Fetched {} mapping accounts.", accounts.len());
Ok(accounts)
}

Expand All @@ -396,6 +385,7 @@ async fn fetch_product_and_price_accounts<'a, A>(
where
A: IntoIterator<Item = &'a MappingAccount>,
{
tracing::info!("Fetching product and price accounts.");
let mut product_keys = vec![];

// Get all product keys
Expand All @@ -421,13 +411,19 @@ where
price_entries.extend(batch_prices.drain());
}

tracing::info!(
"Fetched {} product entries and {} price entries.",
product_entries.len(),
price_entries.len()
);
Ok((product_entries, price_entries))
}

async fn fetch_batch_of_product_and_price_accounts(
rpc_client: &RpcClient,
product_key_batch: &[Pubkey],
) -> Result<(HashMap<Pubkey, ProductEntry>, HashMap<Pubkey, PriceEntry>)> {
tracing::info!("Fetching batch of product and price accounts.");
let mut product_entries = HashMap::new();

let product_keys = product_key_batch;
Expand Down Expand Up @@ -568,6 +564,7 @@ async fn fetch_batch_of_product_and_price_accounts(

todo = next_todo;
}
tracing::info!("Fetched batch of product and price accounts.");
Ok((product_entries, price_entries))
}

Expand Down

0 comments on commit cb0fb12

Please sign in to comment.