From 10dc4a05b80d04983eb7204c548f4f6dabb26177 Mon Sep 17 00:00:00 2001 From: Dev Kalra Date: Fri, 3 May 2024 21:27:42 +0530 Subject: [PATCH] feat(fortuna_staging): use spans to create a hierarchy of logs (#1540) * use spans to create a hierarchy of logs * minor imp * remove chain id * add a sequence processing * added comments * consistent with other threads * extract method out * add field to process block range * add field to watch events logs * rename method * extract process batch method * tidy * update log for eth * remove comment * update version * address feedback --- apps/fortuna/Cargo.lock | 4 +- apps/fortuna/Cargo.toml | 2 +- apps/fortuna/src/chain/ethereum.rs | 6 +- apps/fortuna/src/keeper.rs | 294 +++++++++++++++-------------- 4 files changed, 165 insertions(+), 141 deletions(-) diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index 3573d7fb62..f2163868ee 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -1488,7 +1488,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "5.1.1" +version = "5.2.1" dependencies = [ "anyhow", "axum", @@ -2822,7 +2822,7 @@ dependencies = [ [[package]] name = "pythnet-sdk" -version = "2.0.0" +version = "2.1.0" dependencies = [ "bincode", "borsh", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index 4259f6933c..b65957538f 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "5.1.1" +version = "5.2.1" edition = "2021" [dependencies] diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index 5d543960b0..c890823f56 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -277,7 +277,11 @@ impl EntropyReader for PythContract { Err(e) => match e { ContractError::ProviderError { e } => Err(anyhow!(e)), _ => { - tracing::info!("Gas estimation for reveal with callback failed: {:?}", e); + tracing::info!( + sequence_number = sequence_number, + "Gas estimation failed. error: {:?}", + e + ); Ok(None) } }, diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 202bd155ed..4496050161 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -33,9 +33,17 @@ use { Duration, }, }, - tracing, + tracing::{ + self, + Instrument, + }, }; +#[derive(Debug)] +pub struct BlockRange { + pub from: BlockNumber, + pub to: BlockNumber, +} /// How much to wait before retrying in case of an RPC error const RETRY_INTERVAL: Duration = Duration::from_secs(5); @@ -59,33 +67,24 @@ async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber { return latest_confirmed_block - chain_state.reveal_delay_blocks } Err(e) => { - tracing::error!( - "Chain: {} - error while getting block number. error: {:?}", - &chain_state.id, - e - ); + tracing::error!("error while getting block number. error: {:?}", e); time::sleep(RETRY_INTERVAL).await; } } } } -/// Run threads to handle events for the last `BACKLOG_RANGE` blocks. Watch for new blocks and +/// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and /// handle any events for the new blocks. +#[tracing::instrument(name="keeper", skip_all, fields(chain_id=chain_state.id))] pub async fn run_keeper_threads( private_key: String, chain_eth_config: EthereumConfig, chain_state: BlockchainState, ) { - tracing::info!("Chain: {} - starting keeper", &chain_state.id); - - let latest_safe_block = get_latest_safe_block(&chain_state).await; - - tracing::info!( - "Chain: {} - latest safe block: {}", - &chain_state.id, - &latest_safe_block - ); + tracing::info!("starting keeper"); + let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; + tracing::info!("latest safe block: {}", &latest_safe_block); let contract = Arc::new( SignablePythContract::from_config(&chain_eth_config, &private_key) @@ -93,64 +92,48 @@ pub async fn run_keeper_threads( .expect("Chain config should be valid"), ); - let backlog_chain_state = chain_state.clone(); - let backlog_contract = contract.clone(); // Spawn a thread to handle the events from last BACKLOG_RANGE blocks. - spawn(async move { - let from_block = latest_safe_block.saturating_sub(BACKLOG_RANGE); - process_block_range( + spawn( + process_backlog( BlockRange { - from: from_block, + from: latest_safe_block.saturating_sub(BACKLOG_RANGE), to: latest_safe_block, }, - backlog_contract, + contract.clone(), chain_eth_config.gas_limit, - backlog_chain_state.clone(), + chain_state.clone(), ) - .await; - tracing::info!( - "Chain: {} - backlog processing completed", - &backlog_chain_state.id - ); - }); + .in_current_span(), + ); let (tx, rx) = mpsc::channel::(1000); - - let watch_blocks_chain_state = chain_state.clone(); // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel. - spawn(async move { - loop { - if let Err(e) = watch_blocks( - watch_blocks_chain_state.clone(), - latest_safe_block, - tx.clone(), - chain_eth_config.geth_rpc_wss.clone(), - ) - .await - { - tracing::error!( - "Chain: {} - error in watching blocks. error: {:?}", - &watch_blocks_chain_state.id, - e - ); - time::sleep(RETRY_INTERVAL).await; - } - } - }); + spawn( + watch_blocks_wrapper( + chain_state.clone(), + latest_safe_block, + tx, + chain_eth_config.geth_rpc_wss.clone(), + ) + .in_current_span(), + ); // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks. - spawn(process_new_blocks( - chain_state.clone(), - rx, - Arc::clone(&contract), - chain_eth_config.gas_limit, - )); + spawn( + process_new_blocks( + chain_state.clone(), + rx, + Arc::clone(&contract), + chain_eth_config.gas_limit, + ) + .in_current_span(), + ); } -// Process an event for a chain. It estimates the gas for the reveal with callback and -// submits the transaction if the gas estimate is below the gas limit. -// It will return an Error if the gas estimation failed with a provider error or if the -// reveal with callback failed with a provider error. +/// Process an event for a chain. It estimates the gas for the reveal with callback and +/// submits the transaction if the gas estimate is below the gas limit. +/// It will return an Error if the gas estimation failed with a provider error or if the +/// reveal with callback failed with a provider error. pub async fn process_event( event: RequestedWithCallbackEvent, chain_config: &BlockchainState, @@ -164,10 +147,8 @@ pub async fn process_event( Ok(result) => result, Err(e) => { tracing::error!( - "Chain: {} - error while revealing for provider: {} and sequence number: {} with error: {:?}", - &chain_config.id, - event.provider_address, - event.sequence_number, + sequence_number = &event.sequence_number, + "Error while revealing with error: {:?}", e ); return Ok(()); @@ -182,6 +163,7 @@ pub async fn process_event( event.user_random_number, provider_revelation, ) + .in_current_span() .await; match gas_estimate_res { @@ -194,8 +176,8 @@ pub async fn process_event( if gas_estimate > gas_limit { tracing::error!( - "Chain: {} - gas estimate for reveal with callback is higher than the gas limit", - &chain_config.id + sequence_number = &event.sequence_number, + "Gas estimate for reveal with callback is higher than the gas limit" ); return Ok(()); } @@ -222,12 +204,10 @@ pub async fn process_event( // and concluded that its Ok to not reveal. _ => { tracing::error!( - "Chain: {} - error while revealing for provider: {} and sequence number: {} with error: {:?}", - &chain_config.id, - event.provider_address, - event.sequence_number, - e - ); + sequence_number = &event.sequence_number, + "Error while revealing with error: {:?}", + e + ); return Ok(()); } }, @@ -236,34 +216,34 @@ pub async fn process_event( match pending_tx.await { Ok(res) => { tracing::info!( - "Chain: {} - revealed for provider: {} and sequence number: {} with res: {:?}", - &chain_config.id, - event.provider_address, - event.sequence_number, + sequence_number = &event.sequence_number, + "Revealed with res: {:?}", res ); Ok(()) } Err(e) => { tracing::error!( - "Chain: {} - error while revealing for provider: {} and sequence number: {} with error: {:?}", - &chain_config.id, - event.provider_address, - event.sequence_number, + sequence_number = &event.sequence_number, + "Error while revealing with error: {:?}", e ); Err(e.into()) } } } - None => Ok(()), + None => { + tracing::info!( + sequence_number = &event.sequence_number, + "Not processing event" + ); + Ok(()) + } }, Err(e) => { tracing::error!( - "Chain: {} - error while simulating reveal for provider: {} and sequence number: {} \n error: {:?}", - &chain_config.id, - event.provider_address, - event.sequence_number, + sequence_number = &event.sequence_number, + "Error while simulating reveal with error: {:?}", e ); Err(e) @@ -272,21 +252,14 @@ pub async fn process_event( } -/// Process a range of blocks for a chain. It will fetch events for the blocks in the provided range -/// and then try to process them one by one. If the process fails, it will retry indefinitely. +/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch. +#[tracing::instrument(skip_all, fields(range_from_block=block_range.from, range_to_block=block_range.to))] pub async fn process_block_range( block_range: BlockRange, contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, ) { - tracing::info!( - "Chain: {} - processing blocks from: {} to: {}", - &chain_state.id, - block_range.from, - block_range.to - ); - let BlockRange { from: first_block, to: last_block, @@ -297,41 +270,64 @@ pub async fn process_block_range( if to_block > last_block { to_block = last_block; } + + process_single_block_batch( + BlockRange { + from: current_block, + to: to_block, + }, + contract.clone(), + gas_limit, + chain_state.clone(), + ) + .in_current_span() + .await; + + current_block = to_block + 1; + } +} + +/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch +/// and then try to process them one by one. If the process fails, it will retry indefinitely. +#[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))] +pub async fn process_single_block_batch( + block_range: BlockRange, + contract: Arc, + gas_limit: U256, + chain_state: api::BlockchainState, +) { + loop { let events_res = chain_state .contract - .get_request_with_callback_events(current_block, to_block) + .get_request_with_callback_events(block_range.from, block_range.to) .await; match events_res { Ok(events) => { - for event in events { + tracing::info!(num_of_events = &events.len(), "Processing",); + for event in &events { + tracing::info!(sequence_number = &event.sequence_number, "Processing event",); while let Err(e) = - process_event(event.clone(), &chain_state, &contract, gas_limit).await + process_event(event.clone(), &chain_state, &contract, gas_limit) + .in_current_span() + .await { tracing::error!( - "Chain: {} - error while processing event for sequence number: {}. Waiting for {} seconds before retry. error: {:?}", - &chain_state.id, - &event.sequence_number, + sequence_number = &event.sequence_number, + "Error while processing event. Waiting for {} seconds before retry. error: {:?}", RETRY_INTERVAL.as_secs(), e ); time::sleep(RETRY_INTERVAL).await; } + tracing::info!(sequence_number = &event.sequence_number, "Processed event",); } - tracing::info!( - "Chain: {} - backlog processed from block: {} to block: {}", - &chain_state.id, - ¤t_block, - &to_block - ); - current_block = to_block + 1; + tracing::info!(num_of_events = &events.len(), "Processed",); + break; } Err(e) => { tracing::error!( - "Chain: {} - error while getting events from block: {} to block: {}. Waiting for {} seconds before retry. error: {:?}", - &chain_state.id, - ¤t_block, - &to_block, + "Error while getting events. Waiting for {} seconds before retry. error: {:?}", RETRY_INTERVAL.as_secs(), e ); @@ -341,9 +337,29 @@ pub async fn process_block_range( } } -pub struct BlockRange { - pub from: BlockNumber, - pub to: BlockNumber, +/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay. +/// It retries indefinitely. +#[tracing::instrument(name="watch_blocks", skip_all, fields(initial_safe_block=latest_safe_block))] +pub async fn watch_blocks_wrapper( + chain_state: BlockchainState, + latest_safe_block: BlockNumber, + tx: mpsc::Sender, + geth_rpc_wss: Option, +) { + loop { + if let Err(e) = watch_blocks( + chain_state.clone(), + latest_safe_block, + tx.clone(), + geth_rpc_wss.clone(), + ) + .in_current_span() + .await + { + tracing::error!("watching blocks. error: {:?}", e); + time::sleep(RETRY_INTERVAL).await; + } + } } /// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel. @@ -356,27 +372,19 @@ pub async fn watch_blocks( tx: mpsc::Sender, geth_rpc_wss: Option, ) -> Result<()> { - tracing::info!( - "Chain: {} - watching blocks to handle new events", - &chain_state.id - ); + tracing::info!("Watching blocks to handle new events"); let mut last_safe_block_processed = latest_safe_block; let provider_option = match geth_rpc_wss { Some(wss) => Some(match Provider::::connect(wss.clone()).await { Ok(provider) => provider, Err(e) => { - tracing::error!( - "Chain: {} - error while connecting to wss: {}. error: {:?}", - &chain_state.id, - wss, - e - ); + tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e); return Err(e.into()); } }), None => { - tracing::info!("Chain: {} - no wss provided", &chain_state.id); + tracing::info!("No wss provided"); None } }; @@ -396,7 +404,7 @@ pub async fn watch_blocks( } } - let latest_safe_block = get_latest_safe_block(&chain_state).await; + let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; if latest_safe_block > last_safe_block_processed { match tx .send(BlockRange { @@ -407,17 +415,15 @@ pub async fn watch_blocks( { Ok(_) => { tracing::info!( - "Chain: {} - block range sent to handle events from: {} to: {}", - &chain_state.id, - &last_safe_block_processed + 1, - &latest_safe_block + from_block = &last_safe_block_processed + 1, + to_block = &latest_safe_block, + "Block range sent to handle events", ); last_safe_block_processed = latest_safe_block; } Err(e) => { tracing::error!( - "Chain: {} - error while sending block range to handle events. These will be handled in next call. error: {:?}", - &chain_state.id, + "Error while sending block range to handle events. These will be handled in next call. error: {:?}", e ); } @@ -427,17 +433,15 @@ pub async fn watch_blocks( } /// It waits on rx channel to receive block ranges and then calls process_block_range to process them. +#[tracing::instrument(skip_all)] pub async fn process_new_blocks( chain_state: BlockchainState, mut rx: mpsc::Receiver, contract: Arc, gas_limit: U256, ) { + tracing::info!("Waiting for new block ranges to process"); loop { - tracing::info!( - "Chain: {} - waiting for new block ranges to process", - &chain_state.id - ); if let Some(block_range) = rx.recv().await { process_block_range( block_range, @@ -445,7 +449,23 @@ pub async fn process_new_blocks( gas_limit, chain_state.clone(), ) + .in_current_span() .await; } } } + +/// Processes the backlog_range for a chain. +#[tracing::instrument(skip_all)] +pub async fn process_backlog( + backlog_range: BlockRange, + contract: Arc, + gas_limit: U256, + chain_state: BlockchainState, +) { + tracing::info!("Processing backlog"); + process_block_range(backlog_range, contract, gas_limit, chain_state) + .in_current_span() + .await; + tracing::info!("Backlog processed"); +}