Skip to content

Commit

Permalink
fix(long_tail_assets): New alerts for long tail assets
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Jul 24, 2024
1 parent 5435247 commit 718829f
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 60 deletions.
31 changes: 26 additions & 5 deletions prometheus/alerts.rules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,35 @@ groups:
summary: "Price deviation is too high"
description: "The median on-chain price of {{ $labels.pair }} has deviated for more than 2.5% with the reference price from DefiLlama."

- alert: LongTailAssetDeviation
expr: abs(long_tail_asset_deviation) > on(pair) group_left long_tail_asset_threshold
- alert: LongTailAssetCriticalDeviation
expr: |
(
long_tail_asset_deviating_sources >= 2
and
long_tail_asset_total_sources <= 6
)
or
(
long_tail_asset_deviating_sources / long_tail_asset_total_sources >= 0.25
and
long_tail_asset_total_sources > 6
)
for: 5m
labels:
severity: warning
severity: critical
annotations:
summary: "Long tail asset deviation is too high"
description: 'The deviation between sources for {{ $labels.pair }} ({{ $labels.type }}) from {{ $labels.source1 }} vs {{ $labels.source2 }} has exceeded the configured threshold of {{ $value | printf "%.2f" }}.'
summary: "Long tail asset critical deviation detected"
description: |
{{ $deviating := printf "%.0f" $value }}
{{ $total := printf "%.0f" (long_tail_asset_total_sources) }}
{{ $threshold := printf "%.2f" (
(long_tail_asset_total_sources <= 6)
* 100 * last_over_time(long_tail_asset_threshold{type="low"}[1h])
or
(long_tail_asset_total_sources > 6)
* 100 * last_over_time(long_tail_asset_threshold{type="high"}[1h])
) }}
{{ $deviating }} out of {{ $total }} sources for {{ $labels.pair }} have deviated from the onchain price beyond {{ $threshold }}%.
- name: API
rules:
Expand Down
26 changes: 22 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use url::Url;

use crate::{
constants::{CONFIG_UPDATE_INTERVAL, LONG_TAIL_ASSETS, LONG_TAIL_ASSET_THRESHOLD},
utils::try_felt_to_u32,
utils::{is_long_tail_asset, try_felt_to_u32},
};

