diff --git a/Dockerfile b/Dockerfile
index 6f0be6e..4fda868 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -50,4 +50,4 @@ EXPOSE 8080
 
 ENV RUST_LOG=info
 
-CMD ["/bin/server"]
\ No newline at end of file
+CMD ["/bin/server"]
\ No newline at end of file
diff --git a/README.md b/README.md
index 6ecb6df..7195177 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,8 @@ It then processes the data and computes the following metrics:
 - `price_deviation{network, pair, source, type}`: Deviation of the price from a reference price (DefiLlama API) given source and pair. (in percents)
 - `price_deviation_source{network, pair, source, type}`: Deviation of the price from the on-chain aggregated median price given source and pair. (in percents)
 - `publisher_balance{network, publisher}`: Balance of a publisher. (in ETH)
+- `vrf_requests_count{network, status}`: Number of VRF requests handled for a given network.
+- `vrf_time_since_last_handle_request{network}`: Time since the last VRF request was handled for a given network.
 
 ## Shared Public Access
 
diff --git a/prometheus/alerts.rules.yml b/prometheus/alerts.rules.yml
index 167ad3a..4ebb0ab 100644
--- a/prometheus/alerts.rules.yml
+++ b/prometheus/alerts.rules.yml
@@ -95,3 +95,14 @@ groups:
         annotations:
           summary: "Sequencer deviation is too high"
           description: "The ETH/STRK price has deviated from the sequencer price by more than 2%."
+  - name: VRF
+    rules:
+      - alert: TimeInInitialStatusTooLong
+        expr: vrf_time_in_received_status > 900 # 900 Seconds = 15 minutes
+        for: 5m
+        labels:
+          severity: critical
+          group: VRF
+        annotations:
+          summary: "VRF request has been in initial status for too long"
+          description: "The VRF request with ID {{ $labels.request_id }} in network {{ $labels.network }} has been in the initial status for more than 15 minutes."
diff --git a/src/constants.rs b/src/constants.rs
index c60274c..afccd8d 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -133,6 +133,14 @@ lazy_static! {
         &["network"]
     )
     .unwrap();
+    pub static ref VRF_TIME_IN_RECEIVED_STATUS: GaugeVec = register_gauge_vec!(
+        opts!(
+            "vrf_time_in_received_status",
+            "Time in seconds that a VRF request has been in the initial status for a given network."
+        ),
+        &["network", "request_id"]
+    )
+    .unwrap();
 }
 
 #[allow(unused)]
