Skip to content

Commit

Permalink
✨ Publishers Processing (#14)
Browse files Browse the repository at this point in the history
* init

* data type modifications

* fix: publisher task removal

* fix: metric label

---------

Co-authored-by: 0xevolve <[email protected]>
  • Loading branch information
JordyRo1 and EvolveArt authored Jan 24, 2024
1 parent 2ad5794 commit 89de531
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ lazy_static! {
"time_since_last_update_seconds",
"Time since the last update in seconds."
),
&["network", "publisher", "type"]
&["network", "publisher"]
)
.unwrap();
pub static ref PAIR_PRICE: GaugeVec = register_gauge_vec!(
Expand Down
46 changes: 40 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use diesel_async::AsyncPgConnection;
use dotenv::dotenv;
use std::env;
use std::time::Duration;
use std::vec;
use tokio::time::interval;

use crate::processing::common::{check_publisher_balance, is_syncing};
Expand Down Expand Up @@ -57,15 +58,16 @@ async fn main() {
let spot_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Spot));
let future_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Future));

let balance_monitoring = tokio::spawn(balance_monitor());
let publisher_monitoring = tokio::spawn(publisher_monitor(pool.clone(), false));

let api_monitoring = tokio::spawn(monitor_api());

// Wait for the monitoring to finish
let results = futures::future::join_all(vec![
spot_monitoring,
future_monitoring,
api_monitoring,
balance_monitoring,
publisher_monitoring,
])
.await;

Expand All @@ -80,7 +82,7 @@ async fn main() {
log::error!("[API] Monitoring failed: {:?}", e);
}
if let Err(e) = &results[3] {
log::error!("[BALANCE] Monitoring failed: {:?}", e);
log::error!("[PUBLISHERS] Monitoring failed: {:?}", e);
}
}

Expand Down Expand Up @@ -205,19 +207,51 @@ pub(crate) async fn monitor(
}
}

pub(crate) async fn balance_monitor() {
pub(crate) async fn publisher_monitor(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
) {
log::info!("[PUBLISHERS] Monitoring Publishers..");

let mut interval = interval(Duration::from_secs(30));
let monitoring_config: arc_swap::Guard<std::sync::Arc<config::Config>> = get_config(None).await;

loop {
interval.tick().await; // Wait for the next tick

if wait_for_syncing {
match is_syncing(&DataType::Spot).await {
Ok(true) => {
log::info!("[PUBLISHERS] Indexers are still syncing ♻️");
continue;
}
Ok(false) => {
log::info!("PUBLISHERS] Indexers are synced ✅");
}
Err(e) => {
log::error!(
"[PUBLISHERS] Failed to check if indexers are syncing: {:?}",
e
);
continue;
}
}
}

let tasks: Vec<_> = monitoring_config
.all_publishers()
.iter()
.map(|(name, address)| {
tokio::spawn(Box::pin(check_publisher_balance(name.clone(), *address)))
.flat_map(|(publisher, address)| {
vec![
tokio::spawn(Box::pin(check_publisher_balance(
publisher.clone(),
*address,
))),
tokio::spawn(Box::pin(processing::spot::process_data_by_publisher(
pool.clone(),
publisher.clone(),
))),
]
})
.collect();

Expand Down
7 changes: 0 additions & 7 deletions src/processing/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::constants::PAIR_PRICE;
use crate::constants::PRICE_DEVIATION;
use crate::constants::PRICE_DEVIATION_SOURCE;
use crate::constants::TIME_SINCE_LAST_UPDATE_PAIR_ID;
use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER;
use crate::diesel::QueryDsl;
use crate::error::MonitoringError;
use crate::models::FutureEntry;
Expand Down Expand Up @@ -149,11 +148,6 @@ pub async fn process_data_by_pair_and_source(
let data_type = "future";

// Get the labels
let time_labels = TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[
network_env,
&data.publisher,
data_type,
]);
let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]);
let deviation_labels =
PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]);
Expand All @@ -172,7 +166,6 @@ pub async fn process_data_by_pair_and_source(

// Set the metrics
price_labels.set(normalized_price);
time_labels.set(time as f64);
deviation_labels.set(deviation);
source_deviation_labels.set(source_deviation);

Expand Down
52 changes: 46 additions & 6 deletions src/processing/spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ pub async fn process_data_by_pair_and_source(
let data_type = "spot";

// Get the labels
let time_labels = TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[
network_env,
&data.publisher,
data_type,
]);
let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]);
let deviation_labels =
PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]);
Expand All @@ -167,7 +162,6 @@ pub async fn process_data_by_pair_and_source(

// Set the metrics
price_labels.set(normalized_price);
time_labels.set(time as f64);
deviation_labels.set(deviation);
source_deviation_labels.set(source_deviation);

Expand All @@ -176,3 +170,49 @@ pub async fn process_data_by_pair_and_source(
Err(e) => Err(e.into()),
}
}

pub async fn process_data_by_publisher(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
publisher: String,
) -> Result<(), MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;

let config = get_config(None).await;

let result: Result<SpotEntry, _> = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::spot_entry
.filter(testnet_dsl::publisher.eq(publisher.clone()))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_spot_entry
.filter(mainnet_dsl::publisher.eq(publisher.clone()))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
};

log::info!("Processing data for publisher: {}", publisher);

match result {
Ok(data) => {
let network_env = &config.network_str();

let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher]);

time_labels.set(seconds_since_last_publish as f64);

Ok(())
}
Err(e) => Err(e.into()),
}
}

0 comments on commit 89de531

Please sign in to comment.