diff --git a/Cargo.lock b/Cargo.lock index 05d22c7..dcab050 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,6 +64,17 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -293,6 +304,15 @@ dependencies = [ "inout", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -327,6 +347,30 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crunchy" version = "0.2.2" @@ -609,6 +653,27 @@ dependencies = [ "uint", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1227,6 +1292,30 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.50", + "triomphe", + "uuid 1.6.1", +] + [[package]] name = "multer" version = "2.1.0" @@ -1404,6 +1493,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1591,6 +1686,7 @@ dependencies = [ "futures", "hyper", "lazy_static", + "moka", "num-bigint", "phf", "prometheus", @@ -1684,6 +1780,21 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.36" @@ -1729,6 +1840,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.4.1", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2500,6 +2620,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -2852,6 +2978,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 2f9c9f6..65cd909 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ env_logger = "0.10.1" futures = "0.3.28" hyper = "0.14.27" lazy_static = "1.4.0" +moka = { version = "0.12.8", features = ["future"] } num-bigint = "0.4" phf = { version = "0.11", features = ["macros"] } prometheus = "0.13.3" diff --git a/src/main.rs b/src/main.rs index 4655400..69d15ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,13 +33,17 @@ use deadpool::managed::Pool; use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::AsyncPgConnection; use dotenv::dotenv; +use moka::future::Cache; +use monitoring::price_deviation::CoinPricesDTO; use tokio::task::JoinHandle; use tokio::time::interval; use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType}; use processing::common::{check_publisher_balance, data_indexers_are_synced}; +use tracing::instrument; use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results}; +#[derive(Debug)] struct MonitoringTask { name: String, handle: JoinHandle<()>, @@ -77,9 +81,12 @@ async fn main() { handle_task_results(monitoring_tasks).await; } +#[instrument(skip_all)] async fn spawn_monitoring_tasks( pool: Pool>, ) -> Vec { + let cache = Cache::new(10_000); + let tasks = vec![ MonitoringTask { name: "Config Update".to_string(), @@ -87,11 +94,21 @@ async fn spawn_monitoring_tasks( }, MonitoringTask { name: "Spot Monitoring".to_string(), - handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot)), + handle: tokio::spawn(onchain_monitor( + pool.clone(), + true, + &DataType::Spot, + cache.clone(), + )), }, MonitoringTask { name: "Future Monitoring".to_string(), - handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)), + handle: tokio::spawn(onchain_monitor( + pool.clone(), + true, + &DataType::Future, + cache.clone(), + )), }, MonitoringTask { name: "Publisher Monitoring".to_string(), @@ -99,7 +116,7 @@ async fn spawn_monitoring_tasks( }, MonitoringTask { name: "API Monitoring".to_string(), - handle: tokio::spawn(api_monitor()), + handle: tokio::spawn(api_monitor(cache.clone())), }, MonitoringTask { name: "VRF Monitoring".to_string(), @@ -110,6 +127,7 @@ async fn spawn_monitoring_tasks( tasks } +#[instrument] async fn handle_task_results(tasks: Vec) { let mut results = HashMap::new(); for task in tasks { @@ -119,7 +137,8 @@ async fn handle_task_results(tasks: Vec) { log_monitoring_results(results); } -pub(crate) async fn api_monitor() { +#[instrument(skip(cache))] +pub(crate) async fn api_monitor(cache: Cache<(String, u64), CoinPricesDTO>) { let monitoring_config = get_config(None).await; tracing::info!("[API] Monitoring API.."); @@ -132,13 +151,14 @@ pub(crate) async fn api_monitor() { .sources(DataType::Spot) .iter() .flat_map(|(pair, sources)| { + let my_cache = cache.clone(); if is_long_tail_asset(pair) { vec![tokio::spawn(Box::pin( processing::api::process_long_tail_assets(pair.clone(), sources.clone()), ))] } else { vec![tokio::spawn(Box::pin( - processing::api::process_data_by_pair(pair.clone()), + processing::api::process_data_by_pair(pair.clone(), my_cache), ))] } }) @@ -153,10 +173,12 @@ pub(crate) async fn api_monitor() { } } +#[instrument(skip(pool, cache))] pub(crate) async fn onchain_monitor( pool: Pool>, wait_for_syncing: bool, data_type: &DataType, + cache: Cache<(String, u64), CoinPricesDTO>, ) { let monitoring_config = get_config(None).await; @@ -188,12 +210,14 @@ pub(crate) async fn onchain_monitor( tokio::spawn(Box::pin(processing::spot::process_data_by_pair( pool.clone(), pair.clone(), + cache.clone(), ))), tokio::spawn(Box::pin( processing::spot::process_data_by_pair_and_sources( pool.clone(), pair.clone(), sources.to_vec(), + cache.clone(), ), )), ] @@ -205,12 +229,14 @@ pub(crate) async fn onchain_monitor( tokio::spawn(Box::pin(processing::future::process_data_by_pair( pool.clone(), pair.clone(), + cache.clone(), ))), tokio::spawn(Box::pin( processing::future::process_data_by_pair_and_sources( pool.clone(), pair.clone(), sources.to_vec(), + cache.clone(), ), )), ] @@ -223,6 +249,7 @@ pub(crate) async fn onchain_monitor( } } +#[instrument(skip(pool))] pub(crate) async fn publisher_monitor( pool: Pool>, wait_for_syncing: bool, @@ -266,6 +293,7 @@ pub(crate) async fn publisher_monitor( } } +#[instrument(skip(pool))] pub(crate) async fn vrf_monitor(pool: Pool>) { tracing::info!("[VRF] Monitoring VRF requests.."); diff --git a/src/monitoring/on_off_deviation.rs b/src/monitoring/on_off_deviation.rs index ac86020..9f5e979 100644 --- a/src/monitoring/on_off_deviation.rs +++ b/src/monitoring/on_off_deviation.rs @@ -1,4 +1,5 @@ use bigdecimal::ToPrimitive; +use moka::future::Cache; use starknet::{ core::{ types::{BlockId, BlockTag, Felt, FunctionCall}, @@ -16,6 +17,8 @@ use crate::{ utils::try_felt_to_u32, }; +use super::price_deviation::CoinPricesDTO; + /// On-chain price deviation from the reference price. /// Returns the deviation and the number of sources aggregated. /// @@ -33,6 +36,7 @@ pub async fn on_off_price_deviation( pair_id: String, timestamp: u64, data_type: DataType, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result<(f64, u32), MonitoringError> { let ids = &COINGECKO_IDS; let config = get_config(None).await; @@ -79,7 +83,8 @@ pub async fn on_off_price_deviation( DataType::Spot => { let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id"); - let coins_prices = query_defillama_api(timestamp, coingecko_id).await?; + let coins_prices = + query_defillama_api(timestamp, coingecko_id.to_owned(), cache).await?; let api_id = format!("coingecko:{}", coingecko_id); diff --git a/src/monitoring/price_deviation.rs b/src/monitoring/price_deviation.rs index 34ca235..5173f3b 100644 --- a/src/monitoring/price_deviation.rs +++ b/src/monitoring/price_deviation.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use moka::future::Cache; + use crate::{ constants::COINGECKO_IDS, error::MonitoringError, processing::common::query_defillama_api, types::Entry, @@ -17,13 +19,13 @@ use crate::{ // } // } // } -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, Debug, Clone)] pub struct CoinPricesDTO { coins: HashMap, } #[allow(unused)] -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, Debug, Clone)] pub struct CoinPriceDTO { price: f64, symbol: String, @@ -46,6 +48,7 @@ impl CoinPriceDTO { pub async fn price_deviation( query: &T, normalized_price: f64, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let ids = &COINGECKO_IDS; @@ -54,7 +57,8 @@ pub async fn price_deviation( let coins_prices = query_defillama_api( query.timestamp().timestamp().try_into().unwrap(), - coingecko_id, + coingecko_id.to_owned(), + cache, ) .await?; @@ -73,14 +77,19 @@ pub async fn price_deviation( } /// Calculates the raw deviation of the price from a trusted API (DefiLLama) -pub async fn raw_price_deviation(pair_id: &String, price: f64) -> Result { +pub async fn raw_price_deviation( + pair_id: &String, + price: f64, + cache: Cache<(String, u64), CoinPricesDTO>, +) -> Result { let ids = &COINGECKO_IDS; let coingecko_id = *ids.get(pair_id).expect("Failed to get coingecko id"); let coins_prices = query_defillama_api( chrono::Utc::now().timestamp().try_into().unwrap(), - coingecko_id, + coingecko_id.to_owned(), + cache, ) .await?; diff --git a/src/processing/api.rs b/src/processing/api.rs index a6ccdc4..a20217f 100644 --- a/src/processing/api.rs +++ b/src/processing/api.rs @@ -1,6 +1,7 @@ use std::time::Duration; use bigdecimal::{Num, ToPrimitive}; +use moka::future::Cache; use num_bigint::BigInt; use starknet::{ core::types::{BlockId, BlockTag}, @@ -14,12 +15,16 @@ use crate::{ }, error::MonitoringError, monitoring::{ - price_deviation::raw_price_deviation, time_since_last_update::raw_time_since_last_update, + price_deviation::{raw_price_deviation, CoinPricesDTO}, + time_since_last_update::raw_time_since_last_update, }, processing::common::query_pragma_api, }; -pub async fn process_data_by_pair(pair: String) -> Result<(), MonitoringError> { +pub async fn process_data_by_pair( + pair: String, + cache: Cache<(String, u64), CoinPricesDTO>, +) -> Result<(), MonitoringError> { // Query the Pragma API let config = get_config(None).await; let network_env = &config.network_str(); @@ -37,7 +42,7 @@ pub async fn process_data_by_pair(pair: String) -> Result<(), MonitoringError> { let normalized_price = parsed_price.to_string().parse::().unwrap() / 10_f64.powi(result.decimals as i32); - let price_deviation = raw_price_deviation(&pair, normalized_price).await?; + let price_deviation = raw_price_deviation(&pair, normalized_price, cache).await?; let time_since_last_update = raw_time_since_last_update(result.timestamp)?; API_PRICE_DEVIATION diff --git a/src/processing/common.rs b/src/processing/common.rs index f3539ea..e426b99 100644 --- a/src/processing/common.rs +++ b/src/processing/common.rs @@ -5,6 +5,7 @@ use crate::{ constants::{INDEXER_BLOCKS_LEFT, PUBLISHER_BALANCE}, error::MonitoringError, }; +use moka::future::Cache; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; @@ -241,10 +242,16 @@ pub async fn query_pragma_api( /// else it will use the regular api endpoint. pub async fn query_defillama_api( timestamp: u64, - coingecko_id: &str, + coingecko_id: String, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let api_key = std::env::var("DEFILLAMA_API_KEY"); + if let Some(cached_value) = cache.get(&(coingecko_id.clone(), timestamp)).await { + tracing::info!("Using cached defillama value.."); + return Ok(cached_value); + } + let request_url = if let Ok(api_key) = api_key { format!( "https://pro-api.llama.fi/{apikey}/coins/prices/historical/{timestamp}/coingecko:{id}", @@ -269,12 +276,18 @@ pub async fn query_defillama_api( .await .map_err(|e| MonitoringError::Api(format!("Failed to get response text: {}", e)))?; - Ok(serde_json::from_str(&response_text).map_err(|e| { + let coin_prices: CoinPricesDTO = serde_json::from_str(&response_text).map_err(|e| { MonitoringError::Api(format!( "Failed to parse JSON: {}. Response: {}", e, response_text )) - })?) + })?; + + cache + .insert((coingecko_id, timestamp), coin_prices.clone()) + .await; + + Ok(coin_prices) } pub async fn check_publisher_balance( diff --git a/src/processing/future.rs b/src/processing/future.rs index 6c2b4bc..d6dc476 100644 --- a/src/processing/future.rs +++ b/src/processing/future.rs @@ -14,6 +14,7 @@ use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER; use crate::diesel::QueryDsl; use crate::error::MonitoringError; use crate::models::FutureEntry; +use crate::monitoring::price_deviation::CoinPricesDTO; use crate::monitoring::{ on_off_price_deviation, price_deviation, source_deviation, time_since_last_update, }; @@ -27,10 +28,12 @@ use diesel::ExpressionMethods; use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::AsyncPgConnection; use diesel_async::RunQueryDsl; +use moka::future::Cache; pub async fn process_data_by_pair( pool: deadpool::managed::Pool>, pair: String, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let mut conn = pool.get().await.map_err(MonitoringError::Connection)?; @@ -76,6 +79,7 @@ pub async fn process_data_by_pair( pair.clone(), data.timestamp.timestamp() as u64, DataType::Future, + cache, ) .await?; @@ -91,6 +95,7 @@ pub async fn process_data_by_pair_and_sources( pool: deadpool::managed::Pool>, pair: String, sources: Vec, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let mut timestamps = Vec::new(); @@ -103,7 +108,9 @@ pub async fn process_data_by_pair_and_sources( for src in sources { tracing::info!("Processing data for pair: {} and source: {}", pair, src); - let res = process_data_by_pair_and_source(pool.clone(), &pair, &src, decimals).await?; + let res = + process_data_by_pair_and_source(pool.clone(), &pair, &src, decimals, cache.clone()) + .await?; timestamps.push(res); } @@ -115,6 +122,7 @@ pub async fn process_data_by_pair_and_source( pair: &str, src: &str, decimals: u32, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let mut conn = pool.get().await.map_err(MonitoringError::Connection)?; @@ -163,7 +171,7 @@ pub async fn process_data_by_pair_and_source( ))?; let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; - let deviation = price_deviation(&data, normalized_price).await?; + let deviation = price_deviation(&data, normalized_price, cache.clone()).await?; let (source_deviation, _) = source_deviation(&data, normalized_price).await?; // Set the metrics diff --git a/src/processing/spot.rs b/src/processing/spot.rs index c3be2eb..17a48cc 100644 --- a/src/processing/spot.rs +++ b/src/processing/spot.rs @@ -15,6 +15,7 @@ use crate::constants::{LONG_TAIL_ASSET_SOURCE_DEVIATION, LONG_TAIL_ASSET_TOTAL_S use crate::diesel::QueryDsl; use crate::error::MonitoringError; use crate::models::SpotEntry; +use crate::monitoring::price_deviation::CoinPricesDTO; use crate::monitoring::{ on_off_price_deviation, price_deviation, source_deviation, time_since_last_update, }; @@ -28,11 +29,13 @@ use diesel::ExpressionMethods; use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::AsyncPgConnection; use diesel_async::RunQueryDsl; +use moka::future::Cache; use pragma_monitoring::types::Deviation; pub async fn process_data_by_pair( pool: deadpool::managed::Pool>, pair: String, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let mut conn = pool.get().await.map_err(MonitoringError::Connection)?; @@ -76,6 +79,7 @@ pub async fn process_data_by_pair( pair.clone(), data.timestamp.timestamp() as u64, DataType::Spot, + cache, ) .await?; @@ -92,6 +96,7 @@ pub async fn process_data_by_pair_and_sources( pool: deadpool::managed::Pool>, pair: String, sources: Vec, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let mut timestamps = Vec::new(); let config = get_config(None).await; @@ -100,7 +105,9 @@ pub async fn process_data_by_pair_and_sources( for src in sources { tracing::info!("Processing data for pair: {} and source: {}", pair, src); - let res = process_data_by_pair_and_source(pool.clone(), &pair, &src, decimals).await?; + let res = + process_data_by_pair_and_source(pool.clone(), &pair, &src, decimals, cache.clone()) + .await?; timestamps.push(res); } @@ -112,6 +119,7 @@ pub async fn process_data_by_pair_and_source( pair: &str, src: &str, decimals: u32, + cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { let mut conn = pool.get().await.map_err(MonitoringError::Connection)?; @@ -160,7 +168,7 @@ pub async fn process_data_by_pair_and_source( ))?; let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64; - let deviation = price_deviation(&data, normalized_price).await?; + let deviation = price_deviation(&data, normalized_price, cache).await?; let (source_deviation, _) = source_deviation(&data, normalized_price).await?; // Set the metrics diff --git a/src/tests/monitoring.rs b/src/tests/monitoring.rs index 8a0e1f8..54e53d2 100644 --- a/src/tests/monitoring.rs +++ b/src/tests/monitoring.rs @@ -11,6 +11,7 @@ use crate::{ use arc_swap::Guard; use deadpool::managed::Pool; use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use moka::future::Cache; use rstest::rstest; use tokio::sync::Mutex; @@ -27,10 +28,12 @@ async fn detects_publisher_down( let database = Arc::new(Mutex::new(database)); let db_clone = database.clone(); + let cache = Cache::new(10_000); + // Spawn non-blocking monitor let monitor_handle = tokio::spawn(async move { let db = db_clone.lock().await; - onchain_monitor(db.clone(), false, &DataType::Spot).await; + onchain_monitor(db.clone(), false, &DataType::Spot, cache).await; }); // Publish a wrong price