Skip to content

Commit

Permalink
feat(fortuna_staging): use spans to create a hierarchy of logs (#1540)
Browse files Browse the repository at this point in the history
* use spans to create a hierarchy of logs

* minor imp

* remove chain id

* add a sequence processing

* added comments

* consistent with other threads

* extract method out

* add field to process block range

* add field to watch events logs

* rename method

* extract process batch method

* tidy

* update log for eth

* remove comment

* update version

* address feedback
Dev Kalra authored May 3, 2024
1 parent 586a439 commit 10dc4a0
Showing 4 changed files with 165 additions and 141 deletions.
4 changes: 2 additions & 2 deletions apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "5.1.1"
version = "5.2.1"
edition = "2021"

[dependencies]
6 changes: 5 additions & 1 deletion apps/fortuna/src/chain/ethereum.rs
Original file line number Diff line number Diff line change
@@ -277,7 +277,11 @@ impl EntropyReader for PythContract {
Err(e) => match e {
ContractError::ProviderError { e } => Err(anyhow!(e)),
_ => {
tracing::info!("Gas estimation for reveal with callback failed: {:?}", e);
tracing::info!(
sequence_number = sequence_number,
"Gas estimation failed. error: {:?}",
e
);
Ok(None)
}
},
294 changes: 157 additions & 137 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
@@ -33,9 +33,17 @@ use {
Duration,
},
},
tracing,
tracing::{
self,
Instrument,
},
};

/// A contiguous range of block numbers, described by its first and last block.
///
/// Values of this type are passed to the block-processing functions and sent over the
/// channel between the block watcher and the block processor.
// NOTE(review): callers appear to treat both `from` and `to` as inclusive — confirm.
#[derive(Debug)]
pub struct BlockRange {
/// Block number at which the range starts.
pub from: BlockNumber,
/// Block number at which the range ends.
pub to: BlockNumber,
}

/// How much to wait before retrying in case of an RPC error
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
@@ -59,98 +67,73 @@ async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
return latest_confirmed_block - chain_state.reveal_delay_blocks
}
Err(e) => {
tracing::error!(
"Chain: {} - error while getting block number. error: {:?}",
&chain_state.id,
e
);
tracing::error!("error while getting block number. error: {:?}", e);
time::sleep(RETRY_INTERVAL).await;
}
}
}
}

