From 718829f256206a7341cd035bfb2f1392f71ab2fd Mon Sep 17 00:00:00 2001 From: akhercha Date: Wed, 24 Jul 2024 22:36:56 +0200 Subject: [PATCH] fix(long_tail_assets): New alerts for long tail assets --- prometheus/alerts.rules.yml | 31 +++++++++-- src/config.rs | 26 +++++++-- src/constants.rs | 35 ++++++++---- src/main.rs | 1 + src/processing/spot.rs | 108 +++++++++++++++++++++++------------- 5 files changed, 141 insertions(+), 60 deletions(-) diff --git a/prometheus/alerts.rules.yml b/prometheus/alerts.rules.yml index 3053fcb..913f13c 100644 --- a/prometheus/alerts.rules.yml +++ b/prometheus/alerts.rules.yml @@ -64,14 +64,35 @@ groups: summary: "Price deviation is too high" description: "The median on-chain price of {{ $labels.pair }} has deviated for more than 2.5% with the reference price from DefiLlama." - - alert: LongTailAssetDeviation - expr: abs(long_tail_asset_deviation) > on(pair) group_left long_tail_asset_threshold + - alert: LongTailAssetCriticalDeviation + expr: | + ( + long_tail_asset_deviating_sources >= 2 + and + long_tail_asset_total_sources <= 6 + ) + or + ( + long_tail_asset_deviating_sources / long_tail_asset_total_sources >= 0.25 + and + long_tail_asset_total_sources > 6 + ) for: 5m labels: - severity: warning + severity: critical annotations: - summary: "Long tail asset deviation is too high" - description: 'The deviation between sources for {{ $labels.pair }} ({{ $labels.type }}) from {{ $labels.source1 }} vs {{ $labels.source2 }} has exceeded the configured threshold of {{ $value | printf "%.2f" }}.' + summary: "Long tail asset critical deviation detected" + description: | + {{ $deviating := printf "%.0f" $value }} + {{ $total := printf "%.0f" (long_tail_asset_total_sources) }} + {{ $threshold := printf "%.2f" ( + (long_tail_asset_total_sources <= 6) + * 100 * last_over_time(long_tail_asset_threshold{type="low"}[1h]) + or + (long_tail_asset_total_sources > 6) + * 100 * last_over_time(long_tail_asset_threshold{type="high"}[1h]) + ) }} + {{ $deviating }} out of {{ $total }} sources for {{ $labels.pair }} have deviated from the onchain price beyond {{ $threshold }}%. - name: API rules: diff --git a/src/config.rs b/src/config.rs index 24bcd32..243787b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,7 +20,7 @@ use url::Url; use crate::{ constants::{CONFIG_UPDATE_INTERVAL, LONG_TAIL_ASSETS, LONG_TAIL_ASSET_THRESHOLD}, - utils::try_felt_to_u32, + utils::{is_long_tail_asset, try_felt_to_u32}, }; #[derive(Debug, Clone, EnumString, IntoStaticStr)] @@ -451,10 +451,28 @@ async fn init_future_config( /// fetched from LONG_TAIL_ASSETS. /// TODO: LONG_TAIL_ASSETS should be an independent (db, yaml...) configuration? pub fn init_long_tail_asset_configuration() { - for (pair, threshold) in LONG_TAIL_ASSETS.iter() { + for (pair, (threshold_low, threshold_high)) in LONG_TAIL_ASSETS.iter() { LONG_TAIL_ASSET_THRESHOLD - .with_label_values(&[pair]) - .set(*threshold); + .with_label_values(&[pair, "low"]) + .set(*threshold_low); + LONG_TAIL_ASSET_THRESHOLD + .with_label_values(&[pair, "high"]) + .set(*threshold_high); + } +} + +#[allow(dead_code)] +/// Retrieves the long tail asset threshold configuration depending on the number of sources. +pub fn get_long_tail_threshold(pair: &str, number_of_sources: usize) -> Option { + if !is_long_tail_asset(pair) { + return None; + }; + let (threshold_low, threshold_high) = LONG_TAIL_ASSETS.get(pair).unwrap(); + + if number_of_sources <= 6 { + Some(*threshold_low) + } else { + Some(*threshold_high) } } diff --git a/src/constants.rs b/src/constants.rs index 957d5e1..6f129d9 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -25,12 +25,16 @@ lazy_static! { /// We should probably store them either in a yaml config file or a /// database (cons of a database => update the threshold/pairs without restarting /// the monitoring service). - pub static ref LONG_TAIL_ASSETS: HashMap = { + /// + /// Stores the threshold for when: + /// - `low`: the pair has 6 sources or less + /// - `high`: the pair has more than 6 sources. + pub static ref LONG_TAIL_ASSETS: HashMap = { let mut map = HashMap::new(); - map.insert("ZEND/USD".to_string(), 0.05); - map.insert("NSTR/USD".to_string(), 0.05); - map.insert("LUSD/USD".to_string(), 0.05); - map.insert("LORDS/USD".to_string(), 0.05); + map.insert("ZEND/USD".to_string(), (0.05, 0.025)); + map.insert("NSTR/USD".to_string(), (0.05, 0.025)); + map.insert("LUSD/USD".to_string(), (0.05, 0.025)); + map.insert("LORDS/USD".to_string(), (0.05, 0.025)); map }; @@ -54,18 +58,25 @@ lazy_static! { "long_tail_asset_threshold", "Deviation threshold configuration for long tail assets" ), - &["pair"] + &["pair", "type"] ) .unwrap(); - pub static ref LONG_TAIL_ASSET_DEVIATION: GaugeVec = register_gauge_vec!( + pub static ref LONG_TAIL_ASSET_DEVIATING_SOURCES: GaugeVec = register_gauge_vec!( opts!( - "long_tail_asset_deviation", - "Deviation between two sources for long tail assets" + "long_tail_asset_deviating_sources", + "Number of sources deviating for long tail assets" ), - &["network", "pair", "type", "source1", "source2"] - ) - .unwrap(); + &["network", "pair"] + ).unwrap(); + + pub static ref LONG_TAIL_ASSET_TOTAL_SOURCES: GaugeVec = register_gauge_vec!( + opts!( + "long_tail_asset_total_sources", + "Total number of sources for long tail assets" + ), + &["network", "pair"] + ).unwrap(); // Regular metrics below diff --git a/src/main.rs b/src/main.rs index 4ded71c..1449b79 100644 --- a/src/main.rs +++ b/src/main.rs @@ -179,6 +179,7 @@ pub(crate) async fn onchain_monitor( ] } } + // TODO: Long tail assets aren't treated as such for Future data DataType::Future => { vec![ tokio::spawn(Box::pin(processing::future::process_data_by_pair( diff --git a/src/processing/spot.rs b/src/processing/spot.rs index 09308cf..3700d8f 100644 --- a/src/processing/spot.rs +++ b/src/processing/spot.rs @@ -2,9 +2,9 @@ extern crate diesel; extern crate dotenv; use crate::config::get_config; +use crate::config::get_long_tail_threshold; use crate::config::DataType; use crate::config::NetworkName; -use crate::constants::LONG_TAIL_ASSET_DEVIATION; use crate::constants::NUM_SOURCES; use crate::constants::ON_OFF_PRICE_DEVIATION; use crate::constants::PAIR_PRICE; @@ -12,6 +12,7 @@ use crate::constants::PRICE_DEVIATION; use crate::constants::PRICE_DEVIATION_SOURCE; use crate::constants::TIME_SINCE_LAST_UPDATE_PAIR_ID; use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER; +use crate::constants::{LONG_TAIL_ASSET_DEVIATING_SOURCES, LONG_TAIL_ASSET_TOTAL_SOURCES}; use crate::diesel::QueryDsl; use crate::error::MonitoringError; use crate::models::SpotEntry; @@ -217,53 +218,82 @@ pub async fn process_long_tail_asset( pair: String, sources: Vec, ) -> Result { + let config = get_config(None).await; + let network_env = &config.network_str(); + let decimals = *config.decimals(DataType::Spot).get(&pair).unwrap(); + + let mut timestamps = Vec::with_capacity(sources.len()); + let mut deviations = Vec::with_capacity(sources.len()); + + for source in sources.iter() { + log::info!("Processing data for pair: {} and source: {}", pair, source); + let (timestamp, price_deviation) = + get_price_deviation_for_source_from_chain(pool.clone(), &pair, source, decimals) + .await?; + timestamps.push(timestamp); + deviations.push(price_deviation); + } + + let threshold = get_long_tail_threshold(&pair, sources.len()).unwrap(); + + // Count deviating sources + let deviating_sources = deviations + .iter() + .filter(|&&deviation| deviation > threshold) + .count(); + + // Set the metric for the number of deviating sources + LONG_TAIL_ASSET_DEVIATING_SOURCES + .with_label_values(&[network_env, &pair]) + .set(deviating_sources as f64); + + // Set the metric for the total number of sources + LONG_TAIL_ASSET_TOTAL_SOURCES + .with_label_values(&[network_env, &pair]) + .set(sources.len() as f64); + + Ok(timestamps.last().copied().unwrap()) +} + +pub async fn get_price_deviation_for_source_from_chain( + pool: deadpool::managed::Pool>, + pair: &str, + source: &str, + decimals: u32, +) -> Result<(u64, f64), MonitoringError> { let mut conn = pool.get().await.map_err(MonitoringError::Connection)?; let config = get_config(None).await; - let network_env = &config.network_str(); - let data_type = "spot"; - let decimals = *config.decimals(DataType::Spot).get(&pair).unwrap(); + let filtered_by_source_result: Result = match config.network().name { + NetworkName::Testnet => { + testnet_dsl::spot_entry + .filter(testnet_dsl::pair_id.eq(pair)) + .filter(testnet_dsl::source.eq(source)) + .order(testnet_dsl::block_timestamp.desc()) + .first(&mut conn) + .await + } + NetworkName::Mainnet => { + mainnet_dsl::mainnet_spot_entry + .filter(mainnet_dsl::pair_id.eq(pair)) + .filter(mainnet_dsl::source.eq(source)) + .order(mainnet_dsl::block_timestamp.desc()) + .first(&mut conn) + .await + } + }; - let mut prices = Vec::new(); - for src in &sources { - let result: Result = match config.network().name { - NetworkName::Testnet => { - testnet_dsl::spot_entry - .filter(testnet_dsl::pair_id.eq(&pair)) - .filter(testnet_dsl::source.eq(src)) - .order(testnet_dsl::block_timestamp.desc()) - .first(&mut conn) - .await - } - NetworkName::Mainnet => { - mainnet_dsl::mainnet_spot_entry - .filter(mainnet_dsl::pair_id.eq(&pair)) - .filter(mainnet_dsl::source.eq(src)) - .order(mainnet_dsl::block_timestamp.desc()) - .first(&mut conn) - .await - } - }; - - if let Ok(data) = result { + match filtered_by_source_result { + Ok(data) => { + let time = time_since_last_update(&data); let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( "Failed to convert price to f64".to_string(), ))?; let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; - prices.push((src.clone(), normalized_price)); - } - } - - // Calculate deviations between sources - for (i, (src1, price1)) in prices.iter().enumerate() { - for (src2, price2) in prices.iter().skip(i + 1) { - let deviation = (price1 - price2).abs() / price1; - LONG_TAIL_ASSET_DEVIATION - .with_label_values(&[network_env, &pair, data_type, src1, src2]) - .set(deviation); + let (source_deviation, _) = source_deviation(&data, normalized_price).await?; + Ok((time, source_deviation)) } + Err(e) => Err(e.into()), } - - Ok(0) }