Skip to content

Commit

Permalink
✨ Update Dev Branch with new changes (#56)
Browse files Browse the repository at this point in the history
* 🔧 Update configuration for Slack Alert Manager (#54)

* ✨ Introduce Rate limiting for Coingecko integration (#55)
  • Loading branch information
yezz123 authored Jan 29, 2025
1 parent 852b724 commit 27f6e11
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 71 deletions.
45 changes: 40 additions & 5 deletions benches/coingecko_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pragma_monitoring::coingecko::get_coingecko_mappings;
use pragma_monitoring::constants::COINGECKO_IDS;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;

// Create a mock initialization function instead of calling the API
Expand All @@ -25,19 +27,23 @@ fn criterion_benchmark(c: &mut Criterion) {

let mut group = c.benchmark_group("coingecko_operations");

// Benchmark simple lookup
group.bench_function("single_lookup", |b| {
// Configure longer sampling time for rate limited operations
group.measurement_time(Duration::from_secs(20));
group.sample_size(10);

// Benchmark simple lookup from ArcSwap
group.bench_function("arcswap_lookup", |b| {
b.iter(|| {
let mappings = COINGECKO_IDS.load();
black_box(mappings.get("BTC/USD").cloned())
});
});

// Benchmark concurrent lookups with smaller load
group.bench_function("concurrent_lookups", |b| {
// Benchmark concurrent lookups from ArcSwap
group.bench_function("concurrent_arcswap_lookups", |b| {
b.iter(|| {
rt.block_on(async {
let handles: Vec<_> = (0..3) // Reduced to just 3 concurrent lookups
let handles: Vec<_> = (0..3)
.map(|_| {
tokio::spawn(async {
let mappings = COINGECKO_IDS.load();
Expand All @@ -51,6 +57,35 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

// Benchmark rate-limited API calls
group.bench_function("rate_limited_api_calls", |b| {
b.iter(|| rt.block_on(async { black_box(get_coingecko_mappings().await) }));
});

// Benchmark cached lookups
group.bench_function("cached_lookups", |b| {
b.iter(|| {
rt.block_on(async {
// First call will cache, subsequent calls will use cache
let _ = get_coingecko_mappings().await;
black_box(get_coingecko_mappings().await)
})
});
});

// Benchmark concurrent rate-limited API calls
group.bench_function("concurrent_rate_limited_calls", |b| {
b.iter(|| {
rt.block_on(async {
let handles: Vec<_> = (0..3)
.map(|_| tokio::spawn(async { black_box(get_coingecko_mappings().await) }))
.collect();

futures::future::join_all(handles).await
});
});
});

group.finish();
}

Expand Down
22 changes: 12 additions & 10 deletions prometheus/alertmanager.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ route:
severity: critical
continue: true

- receiver: "internal-warning"
- receiver: "Slack-internal"
group_wait: 10s
match_re:
severity: warning
severity: warning|critical
continue: true

- receiver: "public"
Expand All @@ -42,26 +42,27 @@ route:
severity: critical
continue: true

- receiver: "Slack-internal"
group_wait: 10s
match_re:
severity: warning
continue: true

receivers:
- name: "internal-warning"
telegram_configs:
- bot_token: "${{TELEGRAM_TOKEN}}}}"
- bot_token: "${{TELEGRAM_TOKEN}}"
chat_id: -1001904637278
parse_mode: "HTML"
message: '{{ template "telegram.default.message" . }}'

- name: "Slack-internal"
slack_configs:
- channel: "#internal-warning"
send_resolved: false
send_resolved: true
title: '{{ template "slack.title" . }}'
icon_emoji: '{{ template "slack.icon_emoji" . }}'
color: '{{ template "slack.color" . }}'
text: '{{ template "slack.text" . }}'

- name: "internal-critical"
opsgenie_configs:
- api_key: "${{OPS_GENIE_API_KEY}}"

- name: "public"
telegram_configs:
- bot_token: "${{TELEGRAM_TOKEN}}"
Expand All @@ -72,3 +73,4 @@ receivers:
# The directory from which notification templates are read.
templates:
- "/config/templates/alert.tmpl"
- "/config/templates/slack.tmpl"
58 changes: 58 additions & 0 deletions prometheus/templates/slack.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{{/* "slack.title" — message title: "[STATUS]" (with firing-alert count while firing), a severity emoji (🔥 critical, ⚠️ warning, ℹ️ otherwise), then the common alert name. */}}
{{ define "slack.title" -}}
[{{ .Status | toUpper }}{{if eq .Status "firing"}}:{{ .Alerts.Firing | len }}{{end}}]
{{ if eq .Status "firing" -}}
{{- if eq .CommonLabels.severity "critical" }}🔥
{{- else if eq .CommonLabels.severity "warning" }}⚠️
{{- else }}ℹ️
{{- end }}
{{- else -}}
{{- end }}
{{ .CommonLabels.alertname }}
{{- end }}

{{/* "slack.color" — attachment sidebar color: "danger"/"warning"/blue (#439FE0) by severity while firing, "good" once resolved. */}}
{{ define "slack.color" -}}
{{- if eq .Status "firing" -}}
{{- if eq .CommonLabels.severity "critical" -}}
danger
{{- else if eq .CommonLabels.severity "warning" -}}
warning
{{- else -}}
#439FE0
{{- end -}}
{{- else -}}
good
{{- end -}}
{{- end }}

{{/* "slack.icon_emoji" — bot icon. NOTE(review): ":alert:" is a custom emoji; confirm it exists in the target Slack workspace or the icon will not render. */}}
{{ define "slack.icon_emoji" -}}
:alert:
{{- end }}

{{/* "slack.text" — message body: per-alert summary/description annotations, then network/publisher/severity/type labels, start time, and the trading pair when present. NOTE(review): the "since" function is only available in recent Alertmanager releases — confirm the deployed version supports it. */}}
{{ define "slack.text" -}}
{{- range .Alerts -}}
*Alert Details:*
{{- if .Annotations.summary }}
• *Summary:* {{ .Annotations.summary }}
{{- end }}
{{- if .Annotations.description }}
• *Description:* {{ .Annotations.description }}
{{- end }}

*Additional Information:*
• *Network:* {{ .Labels.network }}
• *Publisher:* {{ .Labels.publisher }}
• *Severity:* {{ .Labels.severity | toUpper }}
• *Type:* {{ .Labels.type }}
• *Started:* {{ .StartsAt | since }}

{{- if .Labels.pair }}
• *Trading Pair:* {{ .Labels.pair }}
{{- end }}
---
{{ end }}
{{- end }}
136 changes: 126 additions & 10 deletions src/coingecko.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,65 @@
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::time::sleep;

#[derive(Debug, Serialize, Deserialize)]
pub struct Coin {
id: String,
symbol: String,
const RATE_LIMIT_PER_MINUTE: u32 = 30; // CoinGecko's free tier limit
const CACHE_DURATION: Duration = Duration::from_secs(300); // Cache for 5 minutes
const BACKOFF_BASE: Duration = Duration::from_secs(2); // Delay before the first retry; doubled on each subsequent retry
const MAX_RETRIES: u32 = 3; // Total fetch attempts before giving up

/// Bookkeeping for a rolling one-minute request window, used to keep
/// CoinGecko API calls under `RATE_LIMIT_PER_MINUTE`.
#[derive(Debug, Clone)]
struct RateLimiter {
    // Start of the current one-minute accounting window.
    last_reset: Instant,
    // Requests issued since `last_reset`.
    requests_made: u32,
}

impl RateLimiter {
fn new() -> Self {
Self {
last_reset: Instant::now(),
requests_made: 0,
}
}

async fn wait_if_needed(&mut self) {
let now = Instant::now();
let elapsed = now.duration_since(self.last_reset);

if elapsed >= Duration::from_secs(60) {
// Reset counter if a minute has passed
self.last_reset = now;
self.requests_made = 0;
} else if self.requests_made >= RATE_LIMIT_PER_MINUTE {
// Wait for the remainder of the minute if we've hit the limit
let wait_time = Duration::from_secs(60) - elapsed;
tracing::info!(
"Rate limit reached, waiting for {} seconds",
wait_time.as_secs()
);
sleep(wait_time).await;
self.last_reset = Instant::now();
self.requests_made = 0;
}
}

fn increment(&mut self) {
self.requests_made += 1;
}
}

fn to_usd_pair(symbol: String) -> String {
format!("{}/USD", symbol)
/// Snapshot of the symbol -> CoinGecko-id mappings plus the moment it was
/// fetched, so readers can check freshness against `CACHE_DURATION`.
#[derive(Debug, Clone)]
struct CacheEntry {
    // Mapping of "SYMBOL/USD" pairs to CoinGecko coin ids; behind an Arc so
    // cloning the entry does not copy the map.
    data: Arc<HashMap<String, String>>,
    // When `data` was fetched.
    timestamp: Instant,
}

lazy_static! {
    // Process-wide limiter shared by all fetches; an async RwLock because the
    // limiter needs mutable access across .await points.
    static ref RATE_LIMITER: Arc<RwLock<RateLimiter>> = Arc::new(RwLock::new(RateLimiter::new()));
    // Most recent successful fetch, or None until the first fetch completes.
    static ref CACHE: Arc<RwLock<Option<CacheEntry>>> = Arc::new(RwLock::new(None));
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -19,26 +70,91 @@ pub enum CoinGeckoError {
RequestFailed(String),
#[error("Coingecko Coins Failed to parse response: {0}")]
ParseError(#[from] reqwest::Error),
#[error("Rate limit exceeded")]
RateLimitExceeded,
}

/// One entry of the CoinGecko `/coins/markets` response; only the fields this
/// module consumes are declared.
#[derive(Debug, Serialize, Deserialize)]
pub struct Coin {
    // CoinGecko's internal coin identifier (e.g. "bitcoin").
    id: String,
    // Ticker symbol as returned by the API; uppercased into a "SYMBOL/USD"
    // pair before being used as a mapping key.
    symbol: String,
}

/// Returns the symbol -> CoinGecko-id mappings, serving from the in-process
/// cache while it is fresh and otherwise re-fetching with exponential backoff.
///
/// # Errors
/// After `MAX_RETRIES` failed attempts, propagates the last fetch error
/// (falling back to `CoinGeckoError::EmptyResponse` if none was recorded).
pub async fn get_coingecko_mappings() -> Result<HashMap<String, String>, CoinGeckoError> {
    // Fast path: serve the cached mappings while they are within CACHE_DURATION.
    {
        let cache = CACHE.read().await;
        if let Some(entry) = cache.as_ref() {
            if entry.timestamp.elapsed() < CACHE_DURATION {
                tracing::debug!("Returning cached CoinGecko mappings");
                return Ok((*entry.data).clone());
            }
        }
    }

    // Cache miss or stale: fetch, retrying with exponential backoff.
    let mut last_error = None;

    for attempt in 1..=MAX_RETRIES {
        match fetch_mappings().await {
            Ok(mappings) => {
                // Refresh the cache before handing the result back.
                *CACHE.write().await = Some(CacheEntry {
                    data: Arc::new(mappings.clone()),
                    timestamp: Instant::now(),
                });
                return Ok(mappings);
            }
            Err(e) => {
                last_error = Some(e);

                // Back off before every attempt except the final one.
                if attempt < MAX_RETRIES {
                    let backoff = BACKOFF_BASE * 2u32.pow(attempt - 1);
                    tracing::warn!(
                        "Attempt {} failed, retrying after {} seconds",
                        attempt,
                        backoff.as_secs()
                    );
                    sleep(backoff).await;
                }
            }
        }
    }

    Err(last_error.unwrap_or(CoinGeckoError::EmptyResponse))
}

async fn fetch_mappings() -> Result<HashMap<String, String>, CoinGeckoError> {
let client = reqwest::Client::new();
let mut mappings = HashMap::new();
let mut page = 1;

loop {
// Wait if we need to respect rate limits
RATE_LIMITER.write().await.wait_if_needed().await;

let url = format!(
"https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}",
page
);

let response = client
.get(&url)
.header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")
.header(
"User-Agent",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
.send()
.await
.map_err(CoinGeckoError::ParseError)?;

// Increment the rate limiter counter
RATE_LIMITER.write().await.increment();

if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
return Err(CoinGeckoError::RateLimitExceeded);
}

if !response.status().is_success() {
return Err(CoinGeckoError::RequestFailed(response.status().to_string()));
}
Expand All @@ -51,9 +167,9 @@ pub async fn get_coingecko_mappings() -> Result<HashMap<String, String>, CoinGec
break;
}

coins.into_iter().for_each(|coin| {
mappings.insert(to_usd_pair(coin.symbol.to_uppercase()), coin.id);
});
for coin in coins {
mappings.insert(format!("{}/USD", coin.symbol.to_uppercase()), coin.id);
}

page += 1;
}
Expand Down
Loading

0 comments on commit 27f6e11

Please sign in to comment.