diff --git a/apps/fortuna/config.sample.yaml b/apps/fortuna/config.sample.yaml index 6774ef1b07..d2fa7fe205 100644 --- a/apps/fortuna/config.sample.yaml +++ b/apps/fortuna/config.sample.yaml @@ -39,6 +39,11 @@ chains: target_profit_pct: 20 max_profit_pct: 100 + # A list of block delays for processing blocks multiple times. Each number represents + # how many blocks to wait before processing. For example, [5, 10, 20] means process + # blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks. + block_delays: [5, 10, 20] + # Historical commitments -- delete this block for local development purposes commitments: # prettier-ignore @@ -78,5 +83,3 @@ keeper: value: 0xabcd # For production, you can store the private key in a file. # file: keeper-key.txt - # Number of blocks to wait before processing new blocks (in addition to reveal_delay_blocks) - delay_blocks: 5 diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index 95b633c5c6..b3f9fb0887 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -177,6 +177,16 @@ pub struct EthereumConfig { /// Maximum number of hashes to record in a request. /// This should be set according to the maximum gas limit the provider supports for callbacks. pub max_num_hashes: Option<u32>, + + /// A list of delays (in blocks) that indicates how many blocks should be delayed + /// before we process a block. For retry logic, we can process blocks multiple times + /// at each specified delay. For example: [5, 10, 20]. + #[serde(default = "default_block_delays")] + pub block_delays: Vec<u64>, +} + +fn default_block_delays() -> Vec<u64> { + vec![5] } fn default_priority_fee_multiplier_pct() -> u64 { @@ -341,14 +351,6 @@ pub struct KeeperConfig { /// This key *does not need to be a registered provider*. In particular, production deployments /// should ensure this is a different key in order to reduce the severity of security breaches. pub private_key: SecretString, - - /// Number of blocks to wait before processing new blocks (in addition to reveal_delay_blocks) - #[serde(default = "default_delay_blocks")] - pub delay_blocks: u64, -} - -fn default_delay_blocks() -> u64 { - 5 } // A secret is a string that can be provided either as a literal in the config, diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 5d8c8255d1..3845789169 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -250,7 +250,7 @@ impl KeeperMetrics { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BlockRange { pub from: BlockNumber, pub to: BlockNumber, @@ -335,7 +335,7 @@ pub async fn run_keeper_threads( .in_current_span(), ); - let (tx, _rx) = mpsc::channel::<BlockRange>(1000); + let (tx, rx) = mpsc::channel::<BlockRange>(1000); // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel. spawn( watch_blocks_wrapper( @@ -346,57 +346,18 @@ pub async fn run_keeper_threads( ) .in_current_span(), ); - // Create channels for both immediate and delayed block processing - let (tx_immediate, rx_immediate) = mpsc::channel::<BlockRange>(1000); - let (tx_delayed, rx_delayed) = mpsc::channel::<BlockRange>(1000); - // Spawn a thread to watch for new blocks and send the range of blocks to both channels - spawn( - watch_blocks_wrapper( - chain_state.clone(), - latest_safe_block, - tx_immediate, - chain_eth_config.geth_rpc_wss.clone(), - ) - .in_current_span(), - ); - - // Clone the tx_delayed channel for the second watch_blocks_wrapper - spawn( - watch_blocks_wrapper( - chain_state.clone(), - latest_safe_block, - tx_delayed, - chain_eth_config.geth_rpc_wss.clone(), - ) - .in_current_span(), - ); - - // Spawn a thread for immediate block processing + // Spawn a thread for block processing with configured delays spawn( process_new_blocks( chain_state.clone(), - rx_immediate, + rx, Arc::clone(&contract), gas_limit, chain_eth_config.escalation_policy.clone(), metrics.clone(), fulfilled_requests_cache.clone(), - ) - .in_current_span(), - ); - - // Spawn a thread for delayed block processing - spawn( - process_new_blocks_delayed( - chain_state.clone(), - rx_delayed, - Arc::clone(&contract), - gas_limit, - chain_eth_config.escalation_policy.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - 5, + chain_eth_config.clone(), ) .in_current_span(), ); @@ -1006,8 +967,10 @@ pub async fn watch_blocks( } } -/// It waits on rx channel to receive block ranges and then calls process_block_range to process them. +/// It waits on rx channel to receive block ranges and then calls process_block_range to process them +/// for each configured block delay. #[tracing::instrument(skip_all)] +#[allow(clippy::too_many_arguments)] pub async fn process_new_blocks( chain_state: BlockchainState, mut rx: mpsc::Receiver<BlockRange>, @@ -1016,12 +979,14 @@ pub async fn process_new_blocks( escalation_policy: EscalationPolicyConfig, metrics: Arc<KeeperMetrics>, fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>, + chain_eth_config: EthereumConfig, ) { tracing::info!("Waiting for new block ranges to process"); loop { if let Some(block_range) = rx.recv().await { + // Process blocks immediately first process_block_range( - block_range, + block_range.clone(), Arc::clone(&contract), gas_limit, escalation_policy.clone(), @@ -1031,44 +996,25 @@ pub async fn process_new_blocks( ) .in_current_span() .await; - } - } -} -#[tracing::instrument(skip_all)] -#[allow(clippy::too_many_arguments)] -pub async fn process_new_blocks_delayed( - chain_state: BlockchainState, - mut rx: mpsc::Receiver<BlockRange>, - contract: Arc<InstrumentedSignablePythContract>, - gas_limit: U256, - escalation_policy: EscalationPolicyConfig, - metrics: Arc<KeeperMetrics>, - fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>, - delay_blocks: u64, -) { - tracing::info!( - "Waiting for new block ranges to process with {} block delay", - delay_blocks - ); - loop { - if let Some(block_range) = rx.recv().await { - let from_block_after_delay = block_range.from + delay_blocks; - let adjusted_range = BlockRange { - from: from_block_after_delay, - to: block_range.to + delay_blocks, - }; - process_block_range( - adjusted_range, - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; + // Then process with each configured delay + for delay in &chain_eth_config.block_delays { + let adjusted_range = BlockRange { + from: block_range.from + delay, + to: block_range.to + delay, + }; + process_block_range( + adjusted_range, + Arc::clone(&contract), + gas_limit, + escalation_policy.clone(), + chain_state.clone(), + metrics.clone(), + fulfilled_requests_cache.clone(), + ) + .in_current_span() + .await; + } } } }