Skip to content

Commit

Permalink
refactor(agent): convert global store to an Api
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen committed May 21, 2024
1 parent 8754c47 commit b91f96f
Show file tree
Hide file tree
Showing 12 changed files with 503 additions and 587 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ENV PATH="${PATH}:/root/.local/bin"
RUN poetry config virtualenvs.in-project true

# Install Solana Tool Suite
RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.11/install)"
RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.17/install)"
ENV PATH="${PATH}:/root/.local/share/solana/install/active_release/bin"

ADD . /agent
Expand Down
43 changes: 14 additions & 29 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,29 @@ impl Agent {
// Create the channels
// TODO: make all components listen to shutdown signal
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
let (primary_oracle_updates_tx, primary_oracle_updates_rx) =
mpsc::channel(self.config.channel_capacities.primary_oracle_updates);
let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
mpsc::channel(self.config.channel_capacities.secondary_oracle_updates);
let (global_store_lookup_tx, global_store_lookup_rx) =
mpsc::channel(self.config.channel_capacities.global_store_lookup);
let (local_store_tx, local_store_rx) =
mpsc::channel(self.config.channel_capacities.local_store);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
local_store_tx.clone(),
logger.clone(),
)
.await,
);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
local_store_tx.clone(),
global_store_lookup_tx.clone(),
primary_oracle_updates_tx,
primary_keypair_loader_tx,
logger.new(o!("primary" => true)),
adapter.clone(),
)?);

// Spawn the secondary network, if needed
Expand All @@ -146,53 +149,35 @@ impl Agent {
config.clone(),
network::Network::Secondary,
local_store_tx.clone(),
global_store_lookup_tx.clone(),
secondary_oracle_updates_tx,
secondary_keypair_loader_tx,
logger.new(o!("primary" => false)),
adapter.clone(),
)?);
}

// Create the Pythd Adapter.
let adapter = Arc::new(pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
global_store_lookup_tx.clone(),
local_store_tx.clone(),
logger.clone(),
));

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(
adapter.clone(),
shutdown_tx.subscribe(),
)));

// Spawn the Global Store
jhs.push(store::global::spawn_store(
global_store_lookup_rx,
primary_oracle_updates_rx,
secondary_oracle_updates_rx,
adapter.clone(),
logger.clone(),
));

// Spawn the Local Store
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
logger.clone(),
adapter,
adapter.clone(),
shutdown_tx.subscribe(),
)));

// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
self.config.metrics_server.bind_address,
local_store_tx,
global_store_lookup_tx,
logger.clone(),
adapter,
)));

// Spawn the remote keypair loader endpoint for both networks
Expand Down
47 changes: 18 additions & 29 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use {
super::{
solana::oracle::PriceEntry,
store::{
global::{
AllAccountsData,
AllAccountsMetadata,
Lookup,
PriceAccountMetadata,
},
local::{
Message,
PriceInfo,
},
pythd::adapter::global::GlobalStore,
solana::{
network::Network,
oracle::PriceEntry,
},
store::local::{
Message,
PriceInfo,
},
},
crate::agent::{
metrics::MetricsServer,
pythd::adapter::global::{
AllAccountsData,
AllAccountsMetadata,
PriceAccountMetadata,
},
},
crate::agent::metrics::MetricsServer,
chrono::DateTime,
pyth_sdk::{
Identifier,
Expand Down Expand Up @@ -44,8 +47,6 @@ impl MetricsServer {
pub async fn render_dashboard(&self) -> Result<String, Box<dyn std::error::Error>> {
// Prepare response channel for requests
let (local_tx, local_rx) = oneshot::channel();
let (global_data_tx, global_data_rx) = oneshot::channel();
let (global_metadata_tx, global_metadata_rx) = oneshot::channel();

// Request price data from local and global store
self.local_store_tx
Expand All @@ -54,23 +55,11 @@ impl MetricsServer {
})
.await?;

self.global_store_lookup_tx
.send(Lookup::LookupAllAccountsData {
network: super::solana::network::Network::Primary,
result_tx: global_data_tx,
})
.await?;

self.global_store_lookup_tx
.send(Lookup::LookupAllAccountsMetadata {
result_tx: global_metadata_tx,
})
.await?;
let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?;
let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?;

// Await the results
let local_data = local_rx.await?;
let global_data = global_data_rx.await??;
let global_metadata = global_metadata_rx.await??;

let symbol_view =
build_dashboard_data(local_data, global_data, global_metadata, &self.logger);
Expand Down
20 changes: 10 additions & 10 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::store::{
global::Lookup,
local::Message,
super::{
pythd::adapter::Adapter,
store::local::Message,
},
crate::agent::{
solana::oracle::PriceEntry,
Expand Down Expand Up @@ -74,25 +74,25 @@ lazy_static! {
/// dashboard and metrics.
pub struct MetricsServer {
/// Used to pull the state of all symbols in local store
pub local_store_tx: mpsc::Sender<Message>,
pub global_store_lookup_tx: mpsc::Sender<Lookup>,
pub start_time: Instant,
pub logger: Logger,
pub local_store_tx: mpsc::Sender<Message>,
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
local_store_tx: mpsc::Sender<Message>,
global_store_lookup_tx: mpsc::Sender<Lookup>,
logger: Logger,
adapter: Arc<Adapter>,
) {
let server = MetricsServer {
local_store_tx,
global_store_lookup_tx,
start_time: Instant::now(),
logger,
adapter,
};

let shared_state = Arc::new(Mutex::new(server));
Expand All @@ -109,7 +109,7 @@ impl MetricsServer {
.await
.unwrap_or_else(|e| {
// Add logging here
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());

// Withhold failure details from client
"Could not render dashboard! See the logs for details".to_owned()
Expand Down
Loading

0 comments on commit b91f96f

Please sign in to comment.