Skip to content

Commit

Permalink
refactor: migrate adapter service to api
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen committed Apr 22, 2024
1 parent 0d6b36a commit 9d3cb67
Show file tree
Hide file tree
Showing 13 changed files with 743 additions and 1,600 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async-trait = "0.1.79"
warp = { version = "0.3.6", features = ["websocket"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = "0.1.15"
futures = { version = "0.3.30" }
futures-util = { version = "0.3.30", default-features = false, features = [
"sink",
] }
Expand Down
41 changes: 23 additions & 18 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ pub mod store;
use {
self::{
config::Config,
pythd::api::rpc,
pythd::{
adapter::notifier,
api::rpc,
},
solana::network,
},
anyhow::Result,
futures_util::future::join_all,
slog::Logger,
std::sync::Arc,
tokio::sync::{
broadcast,
mpsc,
Expand Down Expand Up @@ -113,8 +117,7 @@ impl Agent {

// Create the channels
// TODO: make all components listen to shutdown signal
let (shutdown_tx, shutdown_rx) =
broadcast::channel(self.config.channel_capacities.shutdown);
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
let (primary_oracle_updates_tx, primary_oracle_updates_rx) =
mpsc::channel(self.config.channel_capacities.primary_oracle_updates);
let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
Expand All @@ -123,8 +126,6 @@ impl Agent {
mpsc::channel(self.config.channel_capacities.global_store_lookup);
let (local_store_tx, local_store_rx) =
mpsc::channel(self.config.channel_capacities.local_store);
let (pythd_adapter_tx, pythd_adapter_rx) =
mpsc::channel(self.config.channel_capacities.pythd_adapter);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

Expand Down Expand Up @@ -152,34 +153,38 @@ impl Agent {
)?);
}

// Create the Pythd Adapter.
let adapter = Arc::new(pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
global_store_lookup_tx.clone(),
local_store_tx.clone(),
logger.clone(),
));

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(
adapter.clone(),
shutdown_tx.subscribe(),
)));

// Spawn the Global Store
jhs.push(store::global::spawn_store(
global_store_lookup_rx,
primary_oracle_updates_rx,
secondary_oracle_updates_rx,
pythd_adapter_tx.clone(),
adapter.clone(),
logger.clone(),
));

// Spawn the Local Store
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));

// Spawn the Pythd Adapter
jhs.push(pythd::adapter::spawn_adapter(
self.config.pythd_adapter.clone(),
pythd_adapter_rx,
global_store_lookup_tx.clone(),
local_store_tx.clone(),
shutdown_tx.subscribe(),
logger.clone(),
));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
logger.clone(),
pythd_adapter_tx,
shutdown_rx,
adapter,
shutdown_tx.subscribe(),
)));

// Spawn the metrics server
Expand Down
Loading

0 comments on commit 9d3cb67

Please sign in to comment.