Skip to content

Commit

Permalink
graph: Add get_block_number_with_retry method for firehose endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Mar 6, 2025
1 parent e727a25 commit fd36092
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 16 deletions.
8 changes: 5 additions & 3 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,10 +1018,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
ChainClient::Firehose(endpoints) => {
let endpoint = endpoints.endpoint().await?;
let block = endpoint
.get_block_by_number::<codec::Block>(ptr.number as u64, &self.logger)
.get_block_by_number_with_retry::<codec::Block>(ptr.number as u64, &self.logger)
.await
.map_err(|e| anyhow!("Failed to fetch block from firehose: {}", e))?;

.context(format!(
"Failed to fetch block {} from firehose",
ptr.number
))?;
Ok(block.hash() == ptr.hash)
}
ChainClient::Rpc(adapter) => {
Expand Down
88 changes: 75 additions & 13 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
prelude::{anyhow, debug, DeploymentHash},
substreams_rpc,
};
use anyhow::Context;
use async_trait::async_trait;
use futures03::{StreamExt, TryStreamExt};
use http0::uri::{Scheme, Uri};
Expand Down Expand Up @@ -443,11 +444,43 @@ impl FirehoseEndpoint {
}
}

pub async fn get_block_by_number<M>(
&self,
number: u64,
pub async fn get_block_by_ptr_with_retry<M>(
self: Arc<Self>,
ptr: &BlockPtr,
logger: &Logger,
) -> Result<M, anyhow::Error>
where
M: prost::Message + BlockchainBlock + Default + 'static,
{
let retry_log_message = format!("get_block_by_ptr for block {}", ptr);
let endpoint = self.cheap_clone();
let logger = logger.cheap_clone();
let ptr_for_retry = ptr.clone();

retry(retry_log_message, &logger)
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
.run(move || {
let endpoint = endpoint.cheap_clone();
let logger = logger.cheap_clone();
let ptr = ptr_for_retry.clone();
async move {
endpoint
.get_block_by_ptr::<M>(&ptr, &logger)
.await
.context(format!(
"Failed to fetch block by ptr {} from firehose",
ptr
))
}
})
.await
.map_err(move |e| {
anyhow::anyhow!("Failed to fetch block by ptr {} from firehose: {}", ptr, e)
})
}

async fn get_block_by_number<M>(&self, number: u64, logger: &Logger) -> Result<M, anyhow::Error>
where
M: prost::Message + BlockchainBlock + Default + 'static,
{
Expand All @@ -473,6 +506,44 @@ impl FirehoseEndpoint {
}
}

pub async fn get_block_by_number_with_retry<M>(
self: Arc<Self>,
number: u64,
logger: &Logger,
) -> Result<M, anyhow::Error>
where
M: prost::Message + BlockchainBlock + Default + 'static,
{
let retry_log_message = format!("get_block_by_number for block {}", number);
let endpoint = self.cheap_clone();
let logger = logger.cheap_clone();

retry(retry_log_message, &logger)
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
.run(move || {
let endpoint = endpoint.cheap_clone();
let logger = logger.cheap_clone();
async move {
endpoint
.get_block_by_number::<M>(number, &logger)
.await
.context(format!(
"Failed to fetch block by number {} from firehose",
number
))
}
})
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to fetch block by number {} from firehose: {}",
number,
e
)
})
}

pub async fn load_blocks_by_numbers<M>(
self: Arc<Self>,
numbers: Vec<u64>,
Expand All @@ -488,16 +559,7 @@ impl FirehoseEndpoint {
.map(move |number| {
let e = self.cheap_clone();
let l = logger.clone();
let retry_log_message = format!("get_block_by_number for block {}", number);

retry(retry_log_message, &l)
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
.run(move || {
let e = e.cheap_clone();
let l = l.clone();
async move { e.get_block_by_number::<M>(number, &l).await }
})
async move { e.get_block_by_number_with_retry::<M>(number, &l).await }
})
.buffered(ENV_VARS.firehose_block_batch_size);

Expand Down

0 comments on commit fd36092

Please sign in to comment.