Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Reset provider #449

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion fendermint/vm/topdown/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use thiserror::Error;
pub enum Error {
#[error("Incoming items are not order sequentially")]
NotSequential,
#[error("The parent view update with block height is not sequential: {0:?}")]
#[error(
"Cache update wrt height is not sequential, cache might be changed during fetching: {0:?}"
)]
NonSequentialParentViewInsert(SequentialAppendError),
#[error("Parent chain reorg detected")]
ParentChainReorgDetected,
Expand Down
4 changes: 4 additions & 0 deletions fendermint/vm/topdown/src/finality/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ impl<T> CachedFinalityProvider<T> {
pub fn cached_blocks(&self) -> Stm<BlockHeight> {
self.inner.cached_blocks()
}

pub fn first_non_null_block(&self, height: BlockHeight) -> Stm<Option<BlockHeight>> {
self.inner.first_non_null_block(height)
}
}

#[cfg(test)]
Expand Down
152 changes: 106 additions & 46 deletions fendermint/vm/topdown/src/finality/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,31 +145,36 @@ impl FinalityWithNull {
/// Get the latest height tracked in the provider, includes both cache and last committed finality
pub(crate) fn latest_height(&self) -> Stm<Option<BlockHeight>> {
let h = if let Some(h) = self.latest_height_in_cache()? {
tracing::debug!(height = h, "latest height found in cache");
h
} else if let Some(p) = self.last_committed_finality()? {
tracing::debug!(
height = p.height,
"no cache, use previous last committed finality"
);
p.height
} else {
return Ok(None);
};
Ok(Some(h))
}
}

