
Commit

feat(vrf_monitoring): VRF status check
akhercha committed Jul 16, 2024
1 parent 508107c commit 978d206
Showing 9 changed files with 90 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -50,4 +50,4 @@ EXPOSE 8080

ENV RUST_LOG=info

CMD ["/bin/server"]
CMD ["/bin/server"]
2 changes: 2 additions & 0 deletions README.md
@@ -14,6 +14,8 @@ It then processes the data and computes the following metrics:
- `price_deviation{network, pair, source, type}`: Deviation of the price from a reference price (DefiLlama API) for a given source and pair. (in percent)
- `price_deviation_source{network, pair, source, type}`: Deviation of the price from the on-chain aggregated median price for a given source and pair. (in percent)
- `publisher_balance{network, publisher}`: Balance of a publisher. (in ETH)
- `vrf_requests_count{network, status}`: Number of VRF requests with a given status for a given network.
- `vrf_time_since_last_handle_request{network}`: Time since the last VRF request was handled for a given network. (in seconds)
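
As a rough illustration of how gauges with these label sets are driven (a minimal sketch using the `prometheus` crate; the label values and numbers are made up, and the actual registration for this project lives in `src/constants.rs`):

```rust
use prometheus::{register_gauge_vec, GaugeVec};

fn main() {
    // Same label sets as the two VRF metrics documented above.
    let requests_count: GaugeVec = register_gauge_vec!(
        "vrf_requests_count",
        "Number of VRF requests for a given network and status.",
        &["network", "status"]
    )
    .unwrap();
    let time_since_last: GaugeVec = register_gauge_vec!(
        "vrf_time_since_last_handle_request",
        "Seconds since the last VRF request was handled.",
        &["network"]
    )
    .unwrap();

    // Illustrative values only: one series per (network, status) combination.
    requests_count.with_label_values(&["sepolia", "1"]).set(42.0);
    time_since_last.with_label_values(&["sepolia"]).set(12.0);
}
```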

## Shared Public Access

11 changes: 11 additions & 0 deletions prometheus/alerts.rules.yml
@@ -95,3 +95,14 @@ groups:
annotations:
summary: "Sequencer deviation is too high"
description: "The ETH/STRK price has deviated from the sequencer price by more than 2%."
- name: VRF
rules:
- alert: TimeInInitialStatusTooLong
expr: vrf_time_in_received_status > 900 # 900 seconds = 15 minutes
for: 5m
labels:
severity: critical
group: VRF
annotations:
summary: "VRF request has been in initial status for too long"
description: "The VRF request with ID {{ $labels.request_id }} in network {{ $labels.network }} has been in the initial status for more than 15 minutes."
8 changes: 8 additions & 0 deletions src/constants.rs
@@ -133,6 +133,14 @@ lazy_static! {
&["network"]
)
.unwrap();
pub static ref VRF_TIME_IN_RECEIVED_STATUS: GaugeVec = register_gauge_vec!(
opts!(
"vrf_time_in_received_status",
"Time in seconds that a VRF request has been in the initial status for a given network."
),
&["network", "request_id"]
)
.unwrap();
}

#[allow(unused)]
2 changes: 1 addition & 1 deletion src/error.rs
@@ -6,7 +6,7 @@ use starknet::providers::ProviderError;
pub enum MonitoringError {
Price(String),
Database(diesel::result::Error),
Connection(String),
Connection(diesel_async::pooled_connection::deadpool::PoolError),
Api(String),
Conversion(String),
OnChain(String),
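Because `Connection` now wraps the typed `PoolError` instead of a `String`, call sites can hand the variant constructor straight to `map_err`, which is exactly the one-liner the processing modules switch to below. A self-contained sketch of that pattern (the error and pool types here are stand-ins, not the crate's own):

```rust
#[derive(Debug)]
struct PoolError(String); // stand-in for deadpool's pool error type

#[derive(Debug)]
enum MonitoringError {
    Connection(PoolError), // carries the underlying pool error
    Api(String),
}

fn get_connection() -> Result<u32, PoolError> {
    Err(PoolError("pool exhausted".into()))
}

fn process() -> Result<u32, MonitoringError> {
    // A tuple-variant constructor is itself a fn(PoolError) -> MonitoringError,
    // so it can be passed to `map_err` without writing a closure.
    let conn = get_connection().map_err(MonitoringError::Connection)?;
    Ok(conn)
}

fn main() {
    println!("{:?}", process());
}
```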
5 changes: 4 additions & 1 deletion src/main.rs
@@ -261,7 +261,6 @@ pub(crate) async fn publisher_monitor(
pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
log::info!("[VRF] Monitoring VRF requests..");
let mut interval = interval(Duration::from_secs(30));

loop {
let tasks: Vec<_> = vec![
tokio::spawn(Box::pin(processing::vrf::check_vrf_request_count(
@@ -270,7 +269,11 @@ tokio::spawn(Box::pin(processing::vrf::check_vrf_time_since_last_handle(
tokio::spawn(Box::pin(processing::vrf::check_vrf_time_since_last_handle(
pool.clone(),
))),
tokio::spawn(Box::pin(
processing::vrf::check_vrf_received_status_duration(pool.clone()),
)),
];

let results: Vec<_> = futures::future::join_all(tasks).await;
log_tasks_results("VRF", results);

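`vrf_monitor` follows the same shape as the other monitors in `main.rs`: tick on a fixed interval, spawn each check as its own task, then wait on all of them with `join_all` and log the outcomes. A trimmed-down, self-contained sketch of that pattern (the check functions are dummies, not the real VRF checks):

```rust
use std::time::Duration;
use tokio::time::interval;

async fn check_ok() -> Result<(), String> {
    Ok(())
}

async fn check_failing() -> Result<(), String> {
    Err("something went wrong".into())
}

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_secs(30));
    loop {
        ticker.tick().await; // first tick completes immediately

        // Run every check concurrently in its own task.
        let tasks = vec![tokio::spawn(check_ok()), tokio::spawn(check_failing())];

        // join_all yields one Result<Result<..>, JoinError> per task.
        for res in futures::future::join_all(tasks).await {
            match res {
                Ok(Ok(())) => println!("check succeeded"),
                Ok(Err(e)) => eprintln!("check failed: {e}"),
                Err(e) => eprintln!("task panicked: {e}"),
            }
        }

        break; // a real monitor keeps looping; one pass is enough for the sketch
    }
}
```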
15 changes: 3 additions & 12 deletions src/processing/future.rs
@@ -31,10 +31,7 @@ pub async fn process_data_by_pair(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
pair: String,
) -> Result<u64, MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;

@@ -117,10 +114,7 @@ pub async fn process_data_by_pair_and_source(
src: &str,
decimals: u32,
) -> Result<u64, MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;

@@ -180,10 +174,7 @@ pub async fn process_data_by_publisher(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
publisher: String,
) -> Result<(), MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;

15 changes: 3 additions & 12 deletions src/processing/spot.rs
@@ -31,10 +31,7 @@ pub async fn process_data_by_pair(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
pair: String,
) -> Result<u64, MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;

@@ -112,10 +109,7 @@ pub async fn process_data_by_pair_and_source(
src: &str,
decimals: u32,
) -> Result<u64, MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;

@@ -175,10 +169,7 @@ pub async fn process_data_by_publisher(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
publisher: String,
) -> Result<(), MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;

67 changes: 57 additions & 10 deletions src/processing/vrf.rs
@@ -10,18 +10,35 @@ use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use diesel_async::RunQueryDsl;

use crate::constants::{VRF_REQUESTS_COUNT, VRF_TIME_SINCE_LAST_HANDLE_REQUEST};
use crate::constants::{
VRF_REQUESTS_COUNT, VRF_TIME_IN_RECEIVED_STATUS, VRF_TIME_SINCE_LAST_HANDLE_REQUEST,
};
use crate::diesel::QueryDsl;
use crate::error::MonitoringError;
use crate::models::VrfRequest;
use crate::schema::vrf_requests::dsl as vrf_dsl;

#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
enum VrfStatus {
Uninitialized,
Received,
Fulfilled,
Cancelled,
OutOfGas,
Refunded,
}

impl From<VrfStatus> for BigDecimal {
fn from(val: VrfStatus) -> Self {
BigDecimal::from(val as i32)
}
}
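
Since `VrfStatus` declares no explicit discriminants, Rust numbers the variants 0, 1, 2, … in declaration order, so `BigDecimal::from(VrfStatus::Received)` is `BigDecimal::from(1)`; the filter further down compares the `status` column against that value, which assumes the table stores statuses with the same numeric codes. A quick standalone check of the mapping:

```rust
// Same declaration order as in vrf.rs: discriminants are assigned 0, 1, 2, ...
#[derive(Debug, Copy, Clone)]
enum VrfStatus {
    Uninitialized,
    Received,
    Fulfilled,
    Cancelled,
    OutOfGas,
    Refunded,
}

fn main() {
    assert_eq!(VrfStatus::Uninitialized as i32, 0);
    assert_eq!(VrfStatus::Received as i32, 1); // the value the Diesel filter compares against
    assert_eq!(VrfStatus::Refunded as i32, 5);
    println!("Received maps to {}", VrfStatus::Received as i32);
}
```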

pub async fn check_vrf_request_count(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
) -> Result<(), MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let networks: Vec<String> = vrf_dsl::vrf_requests
.select(vrf_dsl::network)
@@ -50,17 +67,15 @@ pub async fn check_vrf_request_count(
pub async fn check_vrf_time_since_last_handle(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
) -> Result<(), MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let networks: Vec<String> = vrf_dsl::vrf_requests
.select(vrf_dsl::network)
.distinct()
.load::<String>(&mut conn)
.await?;

let now = Utc::now().naive_utc();
for network in networks {
let last_handle_time: Option<chrono::NaiveDateTime> = vrf_dsl::vrf_requests
.filter(vrf_dsl::network.eq(&network))
@@ -69,13 +84,45 @@
.await?;

if let Some(last_time) = last_handle_time {
let now = Utc::now().naive_utc();
let duration = now.signed_duration_since(last_time).num_seconds();
VRF_TIME_SINCE_LAST_HANDLE_REQUEST
.with_label_values(&[&network])
.set(duration as f64);
}
}
Ok(())
}
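
The elapsed-time computation here (and in the new check below) is plain `chrono` arithmetic: subtract two naive UTC timestamps and read the difference in whole seconds. A tiny standalone check of that behaviour:

```rust
use chrono::{Duration, Utc};

fn main() {
    let now = Utc::now().naive_utc();
    let earlier = now - Duration::seconds(95);

    // signed_duration_since returns a chrono Duration; num_seconds truncates to whole seconds.
    let elapsed = now.signed_duration_since(earlier).num_seconds();
    assert_eq!(elapsed, 95);
    println!("elapsed: {elapsed}s");
}
```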

pub async fn check_vrf_received_status_duration(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
) -> Result<(), MonitoringError> {
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

// Clear the gauge so we only check current received requests for each network
VRF_TIME_IN_RECEIVED_STATUS.reset();

let networks: Vec<String> = vrf_dsl::vrf_requests
.select(vrf_dsl::network)
.distinct()
.load::<String>(&mut conn)
.await?;

let now = Utc::now().naive_utc();

for network in networks {
let requests: Vec<VrfRequest> = vrf_dsl::vrf_requests
.filter(vrf_dsl::network.eq(&network))
.filter(vrf_dsl::status.eq(BigDecimal::from(VrfStatus::Received)))
.load::<VrfRequest>(&mut conn)
.await?;

for request in requests {
let duration = now.signed_duration_since(request.created_at).num_seconds();
VRF_TIME_IN_RECEIVED_STATUS
.with_label_values(&[&network, &request.request_id.to_string()])
.set(duration as f64);
}
}

Ok(())
}
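
Because `request_id` is a label, every pending request becomes its own time series, and the `reset()` call at the top of the function drops series for requests that have since left the received status, so the gauge always reflects the current snapshot rather than accumulating stale entries. A small sketch of that clear-then-repopulate cycle (a standalone gauge with illustrative label values, not the project's registry):

```rust
use prometheus::{register_gauge_vec, GaugeVec};

fn main() {
    let gauge: GaugeVec = register_gauge_vec!(
        "vrf_time_in_received_status_demo",
        "Seconds a request has been in the received status (demo).",
        &["network", "request_id"]
    )
    .unwrap();

    // First monitoring pass: two requests are still in the received status.
    gauge.with_label_values(&["sepolia", "1"]).set(30.0);
    gauge.with_label_values(&["sepolia", "2"]).set(90.0);

    // Next pass: clear all series first, then set only what is still pending,
    // so request 1 (now fulfilled) no longer exports a stale value.
    gauge.reset();
    gauge.with_label_values(&["sepolia", "2"]).set(120.0);
}
```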
