diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 876bb5a92b0..117e3033b18 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -1018,10 +1018,12 @@ impl TriggersAdapterTrait for TriggersAdapter { ChainClient::Firehose(endpoints) => { let endpoint = endpoints.endpoint().await?; let block = endpoint - .get_block_by_number::(ptr.number as u64, &self.logger) + .get_block_by_number_with_retry::(ptr.number as u64, &self.logger) .await - .map_err(|e| anyhow!("Failed to fetch block from firehose: {}", e))?; - + .context(format!( + "Failed to fetch block {} from firehose", + ptr.number + ))?; Ok(block.hash() == ptr.hash) } ChainClient::Rpc(adapter) => { diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index f496a3ea3ce..504a42b3c06 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -13,6 +13,7 @@ use crate::{ prelude::{anyhow, debug, DeploymentHash}, substreams_rpc, }; +use anyhow::Context; use async_trait::async_trait; use futures03::{StreamExt, TryStreamExt}; use http0::uri::{Scheme, Uri}; @@ -443,11 +444,43 @@ impl FirehoseEndpoint { } } - pub async fn get_block_by_number( - &self, - number: u64, + pub async fn get_block_by_ptr_with_retry( + self: Arc, + ptr: &BlockPtr, logger: &Logger, ) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + let retry_log_message = format!("get_block_by_ptr for block {}", ptr); + let endpoint = self.cheap_clone(); + let logger = logger.cheap_clone(); + let ptr_for_retry = ptr.clone(); + + retry(retry_log_message, &logger) + .limit(ENV_VARS.firehose_block_fetch_retry_limit) + .timeout_secs(ENV_VARS.firehose_block_fetch_timeout) + .run(move || { + let endpoint = endpoint.cheap_clone(); + let logger = logger.cheap_clone(); + let ptr = ptr_for_retry.clone(); + async move { + endpoint + .get_block_by_ptr::(&ptr, &logger) + .await + .context(format!( + "Failed to fetch block by ptr {} from firehose", + ptr + )) + } + }) + .await + .map_err(move |e| { + anyhow::anyhow!("Failed to fetch block by ptr {} from firehose: {}", ptr, e) + }) + } + + async fn get_block_by_number(&self, number: u64, logger: &Logger) -> Result where M: prost::Message + BlockchainBlock + Default + 'static, { @@ -473,6 +506,44 @@ impl FirehoseEndpoint { } } + pub async fn get_block_by_number_with_retry( + self: Arc, + number: u64, + logger: &Logger, + ) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + let retry_log_message = format!("get_block_by_number for block {}", number); + let endpoint = self.cheap_clone(); + let logger = logger.cheap_clone(); + + retry(retry_log_message, &logger) + .limit(ENV_VARS.firehose_block_fetch_retry_limit) + .timeout_secs(ENV_VARS.firehose_block_fetch_timeout) + .run(move || { + let endpoint = endpoint.cheap_clone(); + let logger = logger.cheap_clone(); + async move { + endpoint + .get_block_by_number::(number, &logger) + .await + .context(format!( + "Failed to fetch block by number {} from firehose", + number + )) + } + }) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to fetch block by number {} from firehose: {}", + number, + e + ) + }) + } + pub async fn load_blocks_by_numbers( self: Arc, numbers: Vec, @@ -488,16 +559,7 @@ impl FirehoseEndpoint { .map(move |number| { let e = self.cheap_clone(); let l = logger.clone(); - let retry_log_message = format!("get_block_by_number for block {}", number); - - retry(retry_log_message, &l) - .limit(ENV_VARS.firehose_block_fetch_retry_limit) - .timeout_secs(ENV_VARS.firehose_block_fetch_timeout) - .run(move || { - let e = e.cheap_clone(); - let l = l.clone(); - async move { e.get_block_by_number::(number, &l).await } - }) + async move { e.get_block_by_number_with_retry::(number, &l).await } }) .buffered(ENV_VARS.firehose_block_batch_size);