Skip to content

Commit

Permalink
refactor(agent): StateApi -> Prices and refactor module
Browse files Browse the repository at this point in the history
The StateApi is left over from the initial Adapter, but all functionality is
for pricing/product accounts. This refactors that module and fixes the cyclic
dependency between it and GlobalStore.

The new logic performs updates within the Prices API (Which is where the state
relevant to subscriptions already was, so is the better place for it).

File rename left for a future commit to keep the diffs clean.
  • Loading branch information
Reisen committed Jun 5, 2024
1 parent ba4018f commit 0c1c066
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 208 deletions.
2 changes: 1 addition & 1 deletion src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Agent {
}

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(adapter.clone())));
jhs.push(tokio::spawn(notifier(logger.clone(), adapter.clone())));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
Expand Down
48 changes: 35 additions & 13 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
use {
super::state::{local::PriceInfo, State},
crate::agent::{solana::oracle::PriceEntry, store::PriceIdentifier},
super::state::{
local::PriceInfo,
State,
},
crate::agent::{
solana::oracle::PriceEntry,
store::PriceIdentifier,
},
lazy_static::lazy_static,
prometheus_client::{
encoding::{text::encode, EncodeLabelSet},
metrics::{counter::Counter, family::Family, gauge::Gauge},
encoding::{
text::encode,
EncodeLabelSet,
},
metrics::{
counter::Counter,
family::Family,
gauge::Gauge,
},
registry::Registry,
},
serde::Deserialize,
slog::Logger,
solana_sdk::pubkey::Pubkey,
std::{
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::AtomicU64,
Arc,
},
time::Instant,
},
tokio::sync::Mutex,
warp::{hyper::StatusCode, reply, Filter, Rejection, Reply},
warp::{
hyper::StatusCode,
reply,
Filter,
Rejection,
Reply,
},
};

pub fn default_bind_address() -> SocketAddr {
Expand Down Expand Up @@ -46,8 +68,8 @@ lazy_static! {
/// metrics.
pub struct MetricsServer {
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<State>,
pub logger: Logger,
pub adapter: Arc<State>,
}

impl MetricsServer {
Expand Down Expand Up @@ -151,12 +173,12 @@ pub struct PriceGlobalMetrics {

/// f64 is used to get u64 support. Official docs:
/// https://docs.rs/prometheus-client/latest/prometheus_client/metrics/gauge/struct.Gauge.html#using-atomicu64-as-storage-and-f64-on-the-interface
conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
timestamp: Family<PriceGlobalLabels, Gauge>,

/// Note: the exponent is not applied to this metric
prev_price: Family<PriceGlobalLabels, Gauge>,
prev_conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
prev_price: Family<PriceGlobalLabels, Gauge>,
prev_conf: Family<PriceGlobalLabels, Gauge<f64, AtomicU64>>,
prev_timestamp: Family<PriceGlobalLabels, Gauge>,

/// How many times this Price was updated in the global store
Expand Down Expand Up @@ -299,10 +321,10 @@ pub struct PriceLocalLabels {
/// Metrics exposed to Prometheus by the local store for each price
#[derive(Default)]
pub struct PriceLocalMetrics {
price: Family<PriceLocalLabels, Gauge>,
price: Family<PriceLocalLabels, Gauge>,
/// f64 is used to get u64 support. Official docs:
/// https://docs.rs/prometheus-client/latest/prometheus_client/metrics/gauge/struct.Gauge.html#using-atomicu64-as-storage-and-f64-on-the-interface
conf: Family<PriceLocalLabels, Gauge<f64, AtomicU64>>,
conf: Family<PriceLocalLabels, Gauge<f64, AtomicU64>>,
timestamp: Family<PriceLocalLabels, Gauge>,

/// How many times this price was updated in the local store
Expand Down
12 changes: 6 additions & 6 deletions src/agent/pythd/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn handle_connection<S>(
notify_price_sched_tx_buffer: usize,
logger: Logger,
) where
S: state::StateApi,
S: state::Prices,
S: Send,
S: Sync,
S: 'static,
Expand Down Expand Up @@ -165,7 +165,7 @@ async fn handle_next<S>(
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
) -> Result<()>
where
S: state::StateApi,
S: state::Prices,
{
tokio::select! {
msg = ws_rx.next() => {
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn handle<S>(
msg: Message,
) -> Result<()>
where
S: state::StateApi,
S: state::Prices,
{
// Ignore control and binary messages
if !msg.is_text() {
Expand Down Expand Up @@ -293,7 +293,7 @@ async fn dispatch_and_catch_error<S>(
request: &Request<Method, Value>,
) -> Response<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
debug!(
logger,
Expand Down Expand Up @@ -429,7 +429,7 @@ impl Default for Config {

pub async fn run<S>(config: Config, logger: Logger, adapter: Arc<S>)
where
S: state::StateApi,
S: state::Prices,
S: Send,
S: Sync,
S: 'static,
Expand All @@ -442,7 +442,7 @@ where

async fn serve<S>(config: Config, logger: &Logger, adapter: Arc<S>) -> Result<()>
where
S: state::StateApi,
S: state::Prices,
S: Send,
S: Sync,
S: 'static,
Expand Down
2 changes: 1 addition & 1 deletion src/agent/pythd/api/rpc/get_all_products.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {

pub async fn get_all_products<S>(adapter: &S) -> Result<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
let products = adapter.get_all_products().await?;
Ok(serde_json::to_value(products)?)
Expand Down
2 changes: 1 addition & 1 deletion src/agent/pythd/api/rpc/get_product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn get_product<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
let params: GetProductParams = {
let value = request.params.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/agent/pythd/api/rpc/get_product_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {

pub async fn get_product_list<S>(adapter: &S) -> Result<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
let product_list = adapter.get_product_list().await?;
Ok(serde_json::to_value(product_list)?)
Expand Down
2 changes: 1 addition & 1 deletion src/agent/pythd/api/rpc/subscribe_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn subscribe_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
let params: SubscribePriceParams = serde_json::from_value(
request
Expand Down
2 changes: 1 addition & 1 deletion src/agent/pythd/api/rpc/subscribe_price_sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn subscribe_price_sched<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
let params: SubscribePriceSchedParams = serde_json::from_value(
request
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/update_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn update_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: state::StateApi,
S: state::Prices,
{
let params: UpdatePriceParams = serde_json::from_value(
request
Expand All @@ -29,7 +29,7 @@ where
)?;

adapter
.update_price(
.update_local_price(
&params.account.parse::<solana_sdk::pubkey::Pubkey>()?,
params.price,
params.conf,
Expand Down
10 changes: 4 additions & 6 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ use {
legacy_schedule::LegacySchedule,
market_schedule::MarketSchedule,
state::{
global::{
GlobalStore,
Update,
},
global::Update,
Prices,
State,
},
},
Expand Down Expand Up @@ -416,7 +414,7 @@ impl Oracle {
account_key: &Pubkey,
account: &ProductEntry,
) -> Result<()> {
GlobalStore::update(
Prices::update_global_price(
&*self.adapter,
self.network,
&Update::ProductAccountUpdate {
Expand All @@ -433,7 +431,7 @@ impl Oracle {
account_key: &Pubkey,
account: &PriceEntry,
) -> Result<()> {
GlobalStore::update(
Prices::update_global_price(
&*self.adapter,
self.network,
&Update::PriceAccountUpdate {
Expand Down
Loading

0 comments on commit 0c1c066

Please sign in to comment.