/// All the private functions
impl FinalityWithNull {
/// Get the first non-null block in the range [start, end].
fn first_non_null_block_before(&self, height: BlockHeight) -> Stm<Option<BlockHeight>> {
/// Get the first non-null block in the range of earliest cache block till the height specified, inclusive.
pub(crate) fn first_non_null_block(&self, height: BlockHeight) -> Stm<Option<BlockHeight>> {
let cache = self.cached_data.read()?;
Ok(cache.lower_bound().and_then(|lower_bound| {
for h in (lower_bound..height).rev() {
for h in (lower_bound..=height).rev() {
if let Some(Some(_)) = cache.get_value(h) {
return Some(h);
}
}
None
}))
}
}

/// All the private functions
impl FinalityWithNull {
fn propose_next_height(&self) -> Stm<Option<BlockHeight>> {
let latest_height = if let Some(h) = self.latest_height_in_cache()? {
h
Expand All @@ -188,18 +193,17 @@ impl FinalityWithNull {
let candidate_height = min(max_proposal_height, latest_height);
tracing::debug!(max_proposal_height, candidate_height, "propose heights");

let first_non_null_height =
if let Some(h) = self.first_non_null_block_before(candidate_height)? {
h
} else {
tracing::debug!(height = candidate_height, "no non-null block found before");
return Ok(None);
};
let first_non_null_height = if let Some(h) = self.first_non_null_block(candidate_height)? {
h
} else {
tracing::debug!(height = candidate_height, "no non-null block found before");
return Ok(None);
};

tracing::debug!(first_non_null_height, candidate_height);
// an extra layer of delay
let maybe_proposal_height =
self.first_non_null_block_before(first_non_null_height - self.config.proposal_delay())?;
self.first_non_null_block(first_non_null_height - self.config.proposal_delay())?;
tracing::debug!(
delayed_height = maybe_proposal_height,
delay = self.config.proposal_delay()
Expand Down Expand Up @@ -387,19 +391,18 @@ mod tests {
let parent_blocks = vec![
(100, Some((vec![0; 32], vec![], vec![]))), // last committed block
(101, Some((vec![1; 32], vec![], vec![]))), // cache start
(102, Some((vec![2; 32], vec![], vec![]))), // final proposal height
(103, Some((vec![3; 32], vec![], vec![]))), // final delayed height
(104, Some((vec![4; 32], vec![], vec![]))),
(105, Some((vec![5; 32], vec![], vec![]))), // first non null block
(106, Some((vec![6; 32], vec![], vec![]))), // max proposal height (last committed + 6)
(107, Some((vec![7; 32], vec![], vec![]))),
(108, Some((vec![8; 32], vec![], vec![]))), // cache latest height
(102, Some((vec![2; 32], vec![], vec![]))),
(103, Some((vec![3; 32], vec![], vec![]))),
(104, Some((vec![4; 32], vec![], vec![]))), // final delayed height + proposal height
(105, Some((vec![5; 32], vec![], vec![]))),
(106, Some((vec![6; 32], vec![], vec![]))), // max proposal height (last committed + 6), first non null block
(107, Some((vec![7; 32], vec![], vec![]))), // cache latest height
];
let provider = new_provider(parent_blocks).await;

let f = IPCParentFinality {
height: 102,
block_hash: vec![2; 32],
height: 104,
block_hash: vec![4; 32],
};
assert_eq!(
atomically(|| provider.next_proposal()).await,
Expand All @@ -419,7 +422,7 @@ mod tests {
);

// this ensures sequential insertion is still valid
atomically_or_err(|| provider.new_parent_view(109, None))
atomically_or_err(|| provider.new_parent_view(108, None))
.await
.unwrap();
}
Expand All @@ -429,20 +432,20 @@ mod tests {
// max_proposal_range is 6. proposal_delay is 2
let parent_blocks = vec![
(100, Some((vec![0; 32], vec![], vec![]))), // last committed block
(101, Some((vec![1; 32], vec![], vec![]))), // cache start and final height
(102, Some((vec![2; 32], vec![], vec![]))), // delayed height
(103, Some((vec![3; 32], vec![], vec![]))),
(104, Some((vec![4; 32], vec![], vec![]))), // first non null block
(105, Some((vec![4; 32], vec![], vec![]))), // cache latest height
(101, Some((vec![1; 32], vec![], vec![]))),
(102, Some((vec![2; 32], vec![], vec![]))),
(103, Some((vec![3; 32], vec![], vec![]))), // delayed height + final height
(104, Some((vec![4; 32], vec![], vec![]))),
(105, Some((vec![4; 32], vec![], vec![]))), // cache latest height, first non null block
// max proposal height is 106
];
let provider = new_provider(parent_blocks).await;

assert_eq!(
atomically(|| provider.next_proposal()).await,
Some(IPCParentFinality {
height: 101,
block_hash: vec![1; 32]
height: 103,
block_hash: vec![3; 32]
})
);
}
Expand All @@ -459,15 +462,44 @@ mod tests {
(107, None),
(108, None),
(109, None),
(110, Some((vec![4; 32], vec![], vec![]))), // cache latest height
(110, Some((vec![4; 32], vec![], vec![]))), // first non null block
(111, None), // cache latest height
// max proposal height is 112
];
let mut provider = new_provider(parent_blocks).await;
provider.config.max_proposal_range = Some(8);
provider.config.max_proposal_range = Some(10);

assert_eq!(atomically(|| provider.next_proposal()).await, None);
}

#[tokio::test]
async fn test_with_some_null_blocks() {
// max_proposal_range is 10. proposal_delay is 2
let parent_blocks = vec![
(102, Some((vec![2; 32], vec![], vec![]))), // last committed block
(103, None),
(104, None),
(105, Some((vec![5; 32], vec![], vec![]))), // final block
(106, None),
(107, None),
(108, None), // delayed block
(109, None),
(110, Some((vec![10; 32], vec![], vec![]))), // first non null block
(111, None), // cache latest height
// max proposal height is 112
];
let mut provider = new_provider(parent_blocks).await;
provider.config.max_proposal_range = Some(10);

assert_eq!(
atomically(|| provider.next_proposal()).await,
Some(IPCParentFinality {
height: 105,
block_hash: vec![5; 32]
})
);
}

#[tokio::test]
async fn test_with_partially_null_blocks_i() {
// max_proposal_range is 10. proposal_delay is 2
Expand All @@ -477,14 +509,14 @@ mod tests {
(104, None), // we wont have a proposal because after delay, there is no more non-null proposal
(105, None),
(106, None),
(107, Some((vec![7; 32], vec![], vec![]))), // first non null block
(108, None),
(109, None),
(110, Some((vec![10; 32], vec![], vec![]))), // cache latest height
(107, None),
(108, None), // delayed block
(109, Some((vec![8; 32], vec![], vec![]))),
(110, Some((vec![10; 32], vec![], vec![]))), // cache latest height, first non null block
// max proposal height is 112
];
let mut provider = new_provider(parent_blocks).await;
provider.config.max_proposal_range = Some(8);
provider.config.max_proposal_range = Some(10);

assert_eq!(atomically(|| provider.next_proposal()).await, None);
}
Expand All @@ -494,24 +526,52 @@ mod tests {
// max_proposal_range is 10. proposal_delay is 2
let parent_blocks = vec![
(102, Some((vec![2; 32], vec![], vec![]))), // last committed block
(103, Some((vec![3; 32], vec![], vec![]))), // first non null delayed block, final
(103, Some((vec![3; 32], vec![], vec![]))),
(104, None),
(105, None), // delayed block
(105, None),
(106, None),
(107, Some((vec![7; 32], vec![], vec![]))), // first non null block
(108, None),
(107, Some((vec![7; 32], vec![], vec![]))), // first non null after delay
(108, None), // delayed block
(109, None),
(110, Some((vec![10; 32], vec![], vec![]))), // cache latest height
(110, Some((vec![10; 32], vec![], vec![]))), // cache latest height, first non null block
// max proposal height is 112
];
let mut provider = new_provider(parent_blocks).await;
provider.config.max_proposal_range = Some(8);
provider.config.max_proposal_range = Some(10);

assert_eq!(
atomically(|| provider.next_proposal()).await,
Some(IPCParentFinality {
height: 103,
block_hash: vec![3; 32]
height: 107,
block_hash: vec![7; 32]
})
);
}

#[tokio::test]
async fn test_with_partially_null_blocks_iii() {
let parent_blocks = vec![
(102, Some((vec![2; 32], vec![], vec![]))), // last committed block
(103, Some((vec![3; 32], vec![], vec![]))),
(104, None),
(105, None),
(106, None),
(107, Some((vec![7; 32], vec![], vec![]))), // first non null delayed block, final
(108, None), // delayed block
(109, None),
(110, Some((vec![10; 32], vec![], vec![]))), // first non null block
(111, None),
(112, None),
// max proposal height is 122
];
let mut provider = new_provider(parent_blocks).await;
provider.config.max_proposal_range = Some(20);

assert_eq!(
atomically(|| provider.next_proposal()).await,
Some(IPCParentFinality {
height: 107,
block_hash: vec![7; 32]
})
);
}
Expand Down
5 changes: 1 addition & 4 deletions fendermint/vm/topdown/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0, MIT
//! A constant running process that fetch or listener to parent state

mod pointers;
mod syncer;
mod tendermint;

Expand Down Expand Up @@ -155,9 +154,7 @@ where
let mut interval = tokio::time::interval(config.polling_interval);

tokio::spawn(async move {
let lotus_syncer = LotusParentSyncer::new(config, parent_client, provider, query)
.await
.expect("");
let lotus_syncer = LotusParentSyncer::new(config, parent_client, provider, query);
let mut tendermint_syncer = TendermintAwareSyncer::new(lotus_syncer, tendermint_client);

loop {
Expand Down
50 changes: 0 additions & 50 deletions fendermint/vm/topdown/src/sync/pointers.rs

This file was deleted.

Loading