diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index da418bd62..5815382fe 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -1503,7 +1503,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.3.0" +version = "7.4.0" dependencies = [ "anyhow", "axum", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index 923ccafd0..b8b93e39b 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "7.3.0" +version = "7.4.0" edition = "2021" [dependencies] diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index 80cc0af3f..d2fa7fe20 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -39,6 +39,11 @@ chains: target_profit_pct: 20 max_profit_pct: 100 + # A list of block delays for processing blocks multiple times. Each number represents + # how many blocks to wait before processing. For example, [5, 10, 20] means process + # blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks. + block_delays: [5, 10, 20] + # Historical commitments -- delete this block for local development purposes commitments: # prettier-ignore diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 70698f3fa..b3f9fb088 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -177,6 +177,16 @@ pub struct EthereumConfig { /// Maximum number of hashes to record in a request. /// This should be set according to the maximum gas limit the provider supports for callbacks. pub max_num_hashes: Option, + + /// A list of delays (in blocks) that indicates how many blocks should be delayed + /// before we process a block. For retry logic, we can process blocks multiple times + /// at each specified delay. For example: [5, 10, 20]. + #[serde(default = "default_block_delays")] + pub block_delays: Vec, +} + +fn default_block_delays() -> Vec { + vec![5] } fn default_priority_fee_multiplier_pct() -> u64 { diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index c40ce770d..3953d8759 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -250,7 +250,7 @@ impl KeeperMetrics { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BlockRange { pub from: BlockNumber, pub to: BlockNumber, @@ -346,7 +346,8 @@ pub async fn run_keeper_threads( ) .in_current_span(), ); - // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks. + + // Spawn a thread for block processing with configured delays spawn( process_new_blocks( chain_state.clone(), @@ -356,6 +357,7 @@ pub async fn run_keeper_threads( chain_eth_config.escalation_policy.clone(), metrics.clone(), fulfilled_requests_cache.clone(), + chain_eth_config.block_delays.clone(), ) .in_current_span(), ); @@ -965,8 +967,10 @@ pub async fn watch_blocks( } } -/// It waits on rx channel to receive block ranges and then calls process_block_range to process them. +/// It waits on rx channel to receive block ranges and then calls process_block_range to process them +/// for each configured block delay. #[tracing::instrument(skip_all)] +#[allow(clippy::too_many_arguments)] pub async fn process_new_blocks( chain_state: BlockchainState, mut rx: mpsc::Receiver, @@ -975,12 +979,14 @@ pub async fn process_new_blocks( escalation_policy: EscalationPolicyConfig, metrics: Arc, fulfilled_requests_cache: Arc>>, + block_delays: Vec, ) { tracing::info!("Waiting for new block ranges to process"); loop { if let Some(block_range) = rx.recv().await { + // Process blocks immediately first process_block_range( - block_range, + block_range.clone(), Arc::clone(&contract), gas_limit, escalation_policy.clone(), @@ -990,6 +996,25 @@ pub async fn process_new_blocks( ) .in_current_span() .await; + + // Then process with each configured delay + for delay in &block_delays { + let adjusted_range = BlockRange { + from: block_range.from.saturating_sub(*delay), + to: block_range.to.saturating_sub(*delay), + }; + process_block_range( + adjusted_range, + Arc::clone(&contract), + gas_limit, + escalation_policy.clone(), + chain_state.clone(), + metrics.clone(), + fulfilled_requests_cache.clone(), + ) + .in_current_span() + .await; + } } } }