Skip to content

Commit

Permalink
refactor: move block delays to EthereumConfig and support multiple de…
Browse files Browse the repository at this point in the history
…lays

Co-Authored-By: Jayant Krishnamurthy <[email protected]>
  • Loading branch information
devin-ai-integration[bot] and Jayant Krishnamurthy committed Jan 30, 2025
1 parent 71e1a2f commit db9439b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 93 deletions.
7 changes: 5 additions & 2 deletions apps/fortuna/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ chains:
target_profit_pct: 20
max_profit_pct: 100

# A list of block delays for processing blocks multiple times. Each number represents
# how many blocks to wait before processing. For example, [5, 10, 20] means process
# blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks.
block_delays: [5, 10, 20]

# Historical commitments -- delete this block for local development purposes
commitments:
# prettier-ignore
Expand Down Expand Up @@ -78,5 +83,3 @@ keeper:
value: 0xabcd
# For production, you can store the private key in a file.
# file: keeper-key.txt
# Number of blocks to wait before processing new blocks (in addition to reveal_delay_blocks)
delay_blocks: 5
18 changes: 10 additions & 8 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ pub struct EthereumConfig {
/// Maximum number of hashes to record in a request.
/// This should be set according to the maximum gas limit the provider supports for callbacks.
pub max_num_hashes: Option<u32>,

/// A list of delays (in blocks) that indicates how many blocks should be delayed
/// before we process a block. For retry logic, we can process blocks multiple times
/// at each specified delay. For example: [5, 10, 20].
#[serde(default = "default_block_delays")]
pub block_delays: Vec<u64>,
}

fn default_block_delays() -> Vec<u64> {
vec![5]
}

fn default_priority_fee_multiplier_pct() -> u64 {
Expand Down Expand Up @@ -341,14 +351,6 @@ pub struct KeeperConfig {
/// This key *does not need to be a registered provider*. In particular, production deployments
/// should ensure this is a different key in order to reduce the severity of security breaches.
pub private_key: SecretString,

/// Number of blocks to wait before processing new blocks (in addition to reveal_delay_blocks)
#[serde(default = "default_delay_blocks")]
pub delay_blocks: u64,
}

fn default_delay_blocks() -> u64 {
5
}

// A secret is a string that can be provided either as a literal in the config,
Expand Down
112 changes: 29 additions & 83 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl KeeperMetrics {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BlockRange {
pub from: BlockNumber,
pub to: BlockNumber,
Expand Down Expand Up @@ -335,7 +335,7 @@ pub async fn run_keeper_threads(
.in_current_span(),
);

let (tx, _rx) = mpsc::channel::<BlockRange>(1000);
let (tx, rx) = mpsc::channel::<BlockRange>(1000);
// Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
spawn(
watch_blocks_wrapper(
Expand All @@ -346,57 +346,18 @@ pub async fn run_keeper_threads(
)
.in_current_span(),
);
// Create channels for both immediate and delayed block processing
let (tx_immediate, rx_immediate) = mpsc::channel::<BlockRange>(1000);
let (tx_delayed, rx_delayed) = mpsc::channel::<BlockRange>(1000);

// Spawn a thread to watch for new blocks and send the range of blocks to both channels
spawn(
watch_blocks_wrapper(
chain_state.clone(),
latest_safe_block,
tx_immediate,
chain_eth_config.geth_rpc_wss.clone(),
)
.in_current_span(),
);

// Clone the tx_delayed channel for the second watch_blocks_wrapper
spawn(
watch_blocks_wrapper(
chain_state.clone(),
latest_safe_block,
tx_delayed,
chain_eth_config.geth_rpc_wss.clone(),
)
.in_current_span(),
);

// Spawn a thread for immediate block processing
// Spawn a thread for block processing with configured delays
spawn(
process_new_blocks(
chain_state.clone(),
rx_immediate,
rx,
Arc::clone(&contract),
gas_limit,
chain_eth_config.escalation_policy.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span(),
);

// Spawn a thread for delayed block processing
spawn(
process_new_blocks_delayed(
chain_state.clone(),
rx_delayed,
Arc::clone(&contract),
gas_limit,
chain_eth_config.escalation_policy.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
5,
chain_eth_config.clone(),
)
.in_current_span(),
);
Expand Down Expand Up @@ -1006,8 +967,10 @@ pub async fn watch_blocks(
}
}

/// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
/// It waits on rx channel to receive block ranges and then calls process_block_range to process them
/// for each configured block delay.
#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn process_new_blocks(
chain_state: BlockchainState,
mut rx: mpsc::Receiver<BlockRange>,
Expand All @@ -1016,12 +979,14 @@ pub async fn process_new_blocks(
escalation_policy: EscalationPolicyConfig,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
chain_eth_config: EthereumConfig,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
if let Some(block_range) = rx.recv().await {
// Process blocks immediately first
process_block_range(
block_range,
block_range.clone(),
Arc::clone(&contract),
gas_limit,
escalation_policy.clone(),
Expand All @@ -1031,44 +996,25 @@ pub async fn process_new_blocks(
)
.in_current_span()
.await;
}
}
}

#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn process_new_blocks_delayed(
chain_state: BlockchainState,
mut rx: mpsc::Receiver<BlockRange>,
contract: Arc<InstrumentedSignablePythContract>,
gas_limit: U256,
escalation_policy: EscalationPolicyConfig,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
delay_blocks: u64,
) {
tracing::info!(
"Waiting for new block ranges to process with {} block delay",
delay_blocks
);
loop {
if let Some(block_range) = rx.recv().await {
let from_block_after_delay = block_range.from + delay_blocks;
let adjusted_range = BlockRange {
from: from_block_after_delay,
to: block_range.to + delay_blocks,
};
process_block_range(
adjusted_range,
Arc::clone(&contract),
gas_limit,
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
// Then process with each configured delay
for delay in &chain_eth_config.block_delays {
let adjusted_range = BlockRange {
from: block_range.from + delay,
to: block_range.to + delay,
};
process_block_range(
adjusted_range,
Arc::clone(&contract),
gas_limit,
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
}
}
}
}
Expand Down

0 comments on commit db9439b

Please sign in to comment.