From 27f6e1134832c25f939f7df646723bd98bb9aa89 Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Wed, 29 Jan 2025 20:33:11 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Update=20Dev=20Branch=20with=20new?= =?UTF-8?q?=20changes=20(#56)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * :wrench: Update configuration for Slack Alert Manager (#54) * :sparkles: Introduce Rate limiting for Coingecko integration (#55) --- benches/coingecko_benchmarks.rs | 45 +++++++++-- prometheus/alertmanager.yml | 22 +++--- prometheus/templates/slack.tmpl | 58 ++++++++++++++ src/coingecko.rs | 136 +++++++++++++++++++++++++++++--- src/constants.rs | 53 ++----------- 5 files changed, 243 insertions(+), 71 deletions(-) create mode 100644 prometheus/templates/slack.tmpl diff --git a/benches/coingecko_benchmarks.rs b/benches/coingecko_benchmarks.rs index 694c082..efb7ce0 100644 --- a/benches/coingecko_benchmarks.rs +++ b/benches/coingecko_benchmarks.rs @@ -1,7 +1,9 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use pragma_monitoring::coingecko::get_coingecko_mappings; use pragma_monitoring::constants::COINGECKO_IDS; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use tokio::runtime::Runtime; // Create a mock initialization function instead of calling the API @@ -25,19 +27,23 @@ fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("coingecko_operations"); - // Benchmark simple lookup - group.bench_function("single_lookup", |b| { + // Configure longer sampling time for rate limited operations + group.measurement_time(Duration::from_secs(20)); + group.sample_size(10); + + // Benchmark simple lookup from ArcSwap + group.bench_function("arcswap_lookup", |b| { b.iter(|| { let mappings = COINGECKO_IDS.load(); black_box(mappings.get("BTC/USD").cloned()) }); }); - // Benchmark concurrent lookups with smaller load - group.bench_function("concurrent_lookups", |b| { + // Benchmark concurrent lookups from ArcSwap + group.bench_function("concurrent_arcswap_lookups", |b| { b.iter(|| { rt.block_on(async { - let handles: Vec<_> = (0..3) // Reduced to just 3 concurrent lookups + let handles: Vec<_> = (0..3) .map(|_| { tokio::spawn(async { let mappings = COINGECKO_IDS.load(); @@ -51,6 +57,35 @@ fn criterion_benchmark(c: &mut Criterion) { }); }); + // Benchmark rate-limited API calls + group.bench_function("rate_limited_api_calls", |b| { + b.iter(|| rt.block_on(async { black_box(get_coingecko_mappings().await) })); + }); + + // Benchmark cached lookups + group.bench_function("cached_lookups", |b| { + b.iter(|| { + rt.block_on(async { + // First call will cache, subsequent calls will use cache + let _ = get_coingecko_mappings().await; + black_box(get_coingecko_mappings().await) + }) + }); + }); + + // Benchmark concurrent rate-limited API calls + group.bench_function("concurrent_rate_limited_calls", |b| { + b.iter(|| { + rt.block_on(async { + let handles: Vec<_> = (0..3) + .map(|_| tokio::spawn(async { black_box(get_coingecko_mappings().await) })) + .collect(); + + futures::future::join_all(handles).await + }); + }); + }); + group.finish(); } diff --git a/prometheus/alertmanager.yml b/prometheus/alertmanager.yml index f978464..52764db 100644 --- a/prometheus/alertmanager.yml +++ b/prometheus/alertmanager.yml @@ -30,10 +30,10 @@ route: severity: critical continue: true - - receiver: "internal-warning" + - receiver: "Slack-internal" group_wait: 10s match_re: - severity: warning + severity: warning|critical continue: true - receiver: "public" @@ -42,26 +42,27 @@ route: severity: critical continue: true - - receiver: "Slack-internal" - group_wait: 10s - match_re: - severity: warning - continue: true - receivers: - name: "internal-warning" telegram_configs: - - bot_token: "${{TELEGRAM_TOKEN}}}}" + - bot_token: "${{TELEGRAM_TOKEN}}" chat_id: -1001904637278 parse_mode: "HTML" message: '{{ template "telegram.default.message" . }}' + - name: "Slack-internal" slack_configs: - channel: "#internal-warning" - send_resolved: false + send_resolved: true + title: '{{ template "slack.title" . }}' + icon_emoji: '{{ template "slack.icon_emoji" . }}' + color: '{{ template "slack.color" . }}' + text: '{{ template "slack.text" . }}' + - name: "internal-critical" opsgenie_configs: - api_key: "${{OPS_GENIE_API_KEY}}" + - name: "public" telegram_configs: - bot_token: "${{TELEGRAM_TOKEN}}" @@ -72,3 +73,4 @@ receivers: # The directory from which notification templates are read. templates: - "/config/templates/alert.tmpl" + - "/config/templates/slack.tmpl" diff --git a/prometheus/templates/slack.tmpl b/prometheus/templates/slack.tmpl new file mode 100644 index 0000000..de4ec5e --- /dev/null +++ b/prometheus/templates/slack.tmpl @@ -0,0 +1,58 @@ +{{/* Slack alert title */}} +{{ define "slack.title" -}} +[{{ .Status | toUpper }}{{if eq .Status "firing"}}:{{ .Alerts.Firing | len }}{{end}}] +{{ if eq .Status "firing" -}} + {{- if eq .CommonLabels.severity "critical" }}🔥 + {{- else if eq .CommonLabels.severity "warning" }}⚠️ + {{- else }}ℹ️ + {{- end }} +{{- else -}} + ✅ +{{- end }} +{{ .CommonLabels.alertname }} +{{- end }} + +{{/* Slack attachment color */}} +{{ define "slack.color" -}} +{{- if eq .Status "firing" -}} + {{- if eq .CommonLabels.severity "critical" -}} + danger + {{- else if eq .CommonLabels.severity "warning" -}} + warning + {{- else -}} + #439FE0 + {{- end -}} +{{- else -}} + good +{{- end -}} +{{- end }} + +{{/* Slack icon emoji */}} +{{ define "slack.icon_emoji" -}} +:alert: +{{- end }} + +{{/* Main alert text */}} +{{ define "slack.text" -}} +{{- range .Alerts -}} +*Alert Details:* +{{- if .Annotations.summary }} +• *Summary:* {{ .Annotations.summary }} +{{- end }} +{{- if .Annotations.description }} +• *Description:* {{ .Annotations.description }} +{{- end }} + +*Additional Information:* +• *Network:* {{ .Labels.network }} +• *Publisher:* {{ .Labels.publisher }} +• *Severity:* {{ .Labels.severity | toUpper }} +• *Type:* {{ .Labels.type }} +• *Started:* {{ .StartsAt | since }} + +{{- if .Labels.pair }} +• *Trading Pair:* {{ .Labels.pair }} +{{- end }} +--- +{{ end }} +{{- end }} diff --git a/src/coingecko.rs b/src/coingecko.rs index 17b8da8..d8c550d 100644 --- a/src/coingecko.rs +++ b/src/coingecko.rs @@ -1,14 +1,65 @@ +use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::RwLock; +use tokio::time::sleep; -#[derive(Debug, Serialize, Deserialize)] -pub struct Coin { - id: String, - symbol: String, +const RATE_LIMIT_PER_MINUTE: u32 = 30; // CoinGecko's free tier limit +const CACHE_DURATION: Duration = Duration::from_secs(300); // Cache for 5 minutes +const BACKOFF_BASE: Duration = Duration::from_secs(2); +const MAX_RETRIES: u32 = 3; + +#[derive(Debug, Clone)] +struct RateLimiter { + last_reset: Instant, + requests_made: u32, +} + +impl RateLimiter { + fn new() -> Self { + Self { + last_reset: Instant::now(), + requests_made: 0, + } + } + + async fn wait_if_needed(&mut self) { + let now = Instant::now(); + let elapsed = now.duration_since(self.last_reset); + + if elapsed >= Duration::from_secs(60) { + // Reset counter if a minute has passed + self.last_reset = now; + self.requests_made = 0; + } else if self.requests_made >= RATE_LIMIT_PER_MINUTE { + // Wait for the remainder of the minute if we've hit the limit + let wait_time = Duration::from_secs(60) - elapsed; + tracing::info!( + "Rate limit reached, waiting for {} seconds", + wait_time.as_secs() + ); + sleep(wait_time).await; + self.last_reset = Instant::now(); + self.requests_made = 0; + } + } + + fn increment(&mut self) { + self.requests_made += 1; + } } -fn to_usd_pair(symbol: String) -> String { - format!("{}/USD", symbol) +#[derive(Debug, Clone)] +struct CacheEntry { + data: Arc>, + timestamp: Instant, +} + +lazy_static! { + static ref RATE_LIMITER: Arc> = Arc::new(RwLock::new(RateLimiter::new())); + static ref CACHE: Arc>> = Arc::new(RwLock::new(None)); } #[derive(Debug, thiserror::Error)] @@ -19,14 +70,69 @@ pub enum CoinGeckoError { RequestFailed(String), #[error("Coingecko Coins Failed to parse response: {0}")] ParseError(#[from] reqwest::Error), + #[error("Rate limit exceeded")] + RateLimitExceeded, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Coin { + id: String, + symbol: String, } pub async fn get_coingecko_mappings() -> Result, CoinGeckoError> { + // Check cache first + if let Some(cache_entry) = CACHE.read().await.as_ref() { + if cache_entry.timestamp.elapsed() < CACHE_DURATION { + tracing::debug!("Returning cached CoinGecko mappings"); + return Ok((*cache_entry.data).clone()); + } + } + + // If not in cache, fetch with exponential backoff + let mut retry_count = 0; + let mut last_error = None; + + while retry_count < MAX_RETRIES { + match fetch_mappings().await { + Ok(mappings) => { + // Update cache + let cache_entry = CacheEntry { + data: Arc::new(mappings.clone()), + timestamp: Instant::now(), + }; + *CACHE.write().await = Some(cache_entry); + return Ok(mappings); + } + Err(e) => { + retry_count += 1; + last_error = Some(e); + + if retry_count < MAX_RETRIES { + let backoff = BACKOFF_BASE * 2u32.pow(retry_count - 1); + tracing::warn!( + "Attempt {} failed, retrying after {} seconds", + retry_count, + backoff.as_secs() + ); + sleep(backoff).await; + } + } + } + } + + Err(last_error.unwrap_or(CoinGeckoError::EmptyResponse)) +} + +async fn fetch_mappings() -> Result, CoinGeckoError> { let client = reqwest::Client::new(); let mut mappings = HashMap::new(); let mut page = 1; loop { + // Wait if we need to respect rate limits + RATE_LIMITER.write().await.wait_if_needed().await; + let url = format!( "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}", page @@ -34,11 +140,21 @@ pub async fn get_coingecko_mappings() -> Result, CoinGec let response = client .get(&url) - .header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36") + .header( + "User-Agent", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" + ) .send() .await .map_err(CoinGeckoError::ParseError)?; + // Increment the rate limiter counter + RATE_LIMITER.write().await.increment(); + + if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS { + return Err(CoinGeckoError::RateLimitExceeded); + } + if !response.status().is_success() { return Err(CoinGeckoError::RequestFailed(response.status().to_string())); } @@ -51,9 +167,9 @@ pub async fn get_coingecko_mappings() -> Result, CoinGec break; } - coins.into_iter().for_each(|coin| { - mappings.insert(to_usd_pair(coin.symbol.to_uppercase()), coin.id); - }); + for coin in coins { + mappings.insert(format!("{}/USD", coin.symbol.to_uppercase()), coin.id); + } page += 1; } diff --git a/src/constants.rs b/src/constants.rs index be446c4..7479dd7 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -2,10 +2,7 @@ use arc_swap::ArcSwap; use lazy_static::lazy_static; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use std::collections::{HashMap, HashSet}; -use std::future::Future; use std::sync::Arc; -use std::time::Duration; -use tokio::time::sleep; use crate::coingecko::get_coingecko_mappings; pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6; @@ -15,54 +12,18 @@ lazy_static! { ArcSwap::new(Arc::new(HashMap::new())); } -const MAX_RETRIES: u32 = 3; -const RETRY_DELAY: Duration = Duration::from_secs(2); - #[allow(dead_code)] pub async fn initialize_coingecko_mappings() { - let mappings = retry_with_backoff(get_coingecko_mappings) - .await - .unwrap_or_else(|e| { - tracing::error!( - "Failed to initialize CoinGecko mappings after {} retries: {}", - MAX_RETRIES, - e - ); + match get_coingecko_mappings().await { + Ok(mappings) => { + COINGECKO_IDS.store(Arc::new(mappings)); + tracing::info!("Successfully initialized CoinGecko mappings"); + } + Err(e) => { + tracing::error!("Failed to initialize CoinGecko mappings: {}", e); panic!("Cannot start monitoring without CoinGecko mappings: {}", e); - }); - - COINGECKO_IDS.store(Arc::new(mappings)); - tracing::info!("Successfully initialized CoinGecko mappings"); -} - -async fn retry_with_backoff(f: F) -> Result -where - F: Fn() -> Fut, - Fut: Future>, - E: std::fmt::Display, -{ - let mut attempts = 0; - let mut last_error = None; - - while attempts < MAX_RETRIES { - match f().await { - Ok(result) => return Ok(result), - Err(e) => { - attempts += 1; - last_error = Some(e); - if attempts < MAX_RETRIES { - tracing::warn!( - "Attempt {} failed, retrying after {} seconds", - attempts, - RETRY_DELAY.as_secs() - ); - sleep(RETRY_DELAY).await; - } - } } } - - Err(last_error.unwrap()) } lazy_static! {