Skip to content

Commit

Permalink
refactor(agent): extract oracle component/service
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen committed Jun 26, 2024
1 parent 216ab59 commit 96af23e
Show file tree
Hide file tree
Showing 11 changed files with 922 additions and 969 deletions.
40 changes: 33 additions & 7 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,45 @@ impl Agent {
// Create the Application State.
let state = Arc::new(state::State::new(self.config.state.clone()).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
)?);
// Spawn the primary network Oracle.
{
// Publisher permissions updates between oracle and exporter
let (publisher_permissions_tx, publisher_permissions_rx) =
watch::channel(<_>::default());

jhs.push(tokio::spawn(services::oracle(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
publisher_permissions_tx.clone(),
)));

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
state.clone(),
publisher_permissions_rx.clone(),
)?);
}

// Spawn the secondary network, if needed
// Spawn the secondary network Oracle, if needed.
if let Some(config) = &self.config.secondary_network {
let (publisher_permissions_tx, publisher_permissions_rx) =
watch::channel(<_>::default());

jhs.push(tokio::spawn(services::oracle(
config.clone(),
network::Network::Secondary,
state.clone(),
publisher_permissions_tx.clone(),
)));

jhs.extend(network::spawn_network(
config.clone(),
network::Network::Secondary,
state.clone(),
publisher_permissions_rx,
)?);
}

