Skip to content

Commit

Permalink
✨ implement CoinGecko API integration for dynamic coin mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
yezz123 committed Dec 12, 2024
1 parent 7e485df commit 0d5e3e1
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 24 deletions.
105 changes: 87 additions & 18 deletions src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,95 @@
use std::collections::HashMap;

use arc_swap::ArcSwap;
use lazy_static::lazy_static;
use phf::phf_map;
use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use std::{collections::HashMap, error::Error};

pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6;

#[allow(unused)]
pub(crate) static COINGECKO_IDS: phf::Map<&'static str, &'static str> = phf_map! {
"BTC/USD" => "bitcoin",
"ETH/USD" => "ethereum",
"LUSD/USD" => "liquity-usd",
"WBTC/USD" => "wrapped-bitcoin",
"DAI/USD" => "dai",
"USDC/USD" => "usd-coin",
"USDT/USD" => "tether",
"WSTETH/USD" => "wrapped-steth",
"LORDS/USD" => "lords",
"STRK/USD" => "starknet",
"ZEND/USD" => "zklend-2",
"NSTR/USD" => "nostra",
};
// Define the Coin struct to store the id and symbol of the coin.
#[derive(Debug, Serialize, Deserialize)]
pub struct Coin {
id: String,
symbol: String,
}

// We already have the Link for the CoinGecko API, so we can use it to fetch the data.
// We will use the CoinGecko API to fetch the data.
// Example: https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page=1
// We will fetch the data for the first 100 coins.
// here is the data how its look like:
// [
// {
// "id": "bitcoin",
// "symbol": "btc",
// "name": "Bitcoin",
// "image": "https://coin-images.coingecko.com/coins/images/1/large/bitcoin.png?1696501400",
// "current_price": 100390,
// ...
// },
// ...
// ]
#[allow(dead_code)]
async fn get_coingecko_mappings() -> Result<HashMap<String, String>, Box<dyn Error>> {
let client = reqwest::Client::new();
let mut mappings = HashMap::new();
let mut page = 1;

loop {
let url = format!(
"https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}",
page
);

let response = client
.get(&url)
.header("User-Agent", "Crypto Data Fetcher")
.send()
.await?;

if !response.status().is_success() {
return Err(format!("API request failed with status: {}", response.status()).into());
}

let coins: Vec<Coin> = response.json().await?;
if coins.is_empty() {
break;
}

for coin in coins {
// Convert symbol to uppercase and create pair format
let pair = format!("{}/USD", coin.symbol.to_uppercase());
mappings.insert(pair, coin.id);
}

page += 1;
}

Ok(mappings)
}

// Replace the static phf_map with a lazy_static ArcSwap
lazy_static! {
pub static ref COINGECKO_IDS: ArcSwap<HashMap<String, String>> =
ArcSwap::new(Arc::new(HashMap::new()));
}

#[allow(dead_code)]
pub async fn initialize_coingecko_mappings() {
match get_coingecko_mappings().await {
Ok(mappings) => {
COINGECKO_IDS.store(Arc::new(mappings));
tracing::info!("Successfully initialized CoinGecko mappings");
}
Err(e) => {
tracing::error!("Failed to initialize CoinGecko mappings: {}", e);
// You might want to panic here depending on how critical this is
// panic!("Failed to initialize CoinGecko mappings: {}", e);
}
}
}

lazy_static! {
/// TODO: Current storage of long tail assets here is not really good.
Expand Down
16 changes: 16 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tokio::task::JoinHandle;
use tokio::time::interval;

use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
use constants::initialize_coingecko_mappings;
use processing::common::{check_publisher_balance, data_indexers_are_synced};
use tracing::instrument;
use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};
Expand All @@ -51,6 +52,9 @@ struct MonitoringTask {

#[tokio::main]
async fn main() {
// Initialize CoinGecko mappings
initialize_coingecko_mappings().await;

// Start configuring a `fmt` subscriber
let subscriber = tracing_subscriber::fmt()
.compact()
Expand Down Expand Up @@ -122,11 +126,23 @@ async fn spawn_monitoring_tasks(
name: "VRF Monitoring".to_string(),
handle: tokio::spawn(vrf_monitor(pool.clone())),
},
MonitoringTask {
name: "CoinGecko Mappings Update".to_string(),
handle: tokio::spawn(periodic_coingecko_update()),
},
];

tasks
}

async fn periodic_coingecko_update() {
let mut interval = interval(Duration::from_secs(86400)); // 1 day
loop {
interval.tick().await;
initialize_coingecko_mappings().await;
}
}

#[instrument]
async fn handle_task_results(tasks: Vec<MonitoringTask>) {
let mut results = HashMap::new();
Expand Down
7 changes: 5 additions & 2 deletions src/monitoring/on_off_deviation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn on_off_price_deviation(
data_type: DataType,
cache: Cache<(String, u64), CoinPricesDTO>,
) -> Result<(f64, u32), MonitoringError> {
let ids = &COINGECKO_IDS;
let ids = COINGECKO_IDS.load();
let config = get_config(None).await;
let client = &config.network().provider;
let field_pair = cairo_short_string_to_felt(&pair_id).expect("failed to convert pair id");
Expand Down Expand Up @@ -81,7 +81,10 @@ pub async fn on_off_price_deviation(

let (deviation, num_sources_aggregated) = match data_type {
DataType::Spot => {
let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id");
let coingecko_id = ids
.get(&pair_id)
.expect("Failed to get coingecko id")
.clone();

let coins_prices =
query_defillama_api(timestamp, coingecko_id.to_owned(), cache).await?;
Expand Down
14 changes: 10 additions & 4 deletions src/monitoring/price_deviation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ pub async fn price_deviation<T: Entry>(
normalized_price: f64,
cache: Cache<(String, u64), CoinPricesDTO>,
) -> Result<f64, MonitoringError> {
let ids = &COINGECKO_IDS;
let ids = COINGECKO_IDS.load();

let pair_id = query.pair_id().to_string();
let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id");
let coingecko_id = ids
.get(&pair_id)
.expect("Failed to get coingecko id")
.clone();

let coins_prices = query_defillama_api(
query.timestamp().timestamp().try_into().unwrap(),
Expand Down Expand Up @@ -82,9 +85,12 @@ pub async fn raw_price_deviation(
price: f64,
cache: Cache<(String, u64), CoinPricesDTO>,
) -> Result<f64, MonitoringError> {
let ids = &COINGECKO_IDS;
let ids = COINGECKO_IDS.load();

let coingecko_id = *ids.get(pair_id).expect("Failed to get coingecko id");
let coingecko_id = ids
.get(pair_id)
.expect("Failed to get coingecko id")
.clone();

let coins_prices = query_defillama_api(
chrono::Utc::now().timestamp().try_into().unwrap(),
Expand Down

0 comments on commit 0d5e3e1

Please sign in to comment.