#[derive(Debug, Clone, EnumString, IntoStaticStr)]
Expand Down Expand Up @@ -451,10 +451,28 @@ async fn init_future_config(
/// fetched from LONG_TAIL_ASSETS.
/// TODO: LONG_TAIL_ASSETS should be an independent (db, yaml...) configuration?
pub fn init_long_tail_asset_configuration() {
for (pair, threshold) in LONG_TAIL_ASSETS.iter() {
for (pair, (threshold_low, threshold_high)) in LONG_TAIL_ASSETS.iter() {
LONG_TAIL_ASSET_THRESHOLD
.with_label_values(&[pair])
.set(*threshold);
.with_label_values(&[pair, "low"])
.set(*threshold_low);
LONG_TAIL_ASSET_THRESHOLD
.with_label_values(&[pair, "high"])
.set(*threshold_high);
}
}

#[allow(dead_code)]
/// Retrieves the long tail asset threshold configuration depending on the number of sources.
pub fn get_long_tail_threshold(pair: &str, number_of_sources: usize) -> Option<f64> {
if !is_long_tail_asset(pair) {
return None;
};
let (threshold_low, threshold_high) = LONG_TAIL_ASSETS.get(pair).unwrap();

if number_of_sources <= 6 {
Some(*threshold_low)
} else {
Some(*threshold_high)
}
}

Expand Down
35 changes: 23 additions & 12 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ lazy_static! {
/// We should probably store them either in a yaml config file or a
/// database (cons of a database => update the threshold/pairs without restarting
/// the monitoring service).
pub static ref LONG_TAIL_ASSETS: HashMap<String, f64> = {
///
/// Stores the threshold for when:
/// - `low`: the pair has 6 sources or less
/// - `high`: the pair has more than 6 sources.
pub static ref LONG_TAIL_ASSETS: HashMap<String, (f64, f64)> = {
let mut map = HashMap::new();
map.insert("ZEND/USD".to_string(), 0.05);
map.insert("NSTR/USD".to_string(), 0.05);
map.insert("LUSD/USD".to_string(), 0.05);
map.insert("LORDS/USD".to_string(), 0.05);
map.insert("ZEND/USD".to_string(), (0.05, 0.025));
map.insert("NSTR/USD".to_string(), (0.05, 0.025));
map.insert("LUSD/USD".to_string(), (0.05, 0.025));
map.insert("LORDS/USD".to_string(), (0.05, 0.025));
map
};

Expand All @@ -54,18 +58,25 @@ lazy_static! {
"long_tail_asset_threshold",
"Deviation threshold configuration for long tail assets"
),
&["pair"]
&["pair", "type"]
)
.unwrap();

pub static ref LONG_TAIL_ASSET_DEVIATION: GaugeVec = register_gauge_vec!(
pub static ref LONG_TAIL_ASSET_DEVIATING_SOURCES: GaugeVec = register_gauge_vec!(
opts!(
"long_tail_asset_deviation",
"Deviation between two sources for long tail assets"
"long_tail_asset_deviating_sources",
"Number of sources deviating for long tail assets"
),
&["network", "pair", "type", "source1", "source2"]
)
.unwrap();
&["network", "pair"]
).unwrap();

pub static ref LONG_TAIL_ASSET_TOTAL_SOURCES: GaugeVec = register_gauge_vec!(
opts!(
"long_tail_asset_total_sources",
"Total number of sources for long tail assets"
),
&["network", "pair"]
).unwrap();

// Regular metrics below

Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub(crate) async fn onchain_monitor(
]
}
}
// TODO: Long tail assets aren't treated as such for Future data
DataType::Future => {
vec![
tokio::spawn(Box::pin(processing::future::process_data_by_pair(
Expand Down
108 changes: 69 additions & 39 deletions src/processing/spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ extern crate diesel;
extern crate dotenv;

use crate::config::get_config;
use crate::config::get_long_tail_threshold;
use crate::config::DataType;
use crate::config::NetworkName;
use crate::constants::LONG_TAIL_ASSET_DEVIATION;
use crate::constants::NUM_SOURCES;
use crate::constants::ON_OFF_PRICE_DEVIATION;
use crate::constants::PAIR_PRICE;
use crate::constants::PRICE_DEVIATION;
use crate::constants::PRICE_DEVIATION_SOURCE;
use crate::constants::TIME_SINCE_LAST_UPDATE_PAIR_ID;
use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER;
use crate::constants::{LONG_TAIL_ASSET_DEVIATING_SOURCES, LONG_TAIL_ASSET_TOTAL_SOURCES};
use crate::diesel::QueryDsl;
use crate::error::MonitoringError;
use crate::models::SpotEntry;
Expand Down Expand Up @@ -217,53 +218,82 @@ pub async fn process_long_tail_asset(
pair: String,
sources: Vec<String>,
) -> Result<u64, MonitoringError> {
let config = get_config(None).await;
let network_env = &config.network_str();
let decimals = *config.decimals(DataType::Spot).get(&pair).unwrap();

let mut timestamps = Vec::with_capacity(sources.len());
let mut deviations = Vec::with_capacity(sources.len());

for source in sources.iter() {
log::info!("Processing data for pair: {} and source: {}", pair, source);
let (timestamp, price_deviation) =
get_price_deviation_for_source_from_chain(pool.clone(), &pair, source, decimals)
.await?;
timestamps.push(timestamp);
deviations.push(price_deviation);
}

let threshold = get_long_tail_threshold(&pair, sources.len()).unwrap();

// Count deviating sources
let deviating_sources = deviations
.iter()
.filter(|&&deviation| deviation > threshold)
.count();

// Set the metric for the number of deviating sources
LONG_TAIL_ASSET_DEVIATING_SOURCES
.with_label_values(&[network_env, &pair])
.set(deviating_sources as f64);

// Set the metric for the total number of sources
LONG_TAIL_ASSET_TOTAL_SOURCES
.with_label_values(&[network_env, &pair])
.set(sources.len() as f64);

Ok(timestamps.last().copied().unwrap())
}

pub async fn get_price_deviation_for_source_from_chain(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
pair: &str,
source: &str,
decimals: u32,
) -> Result<(u64, f64), MonitoringError> {
let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;

let config = get_config(None).await;
let network_env = &config.network_str();
let data_type = "spot";

let decimals = *config.decimals(DataType::Spot).get(&pair).unwrap();
let filtered_by_source_result: Result<SpotEntry, _> = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::spot_entry
.filter(testnet_dsl::pair_id.eq(pair))
.filter(testnet_dsl::source.eq(source))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_spot_entry
.filter(mainnet_dsl::pair_id.eq(pair))
.filter(mainnet_dsl::source.eq(source))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
};

let mut prices = Vec::new();
for src in &sources {
let result: Result<SpotEntry, _> = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::spot_entry
.filter(testnet_dsl::pair_id.eq(&pair))
.filter(testnet_dsl::source.eq(src))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_spot_entry
.filter(mainnet_dsl::pair_id.eq(&pair))
.filter(mainnet_dsl::source.eq(src))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
};

if let Ok(data) = result {
match filtered_by_source_result {
Ok(data) => {
let time = time_since_last_update(&data);
let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price(
"Failed to convert price to f64".to_string(),
))?;
let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64;
prices.push((src.clone(), normalized_price));
}
}

// Calculate deviations between sources
for (i, (src1, price1)) in prices.iter().enumerate() {
for (src2, price2) in prices.iter().skip(i + 1) {
let deviation = (price1 - price2).abs() / price1;
LONG_TAIL_ASSET_DEVIATION
.with_label_values(&[network_env, &pair, data_type, src1, src2])
.set(deviation);
let (source_deviation, _) = source_deviation(&data, normalized_price).await?;
Ok((time, source_deviation))
}
Err(e) => Err(e.into()),
}

Ok(0)
}

0 comments on commit 718829f

Please sign in to comment.