diff --git a/src/error.rs b/src/error.rs
index 79f76a4..59756b4 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -6,7 +6,7 @@ use starknet::providers::ProviderError;
 pub enum MonitoringError {
     Price(String),
     Database(diesel::result::Error),
-    Connection(String),
+    Connection(diesel_async::pooled_connection::deadpool::PoolError),
     Api(String),
     Conversion(String),
     OnChain(String),
diff --git a/src/main.rs b/src/main.rs
index d886f6f..24c6104 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -261,7 +261,6 @@ pub(crate) async fn publisher_monitor(
 pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
     log::info!("[VRF] Monitoring VRF requests..");
     let mut interval = interval(Duration::from_secs(30));
-
     loop {
         let tasks: Vec<_> = vec![
             tokio::spawn(Box::pin(processing::vrf::check_vrf_request_count(
@@ -270,7 +269,11 @@ pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
             tokio::spawn(Box::pin(processing::vrf::check_vrf_time_since_last_handle(
                 pool.clone(),
             ))),
+            tokio::spawn(Box::pin(processing::vrf::check_vrf_received_status_duration(
+                pool.clone(),
+            ))),
         ];
+
         let results: Vec<_> = futures::future::join_all(tasks).await;
         log_tasks_results("VRF", results);
 
diff --git a/src/processing/future.rs b/src/processing/future.rs
index 60ff8c0..fefeb6e 100644
--- a/src/processing/future.rs
+++ b/src/processing/future.rs
@@ -31,10 +31,7 @@ pub async fn process_data_by_pair(
     pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
     pair: String,
 ) -> Result<u64, MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let config = get_config(None).await;
 
@@ -117,10 +114,7 @@ pub async fn process_data_by_pair_and_source(
     src: &str,
     decimals: u32,
 ) -> Result<u64, MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let config = get_config(None).await;
 
@@ -180,10 +174,7 @@ pub async fn process_data_by_publisher(
     pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
     publisher: String,
 ) -> Result<(), MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let config = get_config(None).await;
 
diff --git a/src/processing/spot.rs b/src/processing/spot.rs
index 5c7d2fc..1ba1bb9 100644
--- a/src/processing/spot.rs
+++ b/src/processing/spot.rs
@@ -31,10 +31,7 @@ pub async fn process_data_by_pair(
     pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
     pair: String,
 ) -> Result<u64, MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let config = get_config(None).await;
 
@@ -112,10 +109,7 @@ pub async fn process_data_by_pair_and_source(
     src: &str,
     decimals: u32,
 ) -> Result<u64, MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let config = get_config(None).await;
 
@@ -175,10 +169,7 @@ pub async fn process_data_by_publisher(
     pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
     publisher: String,
 ) -> Result<(), MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let config = get_config(None).await;
 
diff --git a/src/processing/vrf.rs b/src/processing/vrf.rs
index 83b1d52..f7583ad 100644
--- a/src/processing/vrf.rs
+++ b/src/processing/vrf.rs
@@ -10,18 +10,35 @@ use diesel_async::pooled_connection::AsyncDieselConnectionManager;
 use diesel_async::AsyncPgConnection;
 use diesel_async::RunQueryDsl;
 
-use crate::constants::{VRF_REQUESTS_COUNT, VRF_TIME_SINCE_LAST_HANDLE_REQUEST};
+use crate::constants::{
+    VRF_REQUESTS_COUNT, VRF_TIME_IN_RECEIVED_STATUS, VRF_TIME_SINCE_LAST_HANDLE_REQUEST,
+};
 use crate::diesel::QueryDsl;
 use crate::error::MonitoringError;
+use crate::models::VrfRequest;
 use crate::schema::vrf_requests::dsl as vrf_dsl;
 
+#[derive(Debug, Copy, Clone)]
+#[allow(dead_code)]
+enum VrfStatus {
+    Uninitialized,
+    Received,
+    Fulfilled,
+    Cancelled,
+    OutOfGas,
+    Refunded,
+}
+
+impl From<VrfStatus> for BigDecimal {
+    fn from(val: VrfStatus) -> Self {
+        BigDecimal::from(val as i32)
+    }
+}
+
 pub async fn check_vrf_request_count(
     pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
 ) -> Result<(), MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let networks: Vec<String> = vrf_dsl::vrf_requests
         .select(vrf_dsl::network)
@@ -50,10 +67,7 @@ pub async fn check_vrf_request_count(
 pub async fn check_vrf_time_since_last_handle(
     pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
 ) -> Result<(), MonitoringError> {
-    let mut conn = pool
-        .get()
-        .await
-        .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
 
     let networks: Vec<String> = vrf_dsl::vrf_requests
         .select(vrf_dsl::network)
@@ -61,6 +75,7 @@ pub async fn check_vrf_time_since_last_handle(
         .load::<String>(&mut conn)
         .await?;
 
+    let now = Utc::now().naive_utc();
     for network in networks {
         let last_handle_time: Option<NaiveDateTime> = vrf_dsl::vrf_requests
             .filter(vrf_dsl::network.eq(&network))
@@ -69,13 +84,45 @@ pub async fn check_vrf_time_since_last_handle(
             .await?;
 
         if let Some(last_time) = last_handle_time {
-            let now = Utc::now().naive_utc();
             let duration = now.signed_duration_since(last_time).num_seconds();
             VRF_TIME_SINCE_LAST_HANDLE_REQUEST
                 .with_label_values(&[&network])
                 .set(duration as f64);
         }
     }
+    Ok(())
+}
+
+pub async fn check_vrf_received_status_duration(
+    pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
+) -> Result<(), MonitoringError> {
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
+
+    // Clear the gauge so we only check current received requests for each network
+    VRF_TIME_IN_RECEIVED_STATUS.reset();
+
+    let networks: Vec<String> = vrf_dsl::vrf_requests
+        .select(vrf_dsl::network)
+        .distinct()
+        .load::<String>(&mut conn)
+        .await?;
+
+    let now = Utc::now().naive_utc();
+
+    for network in networks {
+        let requests: Vec<VrfRequest> = vrf_dsl::vrf_requests
+            .filter(vrf_dsl::network.eq(&network))
+            .filter(vrf_dsl::status.eq(BigDecimal::from(VrfStatus::Received)))
+            .load::<VrfRequest>(&mut conn)
+            .await?;
+
+        for request in requests {
+            let duration = now.signed_duration_since(request.created_at).num_seconds();
+            VRF_TIME_IN_RECEIVED_STATUS
+                .with_label_values(&[&network, &request.request_id.to_string()])
+                .set(duration as f64);
+        }
+    }
 
     Ok(())
 }