/// Run threads to handle events for the last `BACKLOG_RANGE` blocks. Watch for new blocks and
/// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and
/// handle any events for the new blocks.
#[tracing::instrument(name="keeper", skip_all, fields(chain_id=chain_state.id))]
pub async fn run_keeper_threads(
private_key: String,
chain_eth_config: EthereumConfig,
chain_state: BlockchainState,
) {
tracing::info!("Chain: {} - starting keeper", &chain_state.id);

let latest_safe_block = get_latest_safe_block(&chain_state).await;

tracing::info!(
"Chain: {} - latest safe block: {}",
&chain_state.id,
&latest_safe_block
);
tracing::info!("starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);

let contract = Arc::new(
SignablePythContract::from_config(&chain_eth_config, &private_key)
.await
.expect("Chain config should be valid"),
);

let backlog_chain_state = chain_state.clone();
let backlog_contract = contract.clone();
// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
spawn(async move {
let from_block = latest_safe_block.saturating_sub(BACKLOG_RANGE);
process_block_range(
spawn(
process_backlog(
BlockRange {
from: from_block,
from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
to: latest_safe_block,
},
backlog_contract,
contract.clone(),
chain_eth_config.gas_limit,
backlog_chain_state.clone(),
chain_state.clone(),
)
.await;
tracing::info!(
"Chain: {} - backlog processing completed",
&backlog_chain_state.id
);
});
.in_current_span(),
);

let (tx, rx) = mpsc::channel::<BlockRange>(1000);

let watch_blocks_chain_state = chain_state.clone();
// Spawn a thread to watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
spawn(async move {
loop {
if let Err(e) = watch_blocks(
watch_blocks_chain_state.clone(),
latest_safe_block,
tx.clone(),
chain_eth_config.geth_rpc_wss.clone(),
)
.await
{
tracing::error!(
"Chain: {} - error in watching blocks. error: {:?}",
&watch_blocks_chain_state.id,
e
);
time::sleep(RETRY_INTERVAL).await;
}
}
});
spawn(
watch_blocks_wrapper(
chain_state.clone(),
latest_safe_block,
tx,
chain_eth_config.geth_rpc_wss.clone(),
)
.in_current_span(),
);
// Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
spawn(process_new_blocks(
chain_state.clone(),
rx,
Arc::clone(&contract),
chain_eth_config.gas_limit,
));
spawn(
process_new_blocks(
chain_state.clone(),
rx,
Arc::clone(&contract),
chain_eth_config.gas_limit,
)
.in_current_span(),
);
}


// Process an event for a chain. It estimates the gas for the reveal with callback and
// submits the transaction if the gas estimate is below the gas limit.
// It will return an Error if the gas estimation failed with a provider error or if the
// reveal with callback failed with a provider error.
/// Process an event for a chain. It estimates the gas for the reveal with callback and
/// submits the transaction if the gas estimate is below the gas limit.
/// It will return an Error if the gas estimation failed with a provider error or if the
/// reveal with callback failed with a provider error.
pub async fn process_event(
event: RequestedWithCallbackEvent,
chain_config: &BlockchainState,
@@ -164,10 +147,8 @@ pub async fn process_event(
Ok(result) => result,
Err(e) => {
tracing::error!(
"Chain: {} - error while revealing for provider: {} and sequence number: {} with error: {:?}",
&chain_config.id,
event.provider_address,
event.sequence_number,
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
e
);
return Ok(());
@@ -182,6 +163,7 @@ pub async fn process_event(
event.user_random_number,
provider_revelation,
)
.in_current_span()
.await;

match gas_estimate_res {
@@ -194,8 +176,8 @@ pub async fn process_event(

if gas_estimate > gas_limit {
tracing::error!(
"Chain: {} - gas estimate for reveal with callback is higher than the gas limit",
&chain_config.id
sequence_number = &event.sequence_number,
"Gas estimate for reveal with callback is higher than the gas limit"
);
return Ok(());
}
@@ -222,12 +204,10 @@ pub async fn process_event(
// and concluded that its Ok to not reveal.
_ => {
tracing::error!(
"Chain: {} - error while revealing for provider: {} and sequence number: {} with error: {:?}",
&chain_config.id,
event.provider_address,
event.sequence_number,
e
);
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
e
);
return Ok(());
}
},
@@ -236,34 +216,34 @@ pub async fn process_event(
match pending_tx.await {
Ok(res) => {
tracing::info!(
"Chain: {} - revealed for provider: {} and sequence number: {} with res: {:?}",
&chain_config.id,
event.provider_address,
event.sequence_number,
sequence_number = &event.sequence_number,
"Revealed with res: {:?}",
res
);
Ok(())
}
Err(e) => {
tracing::error!(
"Chain: {} - error while revealing for provider: {} and sequence number: {} with error: {:?}",
&chain_config.id,
event.provider_address,
event.sequence_number,
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
e
);
Err(e.into())
}
}
}
None => Ok(()),
None => {
tracing::info!(
sequence_number = &event.sequence_number,
"Not processing event"
);
Ok(())
}
},
Err(e) => {
tracing::error!(
"Chain: {} - error while simulating reveal for provider: {} and sequence number: {} \n error: {:?}",
&chain_config.id,
event.provider_address,
event.sequence_number,
sequence_number = &event.sequence_number,
"Error while simulating reveal with error: {:?}",
e
);
Err(e)
@@ -272,21 +252,14 @@ pub async fn process_event(
}


/// Process a range of blocks for a chain. It will fetch events for the blocks in the provided range
/// and then try to process them one by one. If the process fails, it will retry indefinitely.
/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch.
#[tracing::instrument(skip_all, fields(range_from_block=block_range.from, range_to_block=block_range.to))]
pub async fn process_block_range(
block_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
) {
tracing::info!(
"Chain: {} - processing blocks from: {} to: {}",
&chain_state.id,
block_range.from,
block_range.to
);

let BlockRange {
from: first_block,
to: last_block,
@@ -297,41 +270,64 @@ pub async fn process_block_range(
if to_block > last_block {
to_block = last_block;
}

process_single_block_batch(
BlockRange {
from: current_block,
to: to_block,
},
contract.clone(),
gas_limit,
chain_state.clone(),
)
.in_current_span()
.await;

current_block = to_block + 1;
}
}

/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
/// and then try to process them one by one. If the process fails, it will retry indefinitely.
#[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
pub async fn process_single_block_batch(
block_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
) {
loop {
let events_res = chain_state
.contract
.get_request_with_callback_events(current_block, to_block)
.get_request_with_callback_events(block_range.from, block_range.to)
.await;

match events_res {
Ok(events) => {
for event in events {
tracing::info!(num_of_events = &events.len(), "Processing",);
for event in &events {
tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
while let Err(e) =
process_event(event.clone(), &chain_state, &contract, gas_limit).await
process_event(event.clone(), &chain_state, &contract, gas_limit)
.in_current_span()
.await
{
tracing::error!(
"Chain: {} - error while processing event for sequence number: {}. Waiting for {} seconds before retry. error: {:?}",
&chain_state.id,
&event.sequence_number,
sequence_number = &event.sequence_number,
"Error while processing event. Waiting for {} seconds before retry. error: {:?}",
RETRY_INTERVAL.as_secs(),
e
);
time::sleep(RETRY_INTERVAL).await;
}
tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
}
tracing::info!(
"Chain: {} - backlog processed from block: {} to block: {}",
&chain_state.id,
&current_block,
&to_block
);
current_block = to_block + 1;
tracing::info!(num_of_events = &events.len(), "Processed",);
break;
}
Err(e) => {
tracing::error!(
"Chain: {} - error while getting events from block: {} to block: {}. Waiting for {} seconds before retry. error: {:?}",
&chain_state.id,
&current_block,
&to_block,
"Error while getting events. Waiting for {} seconds before retry. error: {:?}",
RETRY_INTERVAL.as_secs(),
e
);
@@ -341,9 +337,29 @@ pub async fn process_block_range(
}
}

pub struct BlockRange {
pub from: BlockNumber,
pub to: BlockNumber,
/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay.
/// It retries indefinitely.
#[tracing::instrument(name="watch_blocks", skip_all, fields(initial_safe_block=latest_safe_block))]
pub async fn watch_blocks_wrapper(
chain_state: BlockchainState,
latest_safe_block: BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) {
loop {
if let Err(e) = watch_blocks(
chain_state.clone(),
latest_safe_block,
tx.clone(),
geth_rpc_wss.clone(),
)
.in_current_span()
.await
{
tracing::error!("watching blocks. error: {:?}", e);
time::sleep(RETRY_INTERVAL).await;
}
}
}

/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
@@ -356,27 +372,19 @@ pub async fn watch_blocks(
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) -> Result<()> {
tracing::info!(
"Chain: {} - watching blocks to handle new events",
&chain_state.id
);
tracing::info!("Watching blocks to handle new events");
let mut last_safe_block_processed = latest_safe_block;

let provider_option = match geth_rpc_wss {
Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
Ok(provider) => provider,
Err(e) => {
tracing::error!(
"Chain: {} - error while connecting to wss: {}. error: {:?}",
&chain_state.id,
wss,
e
);
tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
return Err(e.into());
}
}),
None => {
tracing::info!("Chain: {} - no wss provided", &chain_state.id);
tracing::info!("No wss provided");
None
}
};
@@ -396,7 +404,7 @@ pub async fn watch_blocks(
}
}

let latest_safe_block = get_latest_safe_block(&chain_state).await;
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > last_safe_block_processed {
match tx
.send(BlockRange {
@@ -407,17 +415,15 @@ pub async fn watch_blocks(
{
Ok(_) => {
tracing::info!(
"Chain: {} - block range sent to handle events from: {} to: {}",
&chain_state.id,
&last_safe_block_processed + 1,
&latest_safe_block
from_block = &last_safe_block_processed + 1,
to_block = &latest_safe_block,
"Block range sent to handle events",
);
last_safe_block_processed = latest_safe_block;
}
Err(e) => {
tracing::error!(
"Chain: {} - error while sending block range to handle events. These will be handled in next call. error: {:?}",
&chain_state.id,
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
}
@@ -427,25 +433,39 @@ pub async fn watch_blocks(
}

/// Receives block ranges from the `rx` channel and hands each one to `process_block_range`.
///
/// Ranges are handled strictly one at a time: the next range is not taken from the channel
/// until processing of the previous one has finished. The function returns once `rx.recv()`
/// yields `None`.
#[tracing::instrument(skip_all)]
pub async fn process_new_blocks(
    chain_state: BlockchainState,
    mut rx: mpsc::Receiver<BlockRange>,
    contract: Arc<SignablePythContract>,
    gas_limit: U256,
) {
    // Logged once up front; the enclosing span already identifies the chain, so no
    // per-iteration "Chain: ..." message is needed.
    tracing::info!("Waiting for new block ranges to process");

    // `while let` rather than `loop { if let ... }`: after `recv` yields `None`, a bare loop
    // would keep calling `recv` again and again without ever doing any work.
    // NOTE(review): presumably `None` means every sender has been dropped — confirm against
    // the channel implementation in use.
    while let Some(block_range) = rx.recv().await {
        process_block_range(
            block_range,
            Arc::clone(&contract),
            gas_limit,
            chain_state.clone(),
        )
        .in_current_span()
        .await;
    }

    tracing::warn!("Block range channel closed. No more block ranges will be processed");
}

/// Handles the backlog of blocks described by `backlog_range`.
///
/// Logs when the backlog run starts, delegates the whole range to `process_block_range`
/// inside the current tracing span, and logs again once that call has completed.
#[tracing::instrument(skip_all)]
pub async fn process_backlog(
    backlog_range: BlockRange,
    contract: Arc<SignablePythContract>,
    gas_limit: U256,
    chain_state: BlockchainState,
) {
    tracing::info!("Processing backlog");

    // Build the future first, then attach the current span and drive it to completion.
    let backlog_run = process_block_range(backlog_range, contract, gas_limit, chain_state);
    backlog_run.in_current_span().await;

    tracing::info!("Backlog processed");
}

0 comments on commit 10dc4a0

Please sign in to comment.