Expand Down
2 changes: 1 addition & 1 deletion src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
super::state::local::PriceInfo,
crate::agent::solana::oracle::PriceEntry,
crate::agent::state::oracle::PriceEntry,
lazy_static::lazy_static,
prometheus_client::{
encoding::{
Expand Down
2 changes: 2 additions & 0 deletions src/agent/services.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod keypairs;
pub mod notifier;
pub mod oracle;

pub use {
keypairs::keypairs,
notifier::notifier,
oracle::oracle,
};
196 changes: 196 additions & 0 deletions src/agent/services/oracle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
//! Oracle
//!
//! The Oracle service is respoinsible for reacting to all remote/on-chain events.
use {
crate::agent::{
solana::{
key_store::KeyStore,
network::{
Config,
Network,
},
},
state::oracle::{
Oracle,
PricePublishingMetadata,
},
},
anyhow::Result,
solana_account_decoder::UiAccountEncoding,
solana_client::{
nonblocking::{
pubsub_client::PubsubClient,
rpc_client::RpcClient,
},
rpc_config::{
RpcAccountInfoConfig,
RpcProgramAccountsConfig,
},
},
solana_sdk::{
account::Account,
commitment_config::CommitmentConfig,
pubkey::Pubkey,
},
std::{
collections::HashMap,
sync::Arc,
time::{
Duration,
Instant,
},
},
tokio::sync::watch::Sender,
tokio_stream::StreamExt,
};

pub async fn oracle<S>(
config: Config,
network: Network,
state: Arc<S>,
publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
) where
S: Oracle,
S: Send + Sync + 'static,
{
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
tracing::warn!("Key store not available, Oracle won't start.");
return;
};

tokio::spawn(poller(
config.clone(),
network,
state.clone(),
key_store.mapping_key,
config.oracle.max_lookup_batch_size,
publisher_permissions_tx.clone(),
));

if config.oracle.subscriber_enabled {
tokio::spawn(async move {
loop {
let current_time = Instant::now();
if let Err(ref err) = subscriber(
config.clone(),
network,
state.clone(),
key_store.program_key,
)
.await
{
tracing::error!(err = ?err, "Subscriber exited unexpectedly.");
if current_time.elapsed() < Duration::from_secs(30) {
tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second.");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
}
}

/// When an account RPC Subscription update is receiveed.
///
/// We check if the account is one we're aware of and tracking, and if so, spawn
/// a small background task that handles that update. We only do this for price
/// accounts, all other accounts are handled below in the poller.
async fn subscriber<S>(
config: Config,
network: Network,
state: Arc<S>,
program_key: Pubkey,
) -> Result<()>
where
S: Oracle,
S: Send + Sync + 'static,
{
// Setup PubsubClient to listen for account changes on the Oracle program.
let client = PubsubClient::new(config.wss_url.as_str()).await?;

let (mut notifier, _unsub) = {
let program_key = program_key;
let commitment = config.oracle.commitment;
let config = RpcProgramAccountsConfig {
account_config: RpcAccountInfoConfig {
commitment: Some(CommitmentConfig { commitment }),
encoding: Some(UiAccountEncoding::Base64Zstd),
..Default::default()
},
filters: None,
with_context: Some(true),
};
client.program_subscribe(&program_key, Some(config)).await
}?;

while let Some(update) = notifier.next().await {
match update.value.account.decode::<Account>() {
Some(account) => {
let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?;
let state = state.clone();
tokio::spawn(async move {
if let Err(err) =
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
.await
{
tracing::error!(err = ?err, "Failed to handle account update.");
}
});
}

None => {
tracing::error!(
update = ?update,
"Failed to decode account from update.",
);
}
}
}

tracing::debug!("Subscriber closed connection.");
return Ok(());
}

/// On poll lookup all Pyth Mapping/Product/Price accounts and sync.
async fn poller<S>(
config: Config,
network: Network,
state: Arc<S>,
mapping_key: Pubkey,
max_lookup_batch_size: usize,
publisher_permissions_tx: Sender<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,
) where
S: Oracle,
S: Send + Sync + 'static,
{
// Setup an RpcClient for manual polling.
let mut tick = tokio::time::interval(config.oracle.poll_interval_duration);
let client = Arc::new(RpcClient::new_with_timeout_and_commitment(
config.rpc_url,
config.rpc_timeout,
CommitmentConfig {
commitment: config.oracle.commitment,
},
));

loop {
tick.tick().await;
tracing::debug!("Polling for updates.");
if let Err(err) = async {
Oracle::poll_updates(
&*state,
mapping_key,
&client,
max_lookup_batch_size,
publisher_permissions_tx.clone(),
)
.await?;
Oracle::sync_global_store(&*state, network).await
}
.await
{
tracing::error!(err = ?err, "Failed to handle poll updates.");
}
}
}
68 changes: 50 additions & 18 deletions src/agent/solana.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod exporter;
pub mod oracle;

/// This module encapsulates all the interaction with a single Solana network:
/// - The Oracle, which reads data from the network
Expand All @@ -12,15 +11,22 @@ pub mod network {
self,
KeyStore,
},
oracle,
},
crate::agent::state::State,
crate::agent::state::{
oracle::{
self,
PricePublishingMetadata,
},
State,
},
anyhow::Result,
serde::{
Deserialize,
Serialize,
},
solana_sdk::pubkey::Pubkey,
std::{
collections::HashMap,
sync::Arc,
time::Duration,
},
Expand Down Expand Up @@ -70,25 +76,51 @@ pub mod network {
pub exporter: exporter::Config,
}

/// Spawn an Oracle, in-progress porting this to State.
///
/// Behaviour:
/// - Spawns Oracle: (Obsolete, now Extracted to state/oracle.rs)
/// - Spawns a Subscriber:
/// o Subscribes to the Oracle program key.
/// o Decodes account events related to the Oracle.
/// o Sends update.
/// - Spawns a Poller:
/// o Fetches Mapping Accounts
/// o Iterates Product+Price Accounts
/// o Sends update.
/// - Oracle then Listens for Updates from Subscriber
/// o Filters for Price Account Updates.
/// o Stores its own copy of the Price Account.
/// o Updates the Global Store for that Price Account.
/// - Oracle also Listens for Updates from Poller
/// o Tracks if any new Mapping Accounts were found.
/// o Update Local Data
/// o Updates entire Global Store View.
/// - Spawns Exporter:
/// - Spawns NetworkQuerier
/// - Queries BlockHash in a timer.
/// - Sends BlockHash + Slot
/// - Spawns Transaction Monitor:
/// - Listens for for Transactions
/// - Adds to tracked Transactions
/// - Responds to queries about Tx status.
/// - Spawns Exporter
/// - On Publish tick: pushes updates to the network as a batch.
/// - On Compute Unit Price Tick: calculates new median price fee from recent
///
/// Plan:
/// - Subscriber & Poller Can Be Spawnable Tasks
/// - Oracle becomes a State API
/// -
pub fn spawn_network(
config: Config,
network: Network,
state: Arc<State>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
>,
) -> Result<Vec<JoinHandle<()>>> {
// Publisher permissions updates between oracle and exporter
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());

// Spawn the Oracle
let mut jhs = oracle::spawn_oracle(
config.oracle.clone(),
network,
&config.rpc_url,
&config.wss_url,
config.rpc_timeout,
publisher_permissions_tx,
KeyStore::new(config.key_store.clone())?,
state.clone(),
);
let mut jhs = vec![];

// Spawn the Exporter
let exporter_jhs = exporter::spawn_exporter(
Expand All @@ -108,7 +140,7 @@ pub mod network {
}

/// The key_store module is responsible for parsing the pythd key store.
mod key_store {
pub mod key_store {
use {
anyhow::Result,
serde::{
Expand Down
3 changes: 2 additions & 1 deletion src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use {
super::{
key_store,
network::Network,
oracle::PricePublishingMetadata,
},
crate::agent::state::{
global::GlobalStore,
Expand All @@ -12,6 +11,7 @@ use {
LocalStore,
PriceInfo,
},
oracle::PricePublishingMetadata,
State,
},
anyhow::{
Expand Down Expand Up @@ -514,6 +514,7 @@ impl Exporter {
/// (n / batch_size) requests in flight.
async fn publish_updates(&mut self) -> Result<()> {
let permissioned_updates = self.get_permissioned_updates().await?;
let current_timestamp_millis = Utc::now().timestamp_millis();

if permissioned_updates.is_empty() {
return Ok(());
Expand Down
Loading

0 comments on commit 96af23e

Please sign in to comment.