From b1842e05a3faa224eefaa47afe5e7079588acc17 Mon Sep 17 00:00:00 2001 From: akhercha Date: Fri, 26 Jul 2024 15:02:18 +0200 Subject: [PATCH] fix(long_tail_assets): fixes from review --- src/processing/future.rs | 136 ++++++++++++---------------- src/processing/spot.rs | 186 +++++++++++++++++---------------------- src/types.rs | 17 ++++ 3 files changed, 156 insertions(+), 183 deletions(-) diff --git a/src/processing/future.rs b/src/processing/future.rs index fefeb6e..9785e08 100644 --- a/src/processing/future.rs +++ b/src/processing/future.rs @@ -35,54 +35,48 @@ pub async fn process_data_by_pair( let config = get_config(None).await; - let result: Result = match config.network().name { + let data: FutureEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::future_entry .filter(testnet_dsl::pair_id.eq(pair.clone())) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_future_entry .filter(mainnet_dsl::pair_id.eq(pair.clone())) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; log::info!("Processing data for pair: {}", pair); - match result { - Ok(data) => { - let network_env = &config.network_str(); - let data_type = "future"; + let network_env = &config.network_str(); + let data_type = "future"; - let seconds_since_last_publish = time_since_last_update(&data); - let time_labels = - TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]); - let num_sources_labels = - NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]); + let seconds_since_last_publish = time_since_last_update(&data); + let time_labels = + TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]); + let num_sources_labels = NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]); - time_labels.set(seconds_since_last_publish as f64); + time_labels.set(seconds_since_last_publish as f64); - let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation( - pair.clone(), - data.timestamp.timestamp() as u64, - DataType::Future, - ) - .await?; + let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation( + pair.clone(), + data.timestamp.timestamp() as u64, + DataType::Future, + ) + .await?; - ON_OFF_PRICE_DEVIATION - .with_label_values(&[network_env, &pair.clone(), data_type]) - .set(on_off_deviation); - num_sources_labels.set(num_sources_aggregated as i64); + ON_OFF_PRICE_DEVIATION + .with_label_values(&[network_env, &pair.clone(), data_type]) + .set(on_off_deviation); + num_sources_labels.set(num_sources_aggregated as i64); - Ok(seconds_since_last_publish) - } - Err(e) => Err(e.into()), - } + Ok(seconds_since_last_publish) } pub async fn process_data_by_pair_and_sources( @@ -118,14 +112,14 @@ pub async fn process_data_by_pair_and_source( let config = get_config(None).await; - let filtered_by_source_result: Result = match config.network().name { + let data: FutureEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::future_entry .filter(testnet_dsl::pair_id.eq(pair)) .filter(testnet_dsl::source.eq(src)) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_future_entry @@ -133,41 +127,35 @@ pub async fn process_data_by_pair_and_source( .filter(mainnet_dsl::source.eq(src)) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; - match filtered_by_source_result { - Ok(data) => { - let network_env = &config.network_str(); - let data_type = "future"; - - // Get the labels - let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]); - let deviation_labels = - PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]); - let source_deviation_labels = - PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]); - - // Compute metrics - let time = time_since_last_update(&data); - let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( - "Failed to convert price to f64".to_string(), - ))?; - let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; - - let deviation = price_deviation(&data, normalized_price).await?; - let (source_deviation, _) = source_deviation(&data, normalized_price).await?; - - // Set the metrics - price_labels.set(normalized_price); - deviation_labels.set(deviation); - source_deviation_labels.set(source_deviation); - - Ok(time) - } - Err(e) => Err(e.into()), - } + let network_env = &config.network_str(); + let data_type = "future"; + + // Get the labels + let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]); + let deviation_labels = PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]); + let source_deviation_labels = + PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]); + + // Compute metrics + let time = time_since_last_update(&data); + let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( + "Failed to convert price to f64".to_string(), + ))?; + let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; + + let deviation = price_deviation(&data, normalized_price).await?; + let (source_deviation, _) = source_deviation(&data, normalized_price).await?; + + // Set the metrics + price_labels.set(normalized_price); + deviation_labels.set(deviation); + source_deviation_labels.set(source_deviation); + + Ok(time) } pub async fn process_data_by_publisher( @@ -178,40 +166,32 @@ pub async fn process_data_by_publisher( let config = get_config(None).await; - let result: Result = match config.network().name { + let data: FutureEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::future_entry .filter(testnet_dsl::publisher.eq(publisher.clone())) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_future_entry .filter(mainnet_dsl::publisher.eq(publisher.clone())) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; log::info!("Processing data for publisher: {}", publisher); - match result { - Ok(data) => { - let network_env = &config.network_str(); + let network_env = &config.network_str(); - let seconds_since_last_publish = time_since_last_update(&data); - let time_labels = TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[ - network_env, - &publisher, - "future", - ]); + let seconds_since_last_publish = time_since_last_update(&data); + let time_labels = + TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "future"]); - time_labels.set(seconds_since_last_publish as f64); + time_labels.set(seconds_since_last_publish as f64); - Ok(()) - } - Err(e) => Err(e.into()), - } + Ok(()) } diff --git a/src/processing/spot.rs b/src/processing/spot.rs index ce32016..21166ac 100644 --- a/src/processing/spot.rs +++ b/src/processing/spot.rs @@ -28,6 +28,7 @@ use diesel::ExpressionMethods; use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::AsyncPgConnection; use diesel_async::RunQueryDsl; +use pragma_monitoring::types::Deviation; pub async fn process_data_by_pair( pool: deadpool::managed::Pool>, @@ -37,53 +38,47 @@ pub async fn process_data_by_pair( let config = get_config(None).await; - let result: Result = match config.network().name { + let data: SpotEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::spot_entry .filter(testnet_dsl::pair_id.eq(pair.clone())) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_spot_entry .filter(mainnet_dsl::pair_id.eq(pair.clone())) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; log::info!("Processing data for pair: {}", pair); - match result { - Ok(data) => { - let network_env = &config.network_str(); - let data_type = "spot"; - - let seconds_since_last_publish = time_since_last_update(&data); - let time_labels = - TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]); - let num_sources_labels = - NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]); - - let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation( - pair.clone(), - data.timestamp.timestamp() as u64, - DataType::Spot, - ) - .await?; - - ON_OFF_PRICE_DEVIATION - .with_label_values(&[network_env, &pair.clone(), data_type]) - .set(on_off_deviation); - time_labels.set(seconds_since_last_publish as f64); - num_sources_labels.set(num_sources_aggregated as i64); - - Ok(seconds_since_last_publish) - } - Err(e) => Err(e.into()), - } + let network_env = &config.network_str(); + let data_type = "spot"; + + let seconds_since_last_publish = time_since_last_update(&data); + let time_labels = + TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]); + let num_sources_labels = NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]); + + let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation( + pair.clone(), + data.timestamp.timestamp() as u64, + DataType::Spot, + ) + .await?; + + ON_OFF_PRICE_DEVIATION + .with_label_values(&[network_env, &pair.clone(), data_type]) + .set(on_off_deviation); + time_labels.set(seconds_since_last_publish as f64); + num_sources_labels.set(num_sources_aggregated as i64); + + Ok(seconds_since_last_publish) } pub async fn process_data_by_pair_and_sources( @@ -115,14 +110,14 @@ pub async fn process_data_by_pair_and_source( let config = get_config(None).await; - let filtered_by_source_result: Result = match config.network().name { + let data: SpotEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::spot_entry .filter(testnet_dsl::pair_id.eq(pair)) .filter(testnet_dsl::source.eq(src)) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_spot_entry @@ -130,41 +125,35 @@ pub async fn process_data_by_pair_and_source( .filter(mainnet_dsl::source.eq(src)) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; - match filtered_by_source_result { - Ok(data) => { - let network_env = &config.network_str(); - let data_type = "spot"; - - // Get the labels - let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]); - let deviation_labels = - PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]); - let source_deviation_labels = - PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]); - - // Compute metrics - let time = time_since_last_update(&data); - let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( - "Failed to convert price to f64".to_string(), - ))?; - let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; - - let deviation = price_deviation(&data, normalized_price).await?; - let (source_deviation, _) = source_deviation(&data, normalized_price).await?; - - // Set the metrics - price_labels.set(normalized_price); - deviation_labels.set(deviation); - source_deviation_labels.set(source_deviation); - - Ok(time) - } - Err(e) => Err(e.into()), - } + let network_env = &config.network_str(); + let data_type = "spot"; + + // Get the labels + let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]); + let deviation_labels = PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]); + let source_deviation_labels = + PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]); + + // Compute metrics + let time = time_since_last_update(&data); + let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( + "Failed to convert price to f64".to_string(), + ))?; + let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; + + let deviation = price_deviation(&data, normalized_price).await?; + let (source_deviation, _) = source_deviation(&data, normalized_price).await?; + + // Set the metrics + price_labels.set(normalized_price); + deviation_labels.set(deviation); + source_deviation_labels.set(source_deviation); + + Ok(time) } pub async fn process_data_by_publisher( @@ -175,42 +164,34 @@ pub async fn process_data_by_publisher( let config = get_config(None).await; - let result: Result = match config.network().name { + let data: SpotEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::spot_entry .filter(testnet_dsl::publisher.eq(publisher.clone())) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_spot_entry .filter(mainnet_dsl::publisher.eq(publisher.clone())) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; log::info!("Processing data for publisher: {}", publisher); - match result { - Ok(data) => { - let network_env = &config.network_str(); + let network_env = &config.network_str(); - let seconds_since_last_publish = time_since_last_update(&data); - let time_labels = TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[ - network_env, - &publisher, - "spot", - ]); + let seconds_since_last_publish = time_since_last_update(&data); + let time_labels = + TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "spot"]); - time_labels.set(seconds_since_last_publish as f64); + time_labels.set(seconds_since_last_publish as f64); - Ok(()) - } - Err(e) => Err(e.into()), - } + Ok(()) } pub async fn process_long_tail_asset( @@ -222,25 +203,25 @@ pub async fn process_long_tail_asset( let network_env = &config.network_str(); let decimals = *config.decimals(DataType::Spot).get(&pair).unwrap(); - let mut timestamps = Vec::with_capacity(sources.len()); - let mut deviations = Vec::with_capacity(sources.len()); + let mut deviations: Vec = Vec::with_capacity(sources.len()); for source in sources.iter() { log::info!("Processing data for pair: {} and source: {}", pair, source); - let (timestamp, price_deviation) = + let deviation = get_price_deviation_for_source_from_chain(pool.clone(), &pair, source, decimals) .await?; - timestamps.push(timestamp); - deviations.push(price_deviation); + deviations.push(deviation); } + // NOTE: Safe to unwrap because we're sure that only long tails assets enter + // the [process_long_tail_asset] function because we used [is_long_tail_asset] earlier. let threshold = get_long_tail_threshold(&pair, sources.len()).unwrap(); // TODO: Maybe we should only consider recent sources, i.e in the last hour? // Count deviating sources let deviating_sources = deviations .iter() - .filter(|&&deviation| deviation >= threshold) + .filter(|&&deviation| deviation.price >= threshold) .count(); // Set the metric for the number of deviating sources @@ -253,7 +234,7 @@ pub async fn process_long_tail_asset( .with_label_values(&[network_env, &pair, "spot"]) .set(sources.len() as f64); - Ok(timestamps.last().copied().unwrap()) + Ok(deviations.last().copied().unwrap().time_since_last_update) } pub async fn get_price_deviation_for_source_from_chain( @@ -261,19 +242,19 @@ pub async fn get_price_deviation_for_source_from_chain( pair: &str, source: &str, decimals: u32, -) -> Result<(u64, f64), MonitoringError> { +) -> Result { let mut conn = pool.get().await.map_err(MonitoringError::Connection)?; let config = get_config(None).await; - let filtered_by_source_result: Result = match config.network().name { + let data: SpotEntry = match config.network().name { NetworkName::Testnet => { testnet_dsl::spot_entry .filter(testnet_dsl::pair_id.eq(pair)) .filter(testnet_dsl::source.eq(source)) .order(testnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } NetworkName::Mainnet => { mainnet_dsl::mainnet_spot_entry @@ -281,20 +262,15 @@ pub async fn get_price_deviation_for_source_from_chain( .filter(mainnet_dsl::source.eq(source)) .order(mainnet_dsl::block_timestamp.desc()) .first(&mut conn) - .await + .await? } }; - match filtered_by_source_result { - Ok(data) => { - let time = time_since_last_update(&data); - let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( - "Failed to convert price to f64".to_string(), - ))?; - let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; - let (source_deviation, _) = source_deviation(&data, normalized_price).await?; - Ok((time, source_deviation)) - } - Err(e) => Err(e.into()), - } + let time_since_last_update = time_since_last_update(&data); + let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( + "Failed to convert price to f64".to_string(), + ))?; + let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; + let (source_deviation, _) = source_deviation(&data, normalized_price).await?; + Ok(Deviation::new(source_deviation, time_since_last_update)) } diff --git a/src/types.rs b/src/types.rs index 9678231..6c00501 100644 --- a/src/types.rs +++ b/src/types.rs @@ -6,6 +6,23 @@ use crate::{ models::{FutureEntry, SpotEntry}, }; +#[allow(dead_code)] +#[derive(Debug, Clone, Copy)] +pub struct Deviation { + pub price: f64, + pub time_since_last_update: u64, +} + +#[allow(dead_code)] +impl Deviation { + pub fn new(price: f64, time_since_last_update: u64) -> Self { + Deviation { + price, + time_since_last_update, + } + } +} + #[allow(dead_code)] pub trait Entry { fn pair_id(&self) -> &str;