Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(agent): convert local store to an Api #123

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,18 @@ impl Agent {
// Create the channels
// TODO: make all components listen to shutdown signal
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
let (local_store_tx, local_store_rx) =
mpsc::channel(self.config.channel_capacities.local_store);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
local_store_tx.clone(),
logger.clone(),
)
.await,
pythd::adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await,
);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
local_store_tx.clone(),
primary_keypair_loader_tx,
logger.new(o!("primary" => true)),
adapter.clone(),
Expand All @@ -148,7 +140,6 @@ impl Agent {
jhs.extend(network::spawn_network(
config.clone(),
network::Network::Secondary,
local_store_tx.clone(),
secondary_keypair_loader_tx,
logger.new(o!("primary" => false)),
adapter.clone(),
Expand All @@ -161,9 +152,6 @@ impl Agent {
shutdown_tx.subscribe(),
)));

// Spawn the Local Store
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
Expand All @@ -175,7 +163,6 @@ impl Agent {
// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
self.config.metrics_server.bind_address,
local_store_tx,
logger.clone(),
adapter,
)));
Expand Down
26 changes: 8 additions & 18 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use {
super::{
pythd::adapter::global::GlobalStore,
pythd::adapter::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
},
solana::{
network::Network,
oracle::PriceEntry,
},
store::local::{
Message,
PriceInfo,
},
},
crate::agent::{
metrics::MetricsServer,
Expand All @@ -34,7 +36,6 @@ use {
},
time::Duration,
},
tokio::sync::oneshot,
typed_html::{
dom::DOMTree,
html,
Expand All @@ -45,22 +46,11 @@ use {
impl MetricsServer {
/// Create an HTML view of store data
pub async fn render_dashboard(&self) -> Result<String, Box<dyn std::error::Error>> {
// Prepare response channel for requests
let (local_tx, local_rx) = oneshot::channel();

// Request price data from local and global store
self.local_store_tx
.send(Message::LookupAllPriceInfo {
result_tx: local_tx,
})
.await?;

let local_data = LocalStore::get_all_price_infos(&*self.adapter).await;
let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?;
let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?;

// Await the results
let local_data = local_rx.await?;

let symbol_view =
build_dashboard_data(local_data, global_data, global_metadata, &self.logger);

Expand Down
26 changes: 8 additions & 18 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use {
super::{
pythd::adapter::Adapter,
store::local::Message,
super::pythd::adapter::{
local::PriceInfo,
Adapter,
},
crate::agent::{
solana::oracle::PriceEntry,
store::{
local::PriceInfo,
PriceIdentifier,
},
store::PriceIdentifier,
},
lazy_static::lazy_static,
prometheus_client::{
Expand All @@ -34,10 +31,7 @@ use {
},
time::Instant,
},
tokio::sync::{
mpsc,
Mutex,
},
tokio::sync::Mutex,
warp::{
hyper::StatusCode,
reply,
Expand Down Expand Up @@ -73,23 +67,19 @@ lazy_static! {
/// Internal metrics server state, holds state needed for serving
/// dashboard and metrics.
pub struct MetricsServer {
/// Used to pull the state of all symbols in local store
pub local_store_tx: mpsc::Sender<Message>,
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
local_store_tx: mpsc::Sender<Message>,
logger: Logger,
adapter: Arc<Adapter>,
) {
let server = MetricsServer {
local_store_tx,
start_time: Instant::now(),
logger,
adapter,
Expand Down
57 changes: 22 additions & 35 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use {
super::{
super::store::{
local,
PriceIdentifier,
},
super::store::PriceIdentifier,
api::{
NotifyPrice,
NotifyPriceSched,
Expand All @@ -29,6 +26,7 @@ use {

pub mod api;
pub mod global;
pub mod local;
pub use api::{
notifier,
AdapterApi,
Expand Down Expand Up @@ -68,14 +66,14 @@ pub struct Adapter {
/// The fixed interval at which Notify Price Sched notifications are sent
notify_price_sched_interval_duration: Duration,

/// Channel on which to communicate with the local store
local_store_tx: mpsc::Sender<local::Message>,

/// The logger
logger: Logger,

/// Global store for managing the unified state of Pyth-on-Solana networks.
global_store: global::Store,

/// Local store for managing the unpushed state.
local_store: local::Store,
}

/// Represents a single Notify Price Sched subscription
Expand All @@ -95,19 +93,15 @@ struct NotifyPriceSubscription {
}

impl Adapter {
pub async fn new(
config: Config,
local_store_tx: mpsc::Sender<local::Message>,
logger: Logger,
) -> Self {
pub async fn new(config: Config, logger: Logger) -> Self {
let registry = &mut *PROMETHEUS_REGISTRY.lock().await;
Adapter {
global_store: global::Store::new(logger.clone(), registry),
local_store: local::Store::new(logger.clone(), registry),
subscription_id_seq: 1.into(),
notify_price_sched_subscriptions: RwLock::new(HashMap::new()),
notify_price_subscriptions: RwLock::new(HashMap::new()),
notify_price_sched_interval_duration: config.notify_price_sched_interval_duration,
local_store_tx,
logger,
}
}
Expand All @@ -128,8 +122,9 @@ mod tests {
},
crate::agent::{
pythd::{
api,
adapter::local::LocalStore,
api::{
self,
NotifyPrice,
NotifyPriceSched,
PriceAccountMetadata,
Expand All @@ -140,7 +135,6 @@ mod tests {
},
},
solana,
store::local,
},
iobuffer::IoBuffer,
pyth_sdk::Identifier,
Expand Down Expand Up @@ -172,29 +166,26 @@ mod tests {
};

struct TestAdapter {
adapter: Arc<Adapter>,
local_store_rx: mpsc::Receiver<local::Message>,
shutdown_tx: broadcast::Sender<()>,
jh: JoinHandle<()>,
adapter: Arc<Adapter>,
shutdown_tx: broadcast::Sender<()>,
jh: JoinHandle<()>,
}

async fn setup() -> TestAdapter {
// Create and spawn an adapter
let (local_store_tx, local_store_rx) = mpsc::channel(1000);
let notify_price_sched_interval_duration = Duration::from_nanos(10);
let logger = slog_test::new_test_logger(IoBuffer::new());
let config = Config {
notify_price_sched_interval_duration,
};
let adapter = Arc::new(Adapter::new(config, local_store_tx, logger).await);
let adapter = Arc::new(Adapter::new(config, logger).await);
let (shutdown_tx, _) = broadcast::channel(1);

// Spawn Price Notifier
let jh = tokio::spawn(notifier(adapter.clone(), shutdown_tx.subscribe()));

TestAdapter {
adapter,
local_store_rx,
shutdown_tx,
jh,
}
Expand Down Expand Up @@ -1379,7 +1370,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_update_price() {
// Start the test adapter
let mut test_adapter = setup().await;
let test_adapter = setup().await;

// Send an Update Price message
let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t"
Expand All @@ -1394,18 +1385,14 @@ mod tests {
.unwrap();

// Check that the local store indeed received the correct update
match test_adapter.local_store_rx.recv().await.unwrap() {
local::Message::Update {
price_identifier,
price_info,
} => {
assert_eq!(price_identifier, Identifier::new(account.to_bytes()));
assert_eq!(price_info.price, price);
assert_eq!(price_info.conf, conf);
assert_eq!(price_info.status, PriceStatus::Trading);
}
_ => panic!("Uexpected message received by local store from adapter"),
};
let price_infos = LocalStore::get_all_price_infos(&*test_adapter.adapter).await;
let price_info = price_infos
.get(&Identifier::new(account.to_bytes()))
.unwrap();

assert_eq!(price_info.price, price);
assert_eq!(price_info.conf, conf);
assert_eq!(price_info.status, PriceStatus::Trading);

let _ = test_adapter.shutdown_tx.send(());
test_adapter.jh.abort();
Expand Down
33 changes: 17 additions & 16 deletions src/agent/pythd/adapter/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use {
AllAccountsMetadata,
GlobalStore,
},
local::{
self,
LocalStore,
},
Adapter,
NotifyPriceSchedSubscription,
NotifyPriceSubscription,
Expand All @@ -28,10 +32,7 @@ use {
network::Network,
oracle::PriceEntry,
},
store::{
local,
PriceIdentifier,
},
store::PriceIdentifier,
},
anyhow::{
anyhow,
Expand Down Expand Up @@ -357,18 +358,18 @@ impl AdapterApi for Adapter {
conf: Conf,
status: String,
) -> Result<()> {
self.local_store_tx
.send(local::Message::Update {
price_identifier: pyth_sdk::Identifier::new(account.to_bytes()),
price_info: local::PriceInfo {
status: Adapter::map_status(&status)?,
price,
conf,
timestamp: Utc::now().naive_utc(),
},
})
.await
.map_err(|_| anyhow!("failed to send update to local store"))
LocalStore::update(
self,
pyth_sdk::Identifier::new(account.to_bytes()),
local::PriceInfo {
status: Adapter::map_status(&status)?,
price,
conf,
timestamp: Utc::now().naive_utc(),
},
)
.await
.map_err(|_| anyhow!("failed to send update to local store"))
}

// TODO: implement FromStr method on PriceStatus
Expand Down
Loading
Loading