Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(agent): rename adapter->state #124

Merged
merged 3 commits into from
May 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor(agent): rename adapter->state
Reisen committed May 29, 2024
commit 2332f5522a72683ef7f8c7b954b3aef01b55ea2e
13 changes: 6 additions & 7 deletions src/agent.rs
Original file line number Diff line number Diff line change
@@ -62,21 +62,21 @@ Note that there is an Oracle and Exporter for each network, but only one Local S

################################################################################################################################## */

pub mod adapter;
pub mod dashboard;
pub mod legacy_schedule;
pub mod market_schedule;
pub mod metrics;
pub mod pythd;
pub mod remote_keypair_loader;
pub mod solana;
pub mod state;
pub mod store;
use {
self::{
adapter::notifier,
config::Config,
pythd::api::rpc,
solana::network,
state::notifier,
},
anyhow::Result,
futures_util::future::join_all,
@@ -121,9 +121,8 @@ impl Agent {
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await,
);
let adapter =
Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
@@ -192,11 +191,11 @@ impl Agent {
pub mod config {
use {
super::{
adapter,
metrics,
pythd,
remote_keypair_loader,
solana::network,
state,
},
anyhow::Result,
config as config_rs,
@@ -216,7 +215,7 @@ pub mod config {
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
#[serde(default)]
pub pythd_adapter: adapter::Config,
pub pythd_adapter: state::Config,
#[serde(default)]
pub pythd_api_server: pythd::api::rpc::Config,
#[serde(default)]
14 changes: 7 additions & 7 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
use {
super::{
adapter::{
solana::{
network::Network,
oracle::PriceEntry,
},
state::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
},
solana::{
network::Network,
oracle::PriceEntry,
},
},
crate::agent::{
adapter::global::{
metrics::MetricsServer,
state::global::{
AllAccountsData,
AllAccountsMetadata,
PriceAccountMetadata,
},
metrics::MetricsServer,
},
chrono::DateTime,
pyth_sdk::{
12 changes: 4 additions & 8 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::adapter::{
super::state::{
local::PriceInfo,
Adapter,
State,
},
crate::agent::{
solana::oracle::PriceEntry,
@@ -69,16 +69,12 @@ lazy_static! {
pub struct MetricsServer {
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
pub adapter: Arc<State>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
logger: Logger,
adapter: Arc<Adapter>,
) {
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, adapter: Arc<State>) {
let server = MetricsServer {
start_time: Instant::now(),
logger,
14 changes: 7 additions & 7 deletions src/agent/pythd/api/rpc.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ use {
Pubkey,
SubscriptionID,
},
crate::agent::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
@@ -120,7 +120,7 @@ async fn handle_connection<S>(
notify_price_sched_tx_buffer: usize,
logger: Logger,
) where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
@@ -168,7 +168,7 @@ async fn handle_next<S>(
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
tokio::select! {
msg = ws_rx.next() => {
@@ -210,7 +210,7 @@ async fn handle<S>(
msg: Message,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
// Ignore control and binary messages
if !msg.is_text() {
@@ -296,7 +296,7 @@ async fn dispatch_and_catch_error<S>(
request: &Request<Method, Value>,
) -> Response<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
debug!(
logger,
@@ -436,7 +436,7 @@ pub async fn run<S>(
adapter: Arc<S>,
shutdown_rx: broadcast::Receiver<()>,
) where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
@@ -454,7 +454,7 @@ async fn serve<S>(
mut shutdown_rx: broadcast::Receiver<()>,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_all_products.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crate::agent::adapter,
crate::agent::state,
anyhow::Result,
};

pub async fn get_all_products<S>(adapter: &S) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let products = adapter.get_all_products().await?;
Ok(serde_json::to_value(products)?)
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_product.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use {
GetProductParams,
Method,
},
crate::agent::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
@@ -19,7 +19,7 @@ pub async fn get_product<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: GetProductParams = {
let value = request.params.clone();
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_product_list.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crate::agent::adapter,
crate::agent::state,
anyhow::Result,
};

pub async fn get_product_list<S>(adapter: &S) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let product_list = adapter.get_product_list().await?;
Ok(serde_json::to_value(product_list)?)
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/subscribe_price.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use {
SubscribePriceParams,
SubscribeResult,
},
crate::agent::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
@@ -23,7 +23,7 @@ pub async fn subscribe_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: SubscribePriceParams = serde_json::from_value(
request
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/subscribe_price_sched.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use {
SubscribePriceSchedParams,
SubscribeResult,
},
crate::agent::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
@@ -23,7 +23,7 @@ pub async fn subscribe_price_sched<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: SubscribePriceSchedParams = serde_json::from_value(
request
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/update_price.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use {
Method,
UpdatePriceParams,
},
crate::agent::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
@@ -19,7 +19,7 @@ pub async fn update_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: UpdatePriceParams = serde_json::from_value(
request
4 changes: 2 additions & 2 deletions src/agent/solana.rs
Original file line number Diff line number Diff line change
@@ -15,8 +15,8 @@ pub mod network {
oracle,
},
crate::agent::{
adapter::Adapter,
remote_keypair_loader::KeypairRequest,
state::State,
},
anyhow::Result,
serde::{
@@ -82,7 +82,7 @@ pub mod network {
network: Network,
keypair_request_tx: Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Result<Vec<JoinHandle<()>>> {
// Publisher permissions updates between oracle and exporter
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());
18 changes: 9 additions & 9 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
@@ -7,17 +7,17 @@ use {
oracle::PricePublishingMetadata,
},
crate::agent::{
adapter::{
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
state::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
Adapter,
},
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
State,
},
},
anyhow::{
@@ -182,7 +182,7 @@ pub fn spawn_exporter(
key_store: KeyStore,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Result<Vec<JoinHandle<()>>> {
// Create and spawn the network state querier
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
@@ -274,7 +274,7 @@ pub struct Exporter {

logger: Logger,

adapter: Arc<Adapter>,
adapter: Arc<State>,
}

impl Exporter {
@@ -291,7 +291,7 @@ impl Exporter {
>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Self {
let publish_interval = time::interval(config.publish_interval_duration);
Exporter {
14 changes: 7 additions & 7 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
@@ -7,15 +7,15 @@ use {
network::Network,
},
crate::agent::{
adapter::{
legacy_schedule::LegacySchedule,
market_schedule::MarketSchedule,
state::{
global::{
GlobalStore,
Update,
},
Adapter,
State,
},
legacy_schedule::LegacySchedule,
market_schedule::MarketSchedule,
},
anyhow::{
anyhow,
@@ -180,7 +180,7 @@ pub struct Oracle {

logger: Logger,

adapter: Arc<Adapter>,
adapter: Arc<State>,
}

#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -230,7 +230,7 @@ pub fn spawn_oracle(
>,
key_store: KeyStore,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Vec<JoinHandle<()>> {
let mut jhs = vec![];

@@ -275,7 +275,7 @@ impl Oracle {
updates_rx: mpsc::Receiver<(Pubkey, solana_sdk::account::Account)>,
network: Network,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Self {
Oracle {
data: Default::default(),
Loading