Skip to content

Commit

Permalink
Begin reworking L1Client logic
Browse files Browse the repository at this point in the history
Closes ##2491

Begin to add chunked fetching of stake table events.
  • Loading branch information
tbro committed Jan 27, 2025
1 parent e343feb commit 6cfed7f
Showing 1 changed file with 71 additions and 29 deletions.
100 changes: 71 additions & 29 deletions types/src/v0/impls/l1.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use std::{
cmp::{min, Ordering},
fmt::Debug,
num::NonZeroUsize,
sync::Arc,
time::Instant,
cmp::{min, Ordering}, fmt::Debug, iter::FromFn, num::NonZeroUsize, sync::Arc, time::Instant
};

use anyhow::Context;
Expand Down Expand Up @@ -679,6 +675,25 @@ impl L1Client {
(state, block)
}

/// Divide the range `start..=end` into chunks of size
/// `events_max_block_range`.
fn chunky(&self, start: u64, end: u64) -> FromFn<impl FnMut() -> Option<(u64, u64)>> {
let mut start = start;
let chunk_size = self.options().l1_events_max_block_range;
let chunks = std::iter::from_fn(move || {
let chunk_end = min(start + chunk_size - 1, end);
if chunk_end < start {
return None;
}

let chunk = (start, chunk_end);
start = chunk_end + 1;
Some(chunk)
});
chunks

}

/// Get fee info for each `Deposit` occurring between `prev`
/// and `new`. Returns `Vec<FeeInfo>`
pub async fn get_finalized_deposits(
Expand All @@ -701,19 +716,7 @@ impl L1Client {

// Divide the range `prev_finalized..=new_finalized` into chunks of size
// `events_max_block_range`.
let mut start = prev;
let end = new_finalized;
let chunk_size = opt.l1_events_max_block_range;
let chunks = std::iter::from_fn(move || {
let chunk_end = min(start + chunk_size - 1, end);
if chunk_end < start {
return None;
}

let chunk = (start, chunk_end);
start = chunk_end + 1;
Some(chunk)
});
let chunks = self.chunky(prev, new_finalized);

// Fetch events for each chunk.
let events = stream::iter(chunks).then(|(from, to)| {
Expand Down Expand Up @@ -750,19 +753,45 @@ impl L1Client {
contract: Address,
block: u64,
) -> anyhow::Result<StakeTables> {
// TODO stake_table_address needs to be passed in to L1Client
// before update loop starts.
let stake_table_contract = PermissionedStakeTable::new(contract, self.provider.clone());

let events = stake_table_contract
.stakers_updated_filter()
.from_block(0)
.to_block(block)
.query()
.await?;

Ok(StakeTables::from_l1_events(events.clone()))
// TODO 1. check l1 cache for last known, 2. fetch missing
// then: get last known block, so we need to store last fetched block in state
let last_fetched_block = 0;
let chunks = self.chunky(last_fetched_block, block);

// Fetch events for each chunk.
let events = stream::iter(chunks).then(|(from, to)| {
// TODO stake_table_address needs to be passed in to L1Client
// before update loop starts.
let stake_table_contract = PermissionedStakeTable::new(contract, self.provider.clone());
let retry_delay = self.options().l1_retry_delay;
async move {
tracing::debug!(from, to, "fetch events in range");

// query for stake table events, loop until successful.
loop {
match stake_table_contract
.stakers_updated_filter()
.from_block(0)
.to_block(block)
.query()
.await
{
Ok(events) => break stream::iter(events),
Err(err) => {
tracing::warn!(from, to, %err, "StakeTable L1Event Error");
sleep(retry_delay).await;
}
}
}
}
});

let events = events.flatten().collect::<Vec<_>>().await;

Ok(StakeTables::from_l1_events(events))
}

fn options(&self) -> &L1ClientOptions {
(*self.provider).as_ref().options()
}
Expand Down Expand Up @@ -1182,6 +1211,19 @@ mod test {
tracing::info!(?final_state, "state updated");
}


#[tokio::test]
async fn test_chunky() {

let anvil = Anvil::new().spawn();
let opt = L1ClientOptions { l1_events_max_block_range: 2, ..Default::default()};
let l1_client = opt.connect(vec![anvil.endpoint().parse().unwrap()]);
let chunks = l1_client.chunky(3, 10);
let v = stream::iter(chunks).collect::<Vec<_>>().await;

assert_eq![vec![(3,4), (5,6), (7,8), (9,10)], v];
}

#[tokio::test]
async fn test_fetch_stake_table() -> anyhow::Result<()> {
setup_test();
Expand Down

0 comments on commit 6cfed7f

Please sign in to comment.