Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Introduce Rate limiting for Coingecko integration #55

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions benches/coingecko_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pragma_monitoring::coingecko::get_coingecko_mappings;
use pragma_monitoring::constants::COINGECKO_IDS;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;

// Create a mock initialization function instead of calling the API
Expand All @@ -25,19 +27,23 @@ fn criterion_benchmark(c: &mut Criterion) {

let mut group = c.benchmark_group("coingecko_operations");

// Benchmark simple lookup
group.bench_function("single_lookup", |b| {
// Configure longer sampling time for rate limited operations
group.measurement_time(Duration::from_secs(20));
group.sample_size(10);

// Benchmark simple lookup from ArcSwap
group.bench_function("arcswap_lookup", |b| {
b.iter(|| {
let mappings = COINGECKO_IDS.load();
black_box(mappings.get("BTC/USD").cloned())
});
});

// Benchmark concurrent lookups with smaller load
group.bench_function("concurrent_lookups", |b| {
// Benchmark concurrent lookups from ArcSwap
group.bench_function("concurrent_arcswap_lookups", |b| {
b.iter(|| {
rt.block_on(async {
let handles: Vec<_> = (0..3) // Reduced to just 3 concurrent lookups
let handles: Vec<_> = (0..3)
.map(|_| {
tokio::spawn(async {
let mappings = COINGECKO_IDS.load();
Expand All @@ -51,6 +57,35 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

// Benchmark rate-limited API calls
group.bench_function("rate_limited_api_calls", |b| {
b.iter(|| rt.block_on(async { black_box(get_coingecko_mappings().await) }));
});

// Benchmark cached lookups
group.bench_function("cached_lookups", |b| {
b.iter(|| {
rt.block_on(async {
// First call will cache, subsequent calls will use cache
let _ = get_coingecko_mappings().await;
black_box(get_coingecko_mappings().await)
})
});
});

// Benchmark concurrent rate-limited API calls
group.bench_function("concurrent_rate_limited_calls", |b| {
b.iter(|| {
rt.block_on(async {
let handles: Vec<_> = (0..3)
.map(|_| tokio::spawn(async { black_box(get_coingecko_mappings().await) }))
.collect();

futures::future::join_all(handles).await
});
});
});

group.finish();
}

Expand Down
136 changes: 126 additions & 10 deletions src/coingecko.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,65 @@
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::time::sleep;

#[derive(Debug, Serialize, Deserialize)]
pub struct Coin {
id: String,
symbol: String,
// Maximum number of requests the limiter allows per 60-second window.
const RATE_LIMIT_PER_MINUTE: u32 = 30; // CoinGecko's free tier limit
// How long a fetched set of mappings is served from the in-memory cache.
const CACHE_DURATION: Duration = Duration::from_secs(300); // Cache for 5 minutes
// Delay before the first retry; it is doubled for every subsequent retry.
const BACKOFF_BASE: Duration = Duration::from_secs(2);
// Total number of fetch attempts made before the last error is returned.
const MAX_RETRIES: u32 = 3;

/// Fixed-window request counter used to stay under `RATE_LIMIT_PER_MINUTE`.
#[derive(Debug, Clone)]
struct RateLimiter {
/// Start of the current 60-second counting window.
last_reset: Instant,
/// Number of requests recorded via `increment` since `last_reset`.
requests_made: u32,
}

impl RateLimiter {
fn new() -> Self {
Self {
last_reset: Instant::now(),
requests_made: 0,
}
}

async fn wait_if_needed(&mut self) {
let now = Instant::now();
let elapsed = now.duration_since(self.last_reset);

if elapsed >= Duration::from_secs(60) {
// Reset counter if a minute has passed
self.last_reset = now;
self.requests_made = 0;
} else if self.requests_made >= RATE_LIMIT_PER_MINUTE {
// Wait for the remainder of the minute if we've hit the limit
let wait_time = Duration::from_secs(60) - elapsed;
tracing::info!(
"Rate limit reached, waiting for {} seconds",
wait_time.as_secs()
);
sleep(wait_time).await;
self.last_reset = Instant::now();
self.requests_made = 0;
}
}

fn increment(&mut self) {
self.requests_made += 1;
}
}

fn to_usd_pair(symbol: String) -> String {
format!("{}/USD", symbol)
/// A snapshot of fetched mappings together with the moment it was stored.
#[derive(Debug, Clone)]
struct CacheEntry {
/// The cached mappings, shared behind an `Arc`.
data: Arc<HashMap<String, String>>,
/// When this entry was created; compared against `CACHE_DURATION` on lookup.
timestamp: Instant,
}

lazy_static! {
// Process-wide request counter consulted and updated around each API request.
static ref RATE_LIMITER: Arc<RwLock<RateLimiter>> = Arc::new(RwLock::new(RateLimiter::new()));
// Most recently fetched mappings; `None` until the first successful fetch.
static ref CACHE: Arc<RwLock<Option<CacheEntry>>> = Arc::new(RwLock::new(None));
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -19,26 +70,91 @@ pub enum CoinGeckoError {
RequestFailed(String),
#[error("Coingecko Coins Failed to parse response: {0}")]
ParseError(#[from] reqwest::Error),
#[error("Rate limit exceeded")]
RateLimitExceeded,
}

/// A coin record with the two fields used to build the `SYMBOL/USD` -> id mapping.
// NOTE(review): presumably one element of the `coins/markets` response — the
// deserialization call is not visible here; confirm against `fetch_mappings`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Coin {
/// CoinGecko identifier; stored as the mapping value.
id: String,
/// Ticker symbol; upper-cased and suffixed with `/USD` to form the mapping key.
symbol: String,
}

/// Returns the `SYMBOL/USD` -> CoinGecko id mappings.
///
/// A cached copy is returned when one exists and is younger than
/// `CACHE_DURATION`. Otherwise the mappings are fetched via `fetch_mappings`,
/// making up to `MAX_RETRIES` attempts and sleeping `BACKOFF_BASE * 2^(n-1)`
/// after the n-th failed attempt (except the last). A successful fetch
/// replaces the cache entry.
///
/// # Errors
///
/// Returns the error of the final failed attempt once all attempts are
/// exhausted.
pub async fn get_coingecko_mappings() -> Result<HashMap<String, String>, CoinGeckoError> {
    // Probe the cache inside its own scope so the read guard is released
    // before any later attempt to take the write lock.
    {
        let cached = CACHE.read().await;
        match cached.as_ref() {
            Some(entry) if entry.timestamp.elapsed() < CACHE_DURATION => {
                tracing::debug!("Returning cached CoinGecko mappings");
                return Ok((*entry.data).clone());
            }
            _ => {}
        }
    }

    // Cache miss or stale entry: fetch, backing off exponentially on failure.
    let mut last_error = None;

    for attempt in 1..=MAX_RETRIES {
        let failure = match fetch_mappings().await {
            Ok(mappings) => {
                let fresh = CacheEntry {
                    data: Arc::new(mappings.clone()),
                    timestamp: Instant::now(),
                };
                *CACHE.write().await = Some(fresh);
                return Ok(mappings);
            }
            Err(error) => error,
        };
        last_error = Some(failure);

        // No point sleeping after the final attempt.
        if attempt < MAX_RETRIES {
            let backoff = BACKOFF_BASE * 2u32.pow(attempt - 1);
            tracing::warn!(
                "Attempt {} failed, retrying after {} seconds",
                attempt,
                backoff.as_secs()
            );
            sleep(backoff).await;
        }
    }

    Err(last_error.unwrap_or(CoinGeckoError::EmptyResponse))
}

async fn fetch_mappings() -> Result<HashMap<String, String>, CoinGeckoError> {
let client = reqwest::Client::new();
let mut mappings = HashMap::new();
let mut page = 1;

loop {
// Wait if we need to respect rate limits
RATE_LIMITER.write().await.wait_if_needed().await;

let url = format!(
"https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}",
page
);

let response = client
.get(&url)
.header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36")
.header(
"User-Agent",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
.send()
.await
.map_err(CoinGeckoError::ParseError)?;

// Increment the rate limiter counter
RATE_LIMITER.write().await.increment();

if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
return Err(CoinGeckoError::RateLimitExceeded);
}

if !response.status().is_success() {
return Err(CoinGeckoError::RequestFailed(response.status().to_string()));
}
Expand All @@ -51,9 +167,9 @@ pub async fn get_coingecko_mappings() -> Result<HashMap<String, String>, CoinGec
break;
}

coins.into_iter().for_each(|coin| {
mappings.insert(to_usd_pair(coin.symbol.to_uppercase()), coin.id);
});
for coin in coins {
mappings.insert(format!("{}/USD", coin.symbol.to_uppercase()), coin.id);
}

page += 1;
}
Expand Down
53 changes: 7 additions & 46 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use arc_swap::ArcSwap;
use lazy_static::lazy_static;
use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

use crate::coingecko::get_coingecko_mappings;
pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6;
Expand All @@ -15,54 +12,18 @@ lazy_static! {
ArcSwap::new(Arc::new(HashMap::new()));
}

const MAX_RETRIES: u32 = 3;
const RETRY_DELAY: Duration = Duration::from_secs(2);

#[allow(dead_code)]
pub async fn initialize_coingecko_mappings() {
let mappings = retry_with_backoff(get_coingecko_mappings)
.await
.unwrap_or_else(|e| {
tracing::error!(
"Failed to initialize CoinGecko mappings after {} retries: {}",
MAX_RETRIES,
e
);
match get_coingecko_mappings().await {
Ok(mappings) => {
COINGECKO_IDS.store(Arc::new(mappings));
tracing::info!("Successfully initialized CoinGecko mappings");
}
Err(e) => {
tracing::error!("Failed to initialize CoinGecko mappings: {}", e);
panic!("Cannot start monitoring without CoinGecko mappings: {}", e);
});

COINGECKO_IDS.store(Arc::new(mappings));
tracing::info!("Successfully initialized CoinGecko mappings");
}

/// Calls `f` repeatedly until it succeeds or `MAX_RETRIES` attempts have failed.
///
/// Between failed attempts the task sleeps for the fixed `RETRY_DELAY`
/// (despite the function name, the delay does not grow between attempts).
///
/// `f` is always invoked at least once, and the error of the final attempt is
/// returned directly, so there is no `unwrap` that could panic if
/// `MAX_RETRIES` were ever set to 0.
///
/// # Errors
///
/// Returns the error produced by the last failed call to `f`.
async fn retry_with_backoff<F, Fut, T, E>(f: F) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut attempts = 0;

    loop {
        match f().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                attempts += 1;
                // Out of attempts: surface this error instead of stashing it.
                if attempts >= MAX_RETRIES {
                    return Err(e);
                }
                tracing::warn!(
                    "Attempt {} failed, retrying after {} seconds",
                    attempts,
                    RETRY_DELAY.as_secs()
                );
                sleep(RETRY_DELAY).await;
            }
        }
    }
}

lazy_static! {
Expand Down