Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
EvolveArt committed Nov 22, 2024
1 parent 8c82185 commit d09e88e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 57 deletions.
79 changes: 24 additions & 55 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::task::JoinHandle;
use tokio::time::interval;

use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
use processing::common::{check_publisher_balance, data_indexers_are_synced, indexers_are_synced};
use processing::common::{check_publisher_balance, data_indexers_are_synced};
use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};

struct MonitoringTask {
Expand Down Expand Up @@ -73,49 +73,40 @@ async fn main() {
init_long_tail_asset_configuration();

// Monitor spot/future in parallel
let monitoring_tasks = spawn_monitoring_tasks(pool.clone(), &monitoring_config).await;
let monitoring_tasks = spawn_monitoring_tasks(pool.clone()).await;
handle_task_results(monitoring_tasks).await;
}

async fn spawn_monitoring_tasks(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
monitoring_config: &config::Config,
) -> Vec<MonitoringTask> {
let mut tasks = vec![
// MonitoringTask {
// name: "Config Update".to_string(),
// handle: tokio::spawn(periodic_config_update()),
// },
let tasks = vec![
MonitoringTask {
name: "Config Update".to_string(),
handle: tokio::spawn(periodic_config_update()),
},
MonitoringTask {
name: "Spot Monitoring".to_string(),
handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot)),
},
// MonitoringTask {
// name: "Future Monitoring".to_string(),
// handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)),
// },
// MonitoringTask {
// name: "Publisher Monitoring".to_string(),
// handle: tokio::spawn(publisher_monitor(pool.clone(), false)),
// },
MonitoringTask {
name: "Future Monitoring".to_string(),
handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)),
},
MonitoringTask {
name: "Publisher Monitoring".to_string(),
handle: tokio::spawn(publisher_monitor(pool.clone(), false)),
},
MonitoringTask {
name: "API Monitoring".to_string(),
handle: tokio::spawn(api_monitor()),
},
MonitoringTask {
name: "VRF Monitoring".to_string(),
handle: tokio::spawn(vrf_monitor(pool.clone())),
},
];

// if monitoring_config.is_pragma_chain() {
// tasks.push(MonitoringTask {
// name: "Hyperlane Dispatches Monitoring".to_string(),
// handle: tokio::spawn(hyperlane_dispatch_monitor(pool.clone(), true)),
// });
// } else {
// tasks.push(MonitoringTask {
// name: "API Monitoring".to_string(),
// handle: tokio::spawn(api_monitor()),
// });
// tasks.push(MonitoringTask {
// name: "VRF Monitoring".to_string(),
// handle: tokio::spawn(vrf_monitor(pool.clone())),
// });
// }

tasks
}

Expand Down Expand Up @@ -301,26 +292,4 @@ pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgC
let results: Vec<_> = futures::future::join_all(tasks).await;
log_tasks_results("VRF", results);
}
}

pub(crate) async fn hyperlane_dispatch_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
) {
let mut interval = interval(Duration::from_secs(5));

loop {
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await {
continue;
}

let tasks: Vec<_> = vec![tokio::spawn(Box::pin(
processing::dispatch::process_dispatch_events(pool.clone()),
))];
let results: Vec<_> = futures::future::join_all(tasks).await;
log_tasks_results("Dispatch", results);
}
}
}
2 changes: 0 additions & 2 deletions src/monitoring/on_off_deviation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ pub async fn on_off_price_deviation(
.await
.map_err(|e| MonitoringError::Api(format!("Failed to get response text: {}", e)))?;

println!("Response JSON: {}", response_text);

let coins_prices: CoinPricesDTO =
serde_json::from_str(&response_text).map_err(|e| {
MonitoringError::Api(format!(
Expand Down
2 changes: 2 additions & 0 deletions src/processing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub async fn data_indexers_are_synced(data_type: &DataType) -> bool {

/// Checks if indexers of the given data type are still syncing
/// Returns true if any of the indexers is still syncing
#[allow(unused)]
pub async fn is_syncing(table_name: &str) -> Result<bool, MonitoringError> {
let config = get_config(None).await;

Expand All @@ -88,6 +89,7 @@ pub async fn is_syncing(table_name: &str) -> Result<bool, MonitoringError> {
}

/// Check if the indexers are still syncing
#[allow(unused)]
pub async fn indexers_are_synced(table_name: &str) -> bool {
match is_syncing(table_name).await {
Ok(true) => {
Expand Down
1 change: 1 addition & 0 deletions src/processing/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{config::get_config, error::MonitoringError, models::FeedDispatch};
/// * dispatch_event_latest_block,
/// * dispatch_event_feed_latest_block_update,
/// * dispatch_event_nb_feeds_updated.
#[allow(unused)]
pub async fn process_dispatch_events(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
) -> Result<(), MonitoringError> {
Expand Down

0 comments on commit d09e88e

Please sign in to comment.