Skip to content

Commit

Permalink
feat(api): add Raydium price fetcher cache warmup for symbols
Browse files Browse the repository at this point in the history
  • Loading branch information
armyhaylenko committed Jan 28, 2025
1 parent 1adc382 commit 39c0457
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 14 deletions.
22 changes: 16 additions & 6 deletions nft_ingester/src/api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{
sync::{broadcast::Receiver, Mutex},
task::{JoinError, JoinSet},
};
use tracing::{error, info};
use tracing::{error, info, warn};
use usecase::proofs::MaybeProofChecker;
use uuid::Uuid;

Expand Down Expand Up @@ -111,6 +111,20 @@ pub async fn start_api(
}

let addr = SocketAddr::from(([0, 0, 0, 0], port));
let token_price_fetcher = Arc::new(RaydiumTokenPriceFetcher::new(
"https://api-v3.raydium.io".to_string(),
crate::raydium_price_fetcher::CACHE_TTL,
red_metrics,
));
let tpf = token_price_fetcher.clone();
tasks.lock().await.spawn(async move {
if let Err(e) = tpf.warmup().await {
warn!(error = %e, "Failed to warm up Raydium token price fetcher, cache is empty: {:?}", e);
}
let (symbol_cache_size, _) = tpf.get_cache_sizes();
info!(%symbol_cache_size, "Warmed up Raydium token price fetcher with {} symbols", symbol_cache_size);
Ok(())
});
let api = DasApi::new(
pg_client.clone(),
rocks_db,
Expand All @@ -123,11 +137,7 @@ pub async fn start_api(
json_middleware_config.unwrap_or_default(),
account_balance_getter,
storage_service_base_url,
Arc::new(RaydiumTokenPriceFetcher::new(
"https://api-v3.raydium.io".to_string(),
crate::raydium_price_fetcher::CACHE_TTL,
red_metrics,
)),
token_price_fetcher,
native_mint_pubkey,
);

Expand Down
61 changes: 53 additions & 8 deletions nft_ingester/src/raydium_price_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,43 @@ impl RaydiumTokenPriceFetcher {
}
}

pub async fn warmup(&self) -> Result<(), IngesterError> {
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct MintListItem {
address: String,
symbol: String,
}
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct MintListResponse {
mint_list: Vec<MintListItem>,
}
// returns well-known token infos
let req = "mint/list";
let response = self.get(req).await.map_err(|e| UsecaseError::Reqwest(e.to_string()))?;

let tokens_data = response
.get("data")
.and_then(|mint_list| {
serde_json::from_value::<MintListResponse>(mint_list.clone()).ok()
})
.ok_or_else(|| {
UsecaseError::Reqwest(format!(
"No 'data' field in RaydiumTokenPriceFetcher ids response. Full response: {:#?}",
response
))
})?;

for MintListItem { address, symbol } in tokens_data.mint_list {
self.symbol_cache.insert(address.clone(), symbol.clone()).await;
}

self.symbol_cache.run_pending_tasks().await;

Ok(())
}

async fn get(&self, endpoint: &str) -> Result<serde_json::Value, IngesterError> {
let start_time = chrono::Utc::now();
let response = reqwest::get(format!("{host}/{ep}", host = self.host, ep = endpoint))
Expand All @@ -53,6 +90,13 @@ impl RaydiumTokenPriceFetcher {
}
response
}

/// Returns the approximate sizes of the symbol and the price caches.
///
/// The return format is (symbol_cache_size, price_cache_size).
pub fn get_cache_sizes(&self) -> (u64, u64) {
(self.symbol_cache.weighted_size(), self.price_cache.weighted_size())
}
}

#[async_trait]
Expand All @@ -61,6 +105,12 @@ impl TokenPriceFetcher for RaydiumTokenPriceFetcher {
&self,
token_ids: &[String],
) -> Result<HashMap<String, String>, UsecaseError> {
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct MintIdsItem {
address: String,
symbol: String,
}
let token_ids_str: Vec<String> = token_ids.iter().map(ToString::to_string).collect();
let mut result = HashMap::with_capacity(token_ids.len());
let mut missing_token_ids = Vec::new();
Expand All @@ -80,21 +130,16 @@ impl TokenPriceFetcher for RaydiumTokenPriceFetcher {

let tokens_data = response
.get("data")
.and_then(|td| td.as_array())
.and_then(|item| serde_json::from_value::<Vec<Option<MintIdsItem>>>(item.clone()).ok())
.ok_or_else(|| {
UsecaseError::Reqwest(format!(
"No 'data' field in RaydiumTokenPriceFetcher ids response. Full response: {:#?}",
response
))
})?;

for data in tokens_data {
if let (Some(address), Some(symbol)) = (
data.get("address").and_then(|a| a.as_str()),
data.get("symbol").and_then(|s| s.as_str()),
) {
let address = address.to_string();
let symbol = symbol.to_string();
for maybe_token_data in tokens_data {
if let Some(MintIdsItem { address, symbol }) = maybe_token_data {
self.symbol_cache.insert(address.clone(), symbol.clone()).await;
result.insert(address, symbol);
}
Expand Down
9 changes: 9 additions & 0 deletions nft_ingester/tests/price_fetch_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,13 @@ mod tests {
assert!(prices.get(&token_pie).unwrap().clone() > 0.0);
assert!(prices.get(&non_existed_token).is_none());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_token_price_fetcher_warmup() {
let token_price_fetcher = RaydiumTokenPriceFetcher::default();
token_price_fetcher.warmup().await.expect("warmup must succeed");

// check that the cache was pre-filled by some token symbols
assert!(token_price_fetcher.get_cache_sizes().0 > 0);
}
}

0 comments on commit 39c0457

Please sign in to comment.