From 247b3404a1d2302c51733f7044002ce43b4f91b9 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Mon, 6 May 2024 10:05:32 +0000 Subject: [PATCH 1/6] hack --- fendermint/app/src/cmd/run.rs | 16 ++- fendermint/vm/topdown/src/cache_store.rs | 142 ++++++++++++++++++++ fendermint/vm/topdown/src/finality/fetch.rs | 18 ++- fendermint/vm/topdown/src/finality/null.rs | 93 ++++++++++++- fendermint/vm/topdown/src/lib.rs | 2 + 5 files changed, 256 insertions(+), 15 deletions(-) create mode 100644 fendermint/vm/topdown/src/cache_store.rs diff --git a/fendermint/app/src/cmd/run.rs b/fendermint/app/src/cmd/run.rs index 28f8b9a97..25674e20d 100644 --- a/fendermint/app/src/cmd/run.rs +++ b/fendermint/app/src/cmd/run.rs @@ -25,7 +25,7 @@ use fendermint_vm_snapshot::{SnapshotManager, SnapshotParams}; use fendermint_vm_topdown::proxy::IPCProviderProxy; use fendermint_vm_topdown::sync::launch_polling_syncer; use fendermint_vm_topdown::voting::{publish_vote_loop, Error as VoteError, VoteTally}; -use fendermint_vm_topdown::{CachedFinalityProvider, IPCParentFinality, Toggle}; +use fendermint_vm_topdown::{CacheStore, CachedFinalityProvider, IPCParentFinality, Toggle}; use fvm_shared::address::{current_network, Address, Network}; use ipc_ipld_resolver::{Event as ResolverEvent, VoteRecord}; use ipc_provider::config::subnet::{EVMSubnet, SubnetConfig}; @@ -52,7 +52,8 @@ namespaces! { app, state_hist, state_store, - bit_store + bit_store, + finality_cache } } @@ -152,6 +153,9 @@ async fn run(settings: Settings) -> anyhow::Result<()> { let ns = Namespaces::default(); let db = open_db(&settings, &ns).context("error opening DB")?; + let finality_store = + CacheStore::new(db.clone(), ns.finality_cache).context("error creating cache store DB")?; + // Blockstore for actors. let state_store = NamespaceBlockstore::new(db.clone(), ns.state_store).context("error creating state DB")?; @@ -250,8 +254,12 @@ async fn run(settings: Settings) -> anyhow::Result<()> { } let ipc_provider = Arc::new(make_ipc_provider_proxy(&settings)?); - let finality_provider = - CachedFinalityProvider::uninitialized(config.clone(), ipc_provider.clone()).await?; + let finality_provider = CachedFinalityProvider::uninitialized( + config.clone(), + ipc_provider.clone(), + finality_store.clone(), + ) + .await?; let p = Arc::new(Toggle::enabled(finality_provider)); (p, Some((ipc_provider, config))) } else { diff --git a/fendermint/vm/topdown/src/cache_store.rs b/fendermint/vm/topdown/src/cache_store.rs new file mode 100644 index 000000000..7b5f87bd3 --- /dev/null +++ b/fendermint/vm/topdown/src/cache_store.rs @@ -0,0 +1,142 @@ +use anyhow::{anyhow, Ok}; +use fendermint_rocksdb::RocksDb; +use rocksdb::{BoundColumnFamily, IteratorMode, OptimisticTransactionDB}; +use std::sync::Arc; + +use crate::{BlockHeight, ParentViewPayload}; + +/// A [`Blockstore`] implementation that writes to a specific namespace, not the default like above. +#[derive(Clone)] +pub struct CacheStore { + db: Arc, + ns: String, +} + +impl CacheStore { + pub fn new(db: RocksDb, ns: String) -> anyhow::Result { + // All namespaces are pre-created during open. + if !db.has_cf_handle(&ns) { + Err(anyhow!("namespace {ns} does not exist!")) + } else { + Ok(Self { db: db.db, ns }) + } + } + + // Unfortunately there doesn't seem to be a way to avoid having to + // clone another instance for each operation :( + fn cf(&self) -> anyhow::Result> { + self.db + .cf_handle(&self.ns) + .ok_or_else(|| anyhow!("namespace {} does not exist!", self.ns)) + } +} + +impl CacheStore { + /*pub fn get(&self, height: BlockHeight) -> anyhow::Result>> { + Ok(self.db.get_cf(&self.cf()?, height.to_be_bytes())?) + } */ + + pub fn put( + &self, + height: BlockHeight, + value: Option>, + ) -> anyhow::Result<()> { + let bytes = fvm_ipld_encoding::to_vec(&value)?; + + Ok(self.db.put_cf(&self.cf()?, height.to_be_bytes(), bytes)?) + } + + pub fn delete(&self, height: BlockHeight) -> anyhow::Result<()> { + Ok(self.db.delete_cf(&self.cf()?, height.to_be_bytes())?) + } + + pub fn delete_all(&self) -> anyhow::Result<()> { + let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start); + for item in iter { + let (key, _) = item?; + self.db.delete_cf(&self.cf()?, key)?; + } + + Ok(()) + } + + pub fn delete_below(&self, height: BlockHeight) -> anyhow::Result<()> { + let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start); + for item in iter { + let (key, _) = item?; + let key = BlockHeight::from_be_bytes(key[0..8].try_into().unwrap()); + if key < height { + self.db.delete_cf(&self.cf()?, key.to_be_bytes())?; + } + } + + Ok(()) + } + + pub fn count(&self) -> anyhow::Result { + let mut count = 0; + let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start); + for _ in iter { + count += 1; + } + + Ok(count) + } + + pub fn upper_bound(&self) -> anyhow::Result> { + let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::End); + if let Some(item) = iter.last() { + let (key, _) = item?; + Ok(Some(BlockHeight::from_be_bytes( + key[0..8].try_into().unwrap(), + ))) + } else { + Ok(None) + } + } + + pub fn lower_bound(&self) -> anyhow::Result> { + let mut iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start); + if let Some(item) = iter.next() { + let (key, _) = item?; + Ok(Some(BlockHeight::from_be_bytes( + key[0..8].try_into().unwrap(), + ))) + } else { + Ok(None) + } + } + + pub fn get_value( + &self, + height: BlockHeight, + ) -> anyhow::Result>> { + let value = self.db.get_cf(&self.cf()?, height.to_be_bytes())?; + match value { + Some(value) => Ok(Some(fvm_ipld_encoding::from_slice(&value)?)), + None => Ok(None), + } + } + + pub fn append( + &self, + height: BlockHeight, + block: Option, + ) -> anyhow::Result<()> { + let expected_next_key = if let Some(upper) = self.upper_bound()? { + upper + 1 + } else { + 0 + }; + + if height != expected_next_key { + return Err(anyhow!( + "expected next key to be {}, but got {}", + expected_next_key, + height + )); + } + + self.put(height, Some(block)) + } +} diff --git a/fendermint/vm/topdown/src/finality/fetch.rs b/fendermint/vm/topdown/src/finality/fetch.rs index fb4203045..2b46a416c 100644 --- a/fendermint/vm/topdown/src/finality/fetch.rs +++ b/fendermint/vm/topdown/src/finality/fetch.rs @@ -5,7 +5,7 @@ use crate::finality::null::FinalityWithNull; use crate::finality::ParentViewPayload; use crate::proxy::ParentQueryProxy; use crate::{ - handle_null_round, BlockHash, BlockHeight, Config, Error, IPCParentFinality, + handle_null_round, BlockHash, BlockHeight, CacheStore, Config, Error, IPCParentFinality, ParentFinalityProvider, ParentViewProvider, }; use async_stm::{Stm, StmResult}; @@ -133,9 +133,13 @@ impl CachedFinalityProvider { /// We need this because `fendermint` has yet to be initialized and might /// not be able to provide an existing finality from the storage. This provider requires an /// existing committed finality. Providing the finality will enable other functionalities. - pub async fn uninitialized(config: Config, parent_client: Arc) -> anyhow::Result { + pub async fn uninitialized( + config: Config, + parent_client: Arc, + cache_store: CacheStore, + ) -> anyhow::Result { let genesis = parent_client.get_genesis_epoch().await?; - Ok(Self::new(config, genesis, None, parent_client)) + Ok(Self::new(config, genesis, None, parent_client, cache_store)) } /// Should always return the top down messages, only when ipc parent_client is down after exponential @@ -190,8 +194,14 @@ impl CachedFinalityProvider { genesis_epoch: BlockHeight, committed_finality: Option, parent_client: Arc, + cache_store: CacheStore, ) -> Self { - let inner = FinalityWithNull::new(config.clone(), genesis_epoch, committed_finality); + let inner = FinalityWithNull::new( + config.clone(), + genesis_epoch, + committed_finality, + cache_store.clone(), + ); Self { inner, config, diff --git a/fendermint/vm/topdown/src/finality/null.rs b/fendermint/vm/topdown/src/finality/null.rs index 9a4a7beea..635ff800d 100644 --- a/fendermint/vm/topdown/src/finality/null.rs +++ b/fendermint/vm/topdown/src/finality/null.rs @@ -4,7 +4,9 @@ use crate::finality::{ ensure_sequential, topdown_cross_msgs, validator_changes, ParentViewPayload, }; -use crate::{BlockHash, BlockHeight, Config, Error, IPCParentFinality, SequentialKeyCache}; +use crate::{ + BlockHash, BlockHeight, CacheStore, Config, Error, IPCParentFinality, SequentialKeyCache, +}; use async_stm::{abort, atomically, Stm, StmResult, TVar}; use ipc_api::cross::IpcEnvelope; use ipc_api::staking::StakingChangeRequest; @@ -23,6 +25,7 @@ pub struct FinalityWithNull { /// This is a in memory view of the committed parent finality. We need this as a starting point /// for populating the cache last_committed_finality: TVar>, + cache_store: CacheStore, } impl FinalityWithNull { @@ -30,12 +33,14 @@ impl FinalityWithNull { config: Config, genesis_epoch: BlockHeight, committed_finality: Option, + cache_store: CacheStore, ) -> Self { Self { config, genesis_epoch, cached_data: TVar::new(SequentialKeyCache::sequential()), last_committed_finality: TVar::new(committed_finality), + cache_store, } } @@ -66,6 +71,7 @@ impl FinalityWithNull { /// Clear the cache and set the committed finality to the provided value pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { self.cached_data.write(SequentialKeyCache::sequential())?; + self.cache_store.delete_all().unwrap(); self.last_committed_finality.write(Some(finality)) } @@ -118,6 +124,7 @@ impl FinalityWithNull { cache.remove_key_below(height); cache })?; + self.cache_store.delete_below(height).unwrap(); let hash = hex::encode(&finality.block_hash); @@ -137,6 +144,12 @@ impl FinalityWithNull { /// Returns the number of blocks cached. pub(crate) fn cached_blocks(&self) -> Stm { let cache = self.cached_data.read()?; + let store_count = self.cache_store.count().unwrap(); + tracing::info!( + cache_count = cache.size(), + store_count, + "COMPARE cached_blocks" + ); Ok(cache.size() as BlockHeight) } @@ -152,6 +165,12 @@ impl FinalityWithNull { pub(crate) fn latest_height_in_cache(&self) -> Stm> { let cache = self.cached_data.read()?; + let store_upper_bound = self.cache_store.upper_bound().unwrap(); + tracing::info!( + cache_upper_bound = cache.upper_bound(), + store_upper_bound, + "COMPARE latest_height_in_cache" + ); Ok(cache.upper_bound()) } @@ -170,14 +189,38 @@ impl FinalityWithNull { /// Get the first non-null block in the range of earliest cache block till the height specified, inclusive. pub(crate) fn first_non_null_block(&self, height: BlockHeight) -> Stm> { let cache = self.cached_data.read()?; - Ok(cache.lower_bound().and_then(|lower_bound| { + + let mut cached_height = 0; + + let res = Ok(cache.lower_bound().and_then(|lower_bound| { for h in (lower_bound..=height).rev() { if let Some(Some(_)) = cache.get_value(h) { + cached_height = h; return Some(h); } } None - })) + })); + + let mut stored_height = 0; + + let _res2: std::result::Result, Vec> = Ok(self + .cache_store + .lower_bound() + .unwrap() + .and_then(|lower_bound| { + for h in (lower_bound..=height).rev() { + if let Some(Some(_)) = self.cache_store.get_value(h).unwrap() { + stored_height = h; + return Some(h); + } + } + None + })); + + tracing::info!(cached_height, stored_height, "COMPARE first_non_null_block"); + + res } } @@ -243,14 +286,33 @@ impl FinalityWithNull { d: D, ) -> Stm> { let cache = self.cached_data.read()?; - Ok(cache.get_value(height).map(|v| { + + let mut cache_value = None; + + let res = Ok(cache.get_value(height).map(|v| { if let Some(i) = v.as_ref() { + cache_value = Some(i.clone()); f(i) } else { tracing::debug!(height, "a null round detected, return default"); d() } - })) + })); + + let mut stored_value = None; + let _ = self.cache_store.get_value(height).unwrap().map(|v| { + if let Some(i) = v.as_ref() { + stored_value = Some(i.clone()); + f(i) + } else { + tracing::debug!(height, "a null round detected, return default"); + d() + } + }); + + tracing::info!(?cache_value, ?stored_value, "COMPARE handle_null_block"); + + res } fn get_at_height T>( @@ -259,11 +321,23 @@ impl FinalityWithNull { f: F, ) -> Stm> { let cache = self.cached_data.read()?; - Ok(if let Some(Some(v)) = cache.get_value(height) { + + let mut cache_value = None; + let res = Ok(if let Some(Some(v)) = cache.get_value(height) { + cache_value = Some(v.clone()); Some(f(v)) } else { None - }) + }); + + let mut stored_value = None; + if let Some(Some(v)) = self.cache_store.get_value(height).unwrap() { + stored_value = Some(v.clone()); + } + + tracing::info!(?cache_value, ?stored_value, "COMPARE get_at_height"); + + res } fn parent_block_filled( @@ -310,6 +384,11 @@ impl FinalityWithNull { return abort(e); } + let r2 = self.cache_store.append(height, None); + if let Err(e) = r2 { + panic!("cache store failed to append: {:?}", e); + } + Ok(()) } diff --git a/fendermint/vm/topdown/src/lib.rs b/fendermint/vm/topdown/src/lib.rs index 91390093e..9a3356128 100644 --- a/fendermint/vm/topdown/src/lib.rs +++ b/fendermint/vm/topdown/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT mod cache; +mod cache_store; mod error; mod finality; pub mod sync; @@ -22,6 +23,7 @@ use std::fmt::{Display, Formatter}; use std::time::Duration; pub use crate::cache::{SequentialAppendError, SequentialKeyCache, ValueIter}; +pub use crate::cache_store::*; pub use crate::error::Error; pub use crate::finality::CachedFinalityProvider; pub use crate::toggle::Toggle; From 585344a8738dde55f168759a687d5eba3d897b21 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 7 May 2024 11:02:53 +0000 Subject: [PATCH 2/6] wip --- Cargo.lock | 3 + fendermint/vm/topdown/Cargo.toml | 3 + fendermint/vm/topdown/src/cache.rs | 2 + fendermint/vm/topdown/src/cache_store.rs | 89 ++++++++++++++++++--- fendermint/vm/topdown/src/finality/fetch.rs | 13 ++- fendermint/vm/topdown/src/finality/mod.rs | 15 +++- fendermint/vm/topdown/src/finality/null.rs | 67 ++++++++++++---- fendermint/vm/topdown/src/lib.rs | 2 +- fendermint/vm/topdown/src/sync/syncer.rs | 4 +- 9 files changed, 163 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fdee3f5e..683ff5851 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3453,6 +3453,7 @@ dependencies = [ "clap 4.5.1", "ethers", "fendermint_crypto", + "fendermint_rocksdb", "fendermint_testing", "fendermint_tracing", "fendermint_vm_event", @@ -3468,8 +3469,10 @@ dependencies = [ "libp2p", "num-traits", "rand", + "rocksdb", "serde", "serde_json", + "tempfile", "tendermint-rpc", "thiserror", "tokio", diff --git a/fendermint/vm/topdown/Cargo.toml b/fendermint/vm/topdown/Cargo.toml index 42980a06e..e0d8368b0 100644 --- a/fendermint/vm/topdown/Cargo.toml +++ b/fendermint/vm/topdown/Cargo.toml @@ -33,12 +33,15 @@ tracing = { workspace = true } fendermint_vm_genesis = { path = "../genesis" } fendermint_vm_event = { path = "../event" } fendermint_tracing = { path = "../../tracing" } +fendermint_rocksdb = { path = "../../rocksdb" } +rocksdb = { version = "0.21", features = ["multi-threaded-cf"] } [dev-dependencies] arbitrary = { workspace = true } clap = { workspace = true } rand = { workspace = true } tracing-subscriber = { workspace = true } +tempfile = { workspace = true } fendermint_crypto = { path = "../../crypto" } fendermint_testing = { path = "../../testing", features = ["smt"] } diff --git a/fendermint/vm/topdown/src/cache.rs b/fendermint/vm/topdown/src/cache.rs index c1e170e5c..ae002c28b 100644 --- a/fendermint/vm/topdown/src/cache.rs +++ b/fendermint/vm/topdown/src/cache.rs @@ -158,6 +158,8 @@ impl SequentialKeyCache { /// Insert the key and value pair only if the key is upper_bound + 1 pub fn append(&mut self, key: K, val: V) -> Result<(), SequentialAppendError> { + tracing::info!("CACHE appending block at height {:?}", key); + let expected_next_key = if let Some(upper) = self.upper_bound() { upper.add(self.increment) } else { diff --git a/fendermint/vm/topdown/src/cache_store.rs b/fendermint/vm/topdown/src/cache_store.rs index 7b5f87bd3..491a90392 100644 --- a/fendermint/vm/topdown/src/cache_store.rs +++ b/fendermint/vm/topdown/src/cache_store.rs @@ -5,7 +5,8 @@ use std::sync::Arc; use crate::{BlockHeight, ParentViewPayload}; -/// A [`Blockstore`] implementation that writes to a specific namespace, not the default like above. +/// A cache k/v implementation for storing ParentViewPayload for a specific height +/// in rocksdb with a specific namespace. #[derive(Clone)] pub struct CacheStore { db: Arc, @@ -18,10 +19,23 @@ impl CacheStore { if !db.has_cf_handle(&ns) { Err(anyhow!("namespace {ns} does not exist!")) } else { - Ok(Self { db: db.db, ns }) + let store = Self { db: db.db, ns }; + store.delete_all()?; + Ok(store) + //Ok(Self { db: db.db, ns }) } } + // creates a new instance of the cache store for testing purposes + #[cfg(test)] + pub fn new_test(ns: String) -> anyhow::Result { + use fendermint_rocksdb::RocksDbConfig; + let dir = tempfile::Builder::new().prefix(&ns).tempdir()?; + let db = RocksDb::open(dir.path().join("rocksdb"), &RocksDbConfig::default())?; + let _ = db.new_cf_handle(&ns)?; + Ok(Self { db: db.db, ns }) + } + // Unfortunately there doesn't seem to be a way to avoid having to // clone another instance for each operation :( fn cf(&self) -> anyhow::Result> { @@ -32,11 +46,7 @@ impl CacheStore { } impl CacheStore { - /*pub fn get(&self, height: BlockHeight) -> anyhow::Result>> { - Ok(self.db.get_cf(&self.cf()?, height.to_be_bytes())?) - } */ - - pub fn put( + fn put( &self, height: BlockHeight, value: Option>, @@ -60,7 +70,7 @@ impl CacheStore { Ok(()) } - pub fn delete_below(&self, height: BlockHeight) -> anyhow::Result<()> { + pub fn delete_key_below(&self, height: BlockHeight) -> anyhow::Result<()> { let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start); for item in iter { let (key, _) = item?; @@ -73,7 +83,7 @@ impl CacheStore { Ok(()) } - pub fn count(&self) -> anyhow::Result { + pub fn size(&self) -> anyhow::Result { let mut count = 0; let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start); for _ in iter { @@ -84,8 +94,8 @@ impl CacheStore { } pub fn upper_bound(&self) -> anyhow::Result> { - let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::End); - if let Some(item) = iter.last() { + let mut iter = self.db.iterator_cf(&self.cf()?, IteratorMode::End); + if let Some(item) = iter.next() { let (key, _) = item?; Ok(Some(BlockHeight::from_be_bytes( key[0..8].try_into().unwrap(), @@ -123,10 +133,13 @@ impl CacheStore { height: BlockHeight, block: Option, ) -> anyhow::Result<()> { + tracing::info!("STORE appending block at height {}", height); + let expected_next_key = if let Some(upper) = self.upper_bound()? { upper + 1 } else { - 0 + self.put(height, Some(block))?; + return Ok(()); }; if height != expected_next_key { @@ -140,3 +153,55 @@ impl CacheStore { self.put(height, Some(block)) } } + +#[cfg(test)] +mod tests { + use crate::BlockHeight; + use crate::CacheStore; + use crate::ParentViewPayload; + + fn build_payload(height: BlockHeight) -> ParentViewPayload { + let mut p = ParentViewPayload::default(); + p.0 = height.to_be_bytes().to_vec(); + p + } + + #[test] + fn insert_works() { + let cache_store = CacheStore::new_test("test".to_string()).unwrap(); + for height in 9..100 { + cache_store + .append(height, Some(build_payload(height))) + .unwrap(); + } + + for height in 9..100 { + let value = cache_store.get_value(height).unwrap().unwrap().unwrap(); + let cache_height = BlockHeight::from_be_bytes(value.0[0..8].try_into().unwrap()); + assert_eq!(height, cache_height); + } + + assert!(cache_store.get_value(100).unwrap().is_none()); + assert_eq!(cache_store.lower_bound().unwrap(), Some(9)); + assert_eq!(cache_store.upper_bound().unwrap(), Some(99)); + } + + #[test] + fn delete_works() { + let cache_store = CacheStore::new_test("test".to_string()).unwrap(); + + for height in 0..100 { + cache_store + .append(height, Some(build_payload(height))) + .unwrap(); + } + + cache_store.delete_key_below(10).unwrap(); + assert!(cache_store.size().unwrap() == 90); + assert_eq!(cache_store.lower_bound().unwrap(), Some(10)); + + cache_store.delete_all().unwrap(); + assert!(cache_store.size().unwrap() == 0); + assert_eq!(cache_store.lower_bound().unwrap(), None); + } +} diff --git a/fendermint/vm/topdown/src/finality/fetch.rs b/fendermint/vm/topdown/src/finality/fetch.rs index 2b46a416c..a6ded6898 100644 --- a/fendermint/vm/topdown/src/finality/fetch.rs +++ b/fendermint/vm/topdown/src/finality/fetch.rs @@ -254,8 +254,8 @@ mod tests { use crate::finality::ParentViewPayload; use crate::proxy::ParentQueryProxy; use crate::{ - BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, ParentViewProvider, - SequentialKeyCache, NULL_ROUND_ERR_MSG, + BlockHeight, CacheStore, CachedFinalityProvider, Config, IPCParentFinality, + ParentViewProvider, SequentialKeyCache, NULL_ROUND_ERR_MSG, }; use anyhow::anyhow; use async_trait::async_trait; @@ -365,7 +365,14 @@ mod tests { block_hash: vec![0; 32], }; - CachedFinalityProvider::new(config, genesis_epoch, Some(committed_finality), proxy) + let cache_store = CacheStore::new_test("test".to_string()).unwrap(); + CachedFinalityProvider::new( + config, + genesis_epoch, + Some(committed_finality), + proxy, + cache_store, + ) } fn new_cross_msg(nonce: u64) -> IpcEnvelope { diff --git a/fendermint/vm/topdown/src/finality/mod.rs b/fendermint/vm/topdown/src/finality/mod.rs index c6cd2dc3d..9cb43e350 100644 --- a/fendermint/vm/topdown/src/finality/mod.rs +++ b/fendermint/vm/topdown/src/finality/mod.rs @@ -12,7 +12,7 @@ use ipc_api::staking::StakingChangeRequest; pub use fetch::CachedFinalityProvider; -pub(crate) type ParentViewPayload = (BlockHash, Vec, Vec); +pub type ParentViewPayload = (BlockHash, Vec, Vec); fn ensure_sequential u64>(msgs: &[T], f: F) -> StmResult<(), Error> { if msgs.is_empty() { @@ -43,7 +43,8 @@ pub(crate) fn topdown_cross_msgs(p: &ParentViewPayload) -> Vec { mod tests { use crate::proxy::ParentQueryProxy; use crate::{ - BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider, + BlockHeight, CacheStore, CachedFinalityProvider, Config, IPCParentFinality, + ParentFinalityProvider, }; use async_stm::atomically_or_err; use async_trait::async_trait; @@ -112,7 +113,15 @@ mod tests { proposal_delay: None, }; - CachedFinalityProvider::new(config, 10, Some(genesis_finality()), mocked_agent_proxy()) + let cache_store = + CacheStore::new_test("test".to_string()).expect("error creating cache store"); + CachedFinalityProvider::new( + config, + 10, + Some(genesis_finality()), + mocked_agent_proxy(), + cache_store, + ) } #[tokio::test] diff --git a/fendermint/vm/topdown/src/finality/null.rs b/fendermint/vm/topdown/src/finality/null.rs index 635ff800d..f137d1f3e 100644 --- a/fendermint/vm/topdown/src/finality/null.rs +++ b/fendermint/vm/topdown/src/finality/null.rs @@ -72,6 +72,7 @@ impl FinalityWithNull { pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { self.cached_data.write(SequentialKeyCache::sequential())?; self.cache_store.delete_all().unwrap(); + tracing::info!("cache cleared"); self.last_committed_finality.write(Some(finality)) } @@ -124,7 +125,8 @@ impl FinalityWithNull { cache.remove_key_below(height); cache })?; - self.cache_store.delete_below(height).unwrap(); + self.cache_store.delete_key_below(height).unwrap(); + tracing::info!(height, "cache cleared below height"); let hash = hex::encode(&finality.block_hash); @@ -143,14 +145,15 @@ impl FinalityWithNull { impl FinalityWithNull { /// Returns the number of blocks cached. pub(crate) fn cached_blocks(&self) -> Stm { - let cache = self.cached_data.read()?; - let store_count = self.cache_store.count().unwrap(); - tracing::info!( - cache_count = cache.size(), - store_count, - "COMPARE cached_blocks" - ); - Ok(cache.size() as BlockHeight) + let cache_size = self.cached_data.read()?.size(); + let store_size = self.cache_store.size().unwrap(); + tracing::info!(cache_size, store_size, "COMPARE cached_blocks"); + + if cache_size != store_size { + panic!("cached_blocks mismatch: {} != {}", cache_size, store_size); + } + + Ok(cache_size as BlockHeight) } pub(crate) fn block_hash_at_height(&self, height: BlockHeight) -> Stm> { @@ -164,14 +167,20 @@ impl FinalityWithNull { } pub(crate) fn latest_height_in_cache(&self) -> Stm> { - let cache = self.cached_data.read()?; + let cache_upper_bound = self.cached_data.read()?.upper_bound(); let store_upper_bound = self.cache_store.upper_bound().unwrap(); tracing::info!( - cache_upper_bound = cache.upper_bound(), + cache_upper_bound, store_upper_bound, "COMPARE latest_height_in_cache" ); - Ok(cache.upper_bound()) + if cache_upper_bound != store_upper_bound { + panic!( + "latest_height_in_cache mismatch: {:?} != {:?}", + cache_upper_bound, store_upper_bound + ); + } + Ok(cache_upper_bound) } /// Get the latest height tracked in the provider, includes both cache and last committed finality @@ -220,6 +229,13 @@ impl FinalityWithNull { tracing::info!(cached_height, stored_height, "COMPARE first_non_null_block"); + if cached_height != stored_height { + panic!( + "first_non_null_block mismatch: {} != {}", + cached_height, stored_height + ); + } + res } } @@ -312,6 +328,12 @@ impl FinalityWithNull { tracing::info!(?cache_value, ?stored_value, "COMPARE handle_null_block"); + if cache_value.is_some() || stored_value.is_some() { + if cache_value.unwrap().2 != stored_value.unwrap().2 { + panic!("handle_null_block mismatch"); + } + } + res } @@ -359,7 +381,14 @@ impl FinalityWithNull { let r = self.cached_data.modify(|mut cache| { let r = cache - .append(height, Some((block_hash, validator_changes, top_down_msgs))) + .append( + height, + Some(( + block_hash.clone(), + validator_changes.clone(), + top_down_msgs.clone(), + )), + ) .map_err(Error::NonSequentialParentViewInsert); (cache, r) })?; @@ -368,6 +397,13 @@ impl FinalityWithNull { return abort(e); } + let r2 = self + .cache_store + .append(height, Some((block_hash, validator_changes, top_down_msgs))); + if let Err(e) = r2 { + panic!("cache store failed to append: {:?}", e); + } + Ok(()) } @@ -450,7 +486,7 @@ impl FinalityWithNull { mod tests { use super::FinalityWithNull; use crate::finality::ParentViewPayload; - use crate::{BlockHeight, Config, IPCParentFinality}; + use crate::{BlockHeight, CacheStore, Config, IPCParentFinality}; use async_stm::{atomically, atomically_or_err}; async fn new_provider( @@ -472,7 +508,8 @@ mod tests { blocks.remove(0); - let f = FinalityWithNull::new(config, 1, Some(committed_finality)); + let cache_store = CacheStore::new_test("test".to_string()).unwrap(); + let f = FinalityWithNull::new(config, 1, Some(committed_finality), cache_store); for (h, p) in blocks { atomically_or_err(|| f.new_parent_view(h, p.clone())) .await diff --git a/fendermint/vm/topdown/src/lib.rs b/fendermint/vm/topdown/src/lib.rs index 9a3356128..4655e883b 100644 --- a/fendermint/vm/topdown/src/lib.rs +++ b/fendermint/vm/topdown/src/lib.rs @@ -25,7 +25,7 @@ use std::time::Duration; pub use crate::cache::{SequentialAppendError, SequentialKeyCache, ValueIter}; pub use crate::cache_store::*; pub use crate::error::Error; -pub use crate::finality::CachedFinalityProvider; +pub use crate::finality::{CachedFinalityProvider, ParentViewPayload}; pub use crate::toggle::Toggle; pub type BlockHeight = u64; diff --git a/fendermint/vm/topdown/src/sync/syncer.rs b/fendermint/vm/topdown/src/sync/syncer.rs index 675d4e4c6..b2b1726a1 100644 --- a/fendermint/vm/topdown/src/sync/syncer.rs +++ b/fendermint/vm/topdown/src/sync/syncer.rs @@ -394,7 +394,7 @@ mod tests { use crate::sync::ParentFinalityStateQuery; use crate::voting::VoteTally; use crate::{ - BlockHash, BlockHeight, CachedFinalityProvider, Config, IPCParentFinality, + BlockHash, BlockHeight, CacheStore, CachedFinalityProvider, Config, IPCParentFinality, SequentialKeyCache, Toggle, NULL_ROUND_ERR_MSG, }; use anyhow::anyhow; @@ -504,11 +504,13 @@ mod tests { ), ); + let cache_store = CacheStore::new_test("test".to_string()).unwrap(); let provider = CachedFinalityProvider::new( config.clone(), genesis_epoch, Some(committed_finality.clone()), proxy.clone(), + cache_store, ); let mut syncer = LotusParentSyncer::new( config, From 0aab74664614f85e23e3255eb5689e557a9e0c4c Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 7 May 2024 15:49:15 +0000 Subject: [PATCH 3/6] improved error handling --- fendermint/vm/interpreter/src/chain.rs | 13 +- fendermint/vm/topdown/src/error.rs | 2 + fendermint/vm/topdown/src/finality/fetch.rs | 21 +-- fendermint/vm/topdown/src/finality/null.rs | 145 +++++++++++++------- fendermint/vm/topdown/src/lib.rs | 8 +- fendermint/vm/topdown/src/sync/mod.rs | 6 +- fendermint/vm/topdown/src/sync/syncer.rs | 26 ++-- fendermint/vm/topdown/src/toggle.rs | 21 +-- 8 files changed, 154 insertions(+), 88 deletions(-) diff --git a/fendermint/vm/interpreter/src/chain.rs b/fendermint/vm/interpreter/src/chain.rs index 78e75de8e..39aaa4f9b 100644 --- a/fendermint/vm/interpreter/src/chain.rs +++ b/fendermint/vm/interpreter/src/chain.rs @@ -9,7 +9,7 @@ use crate::{ CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter, }; use anyhow::{bail, Context}; -use async_stm::atomically; +use async_stm::{atomically, atomically_or_err}; use async_trait::async_trait; use fendermint_tracing::emit; use fendermint_vm_actor_interface::ipc; @@ -129,7 +129,7 @@ where // The pre-requisite for proposal is that there is a quorum of gossiped votes at that height. // The final proposal can be at most as high as the quorum, but can be less if we have already, // hit some limits such as how many blocks we can propose in a single step. - let finalities = atomically(|| { + let finalities = atomically_or_err(|| { let parent = state.parent_finality_provider.next_proposal()?; let quorum = state .parent_finality_votes @@ -138,7 +138,7 @@ where Ok((parent, quorum)) }) - .await; + .await?; let maybe_finality = match finalities { (Some(parent), Some(quorum)) => Some(if parent.height <= quorum.height { @@ -206,7 +206,8 @@ where block_hash, }; let is_final = - atomically(|| env.parent_finality_provider.check_proposal(&prop)).await; + atomically_or_err(|| env.parent_finality_provider.check_proposal(&prop)) + .await?; if !is_final { return Ok(false); } @@ -362,7 +363,7 @@ where tracing::debug!("chain interpreter applied topdown msgs"); - atomically(|| { + atomically_or_err(|| { env.parent_finality_provider .set_new_finality(finality.clone(), prev_finality.clone())?; @@ -371,7 +372,7 @@ where Ok(()) }) - .await; + .await?; tracing::debug!( finality = finality.to_string(), diff --git a/fendermint/vm/topdown/src/error.rs b/fendermint/vm/topdown/src/error.rs index eef6090d7..109a0c796 100644 --- a/fendermint/vm/topdown/src/error.rs +++ b/fendermint/vm/topdown/src/error.rs @@ -15,4 +15,6 @@ pub enum Error { ParentChainReorgDetected, #[error("Cannot query parent at height {1}: {0}")] CannotQueryParent(String, BlockHeight), + #[error("Error in cache store: {0}")] + CacheStoreError(String), } diff --git a/fendermint/vm/topdown/src/finality/fetch.rs b/fendermint/vm/topdown/src/finality/fetch.rs index a6ded6898..ead50d070 100644 --- a/fendermint/vm/topdown/src/finality/fetch.rs +++ b/fendermint/vm/topdown/src/finality/fetch.rs @@ -111,11 +111,11 @@ impl ParentViewProvider for CachedF impl ParentFinalityProvider for CachedFinalityProvider { - fn next_proposal(&self) -> Stm> { + fn next_proposal(&self) -> StmResult, Error> { self.inner.next_proposal() } - fn check_proposal(&self, proposal: &IPCParentFinality) -> Stm { + fn check_proposal(&self, proposal: &IPCParentFinality) -> StmResult { self.inner.check_proposal(proposal) } @@ -123,7 +123,7 @@ impl ParentFinalityProvider &self, finality: IPCParentFinality, previous_finality: Option, - ) -> Stm<()> { + ) -> StmResult<(), Error> { self.inner.set_new_finality(finality, previous_finality) } } @@ -209,16 +209,16 @@ impl CachedFinalityProvider { } } - pub fn block_hash(&self, height: BlockHeight) -> Stm> { + pub fn block_hash(&self, height: BlockHeight) -> StmResult, Error> { self.inner.block_hash_at_height(height) } - pub fn latest_height_in_cache(&self) -> Stm> { + pub fn latest_height_in_cache(&self) -> StmResult, Error> { self.inner.latest_height_in_cache() } /// Get the latest height tracked in the provider, includes both cache and last committed finality - pub fn latest_height(&self) -> Stm> { + pub fn latest_height(&self) -> StmResult, Error> { self.inner.latest_height() } @@ -227,7 +227,7 @@ impl CachedFinalityProvider { } /// Clear the cache and set the committed finality to the provided value - pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { + pub fn reset(&self, finality: IPCParentFinality) -> StmResult<(), Error> { self.inner.reset(finality) } @@ -240,11 +240,14 @@ impl CachedFinalityProvider { } /// Returns the number of blocks cached. - pub fn cached_blocks(&self) -> Stm { + pub fn cached_blocks(&self) -> StmResult { self.inner.cached_blocks() } - pub fn first_non_null_block(&self, height: BlockHeight) -> Stm> { + pub fn first_non_null_block( + &self, + height: BlockHeight, + ) -> StmResult, Error> { self.inner.first_non_null_block(height) } } diff --git a/fendermint/vm/topdown/src/finality/null.rs b/fendermint/vm/topdown/src/finality/null.rs index f137d1f3e..da59f43b0 100644 --- a/fendermint/vm/topdown/src/finality/null.rs +++ b/fendermint/vm/topdown/src/finality/null.rs @@ -7,7 +7,7 @@ use crate::finality::{ use crate::{ BlockHash, BlockHeight, CacheStore, Config, Error, IPCParentFinality, SequentialKeyCache, }; -use async_stm::{abort, atomically, Stm, StmResult, TVar}; +use async_stm::{abort, atomically_or_err, Stm, StmError, StmResult, TVar}; use ipc_api::cross::IpcEnvelope; use ipc_api::staking::StakingChangeRequest; use std::cmp::min; @@ -52,7 +52,8 @@ impl FinalityWithNull { &self, height: BlockHeight, ) -> anyhow::Result>> { - let r = atomically(|| self.handle_null_block(height, validator_changes, Vec::new)).await; + let r = atomically_or_err(|| self.handle_null_block(height, validator_changes, Vec::new)) + .await?; Ok(r) } @@ -60,7 +61,8 @@ impl FinalityWithNull { &self, height: BlockHeight, ) -> anyhow::Result>> { - let r = atomically(|| self.handle_null_block(height, topdown_cross_msgs, Vec::new)).await; + let r = atomically_or_err(|| self.handle_null_block(height, topdown_cross_msgs, Vec::new)) + .await?; Ok(r) } @@ -69,11 +71,13 @@ impl FinalityWithNull { } /// Clear the cache and set the committed finality to the provided value - pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { + pub fn reset(&self, finality: IPCParentFinality) -> StmResult<(), Error> { self.cached_data.write(SequentialKeyCache::sequential())?; - self.cache_store.delete_all().unwrap(); + self.cache_store + .delete_all() + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; tracing::info!("cache cleared"); - self.last_committed_finality.write(Some(finality)) + Ok(self.last_committed_finality.write(Some(finality))?) } pub fn new_parent_view( @@ -88,7 +92,7 @@ impl FinalityWithNull { } } - pub fn next_proposal(&self) -> Stm> { + pub fn next_proposal(&self) -> StmResult, Error> { let height = if let Some(h) = self.propose_next_height()? { h } else { @@ -103,7 +107,7 @@ impl FinalityWithNull { Ok(Some(proposal)) } - pub fn check_proposal(&self, proposal: &IPCParentFinality) -> Stm { + pub fn check_proposal(&self, proposal: &IPCParentFinality) -> StmResult { if !self.check_height(proposal)? { return Ok(false); } @@ -114,7 +118,7 @@ impl FinalityWithNull { &self, finality: IPCParentFinality, previous_finality: Option, - ) -> Stm<()> { + ) -> StmResult<(), Error> { debug_assert!(previous_finality == self.last_committed_finality.read_clone()?); // the height to clear @@ -125,7 +129,9 @@ impl FinalityWithNull { cache.remove_key_below(height); cache })?; - self.cache_store.delete_key_below(height).unwrap(); + self.cache_store + .delete_key_below(height) + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; tracing::info!(height, "cache cleared below height"); let hash = hex::encode(&finality.block_hash); @@ -144,9 +150,12 @@ impl FinalityWithNull { impl FinalityWithNull { /// Returns the number of blocks cached. - pub(crate) fn cached_blocks(&self) -> Stm { + pub(crate) fn cached_blocks(&self) -> StmResult { let cache_size = self.cached_data.read()?.size(); - let store_size = self.cache_store.size().unwrap(); + let store_size = self + .cache_store + .size() + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; tracing::info!(cache_size, store_size, "COMPARE cached_blocks"); if cache_size != store_size { @@ -156,7 +165,10 @@ impl FinalityWithNull { Ok(cache_size as BlockHeight) } - pub(crate) fn block_hash_at_height(&self, height: BlockHeight) -> Stm> { + pub(crate) fn block_hash_at_height( + &self, + height: BlockHeight, + ) -> StmResult, Error> { if let Some(f) = self.last_committed_finality.read()?.as_ref() { if f.height == height { return Ok(Some(f.block_hash.clone())); @@ -166,9 +178,12 @@ impl FinalityWithNull { self.get_at_height(height, |i| i.0.clone()) } - pub(crate) fn latest_height_in_cache(&self) -> Stm> { + pub(crate) fn latest_height_in_cache(&self) -> StmResult, Error> { let cache_upper_bound = self.cached_data.read()?.upper_bound(); - let store_upper_bound = self.cache_store.upper_bound().unwrap(); + let store_upper_bound = self + .cache_store + .upper_bound() + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; tracing::info!( cache_upper_bound, store_upper_bound, @@ -184,7 +199,7 @@ impl FinalityWithNull { } /// Get the latest height tracked in the provider, includes both cache and last committed finality - pub(crate) fn latest_height(&self) -> Stm> { + pub(crate) fn latest_height(&self) -> StmResult, Error> { let h = if let Some(h) = self.latest_height_in_cache()? { h } else if let Some(p) = self.last_committed_finality()? { @@ -196,7 +211,10 @@ impl FinalityWithNull { } /// Get the first non-null block in the range of earliest cache block till the height specified, inclusive. - pub(crate) fn first_non_null_block(&self, height: BlockHeight) -> Stm> { + pub(crate) fn first_non_null_block( + &self, + height: BlockHeight, + ) -> StmResult, Error> { let cache = self.cached_data.read()?; let mut cached_height = 0; @@ -216,10 +234,14 @@ impl FinalityWithNull { let _res2: std::result::Result, Vec> = Ok(self .cache_store .lower_bound() - .unwrap() + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? .and_then(|lower_bound| { for h in (lower_bound..=height).rev() { - if let Some(Some(_)) = self.cache_store.get_value(h).unwrap() { + if let Ok(Some(Some(_))) = self + .cache_store + .get_value(h) + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string()))) + { stored_height = h; return Some(h); } @@ -242,7 +264,7 @@ impl FinalityWithNull { /// All the private functions impl FinalityWithNull { - fn propose_next_height(&self) -> Stm> { + fn propose_next_height(&self) -> StmResult, Error> { let latest_height = if let Some(h) = self.latest_height_in_cache()? { h } else { @@ -300,7 +322,7 @@ impl FinalityWithNull { height: BlockHeight, f: F, d: D, - ) -> Stm> { + ) -> StmResult, Error> { let cache = self.cached_data.read()?; let mut cache_value = None; @@ -316,15 +338,19 @@ impl FinalityWithNull { })); let mut stored_value = None; - let _ = self.cache_store.get_value(height).unwrap().map(|v| { - if let Some(i) = v.as_ref() { - stored_value = Some(i.clone()); - f(i) - } else { - tracing::debug!(height, "a null round detected, return default"); - d() - } - }); + let _ = self + .cache_store + .get_value(height) + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? + .map(|v| { + if let Some(i) = v.as_ref() { + stored_value = Some(i.clone()); + f(i) + } else { + tracing::debug!(height, "a null round detected, return default"); + d() + } + }); tracing::info!(?cache_value, ?stored_value, "COMPARE handle_null_block"); @@ -341,7 +367,7 @@ impl FinalityWithNull { &self, height: BlockHeight, f: F, - ) -> Stm> { + ) -> StmResult, Error> { let cache = self.cached_data.read()?; let mut cache_value = None; @@ -353,7 +379,11 @@ impl FinalityWithNull { }); let mut stored_value = None; - if let Some(Some(v)) = self.cache_store.get_value(height).unwrap() { + if let Some(Some(v)) = self + .cache_store + .get_value(height) + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? + { stored_value = Some(v.clone()); } @@ -399,9 +429,10 @@ impl FinalityWithNull { let r2 = self .cache_store - .append(height, Some((block_hash, validator_changes, top_down_msgs))); + .append(height, Some((block_hash, validator_changes, top_down_msgs))) + .map_err(|e| Error::CacheStoreError(e.to_string())); if let Err(e) = r2 { - panic!("cache store failed to append: {:?}", e); + return abort(e); } Ok(()) @@ -420,15 +451,18 @@ impl FinalityWithNull { return abort(e); } - let r2 = self.cache_store.append(height, None); + let r2 = self + .cache_store + .append(height, None) + .map_err(|e| Error::CacheStoreError(e.to_string())); if let Err(e) = r2 { - panic!("cache store failed to append: {:?}", e); + return abort(e); } Ok(()) } - fn check_height(&self, proposal: &IPCParentFinality) -> Stm { + fn check_height(&self, proposal: &IPCParentFinality) -> StmResult { let binding = self.last_committed_finality.read()?; // last committed finality is not ready yet, we don't vote, just reject let last_committed_finality = if let Some(f) = binding.as_ref() { @@ -468,7 +502,7 @@ impl FinalityWithNull { } } - fn check_block_hash(&self, proposal: &IPCParentFinality) -> Stm { + fn check_block_hash(&self, proposal: &IPCParentFinality) -> StmResult { Ok( if let Some(block_hash) = self.block_hash_at_height(proposal.height)? { let r = block_hash == proposal.block_hash; @@ -538,16 +572,19 @@ mod tests { block_hash: vec![4; 32], }; assert_eq!( - atomically(|| provider.next_proposal()).await, + atomically_or_err(|| provider.next_proposal()) + .await + .unwrap(), Some(f.clone()) ); // Test set new finality - atomically(|| { + atomically_or_err(|| { let last = provider.last_committed_finality.read_clone()?; provider.set_new_finality(f.clone(), last) }) - .await; + .await + .unwrap(); assert_eq!( atomically(|| provider.last_committed_finality()).await, @@ -575,7 +612,9 @@ mod tests { let provider = new_provider(parent_blocks).await; assert_eq!( - atomically(|| provider.next_proposal()).await, + atomically_or_err(|| provider.next_proposal()) + .await + .unwrap(), Some(IPCParentFinality { height: 103, block_hash: vec![3; 32] @@ -601,7 +640,12 @@ mod tests { let mut provider = new_provider(parent_blocks).await; provider.config.max_proposal_range = Some(8); - assert_eq!(atomically(|| provider.next_proposal()).await, None); + assert_eq!( + atomically_or_err(|| provider.next_proposal()) + .await + .unwrap(), + None + ); } #[tokio::test] @@ -622,7 +666,12 @@ mod tests { let mut provider = new_provider(parent_blocks).await; provider.config.max_proposal_range = Some(10); - assert_eq!(atomically(|| provider.next_proposal()).await, None); + assert_eq!( + atomically_or_err(|| provider.next_proposal()) + .await + .unwrap(), + None + ); } #[tokio::test] @@ -644,7 +693,9 @@ mod tests { provider.config.max_proposal_range = Some(10); assert_eq!( - atomically(|| provider.next_proposal()).await, + atomically_or_err(|| provider.next_proposal()) + .await + .unwrap(), Some(IPCParentFinality { height: 107, block_hash: vec![7; 32] @@ -672,7 +723,9 @@ mod tests { provider.config.max_proposal_range = Some(20); assert_eq!( - atomically(|| provider.next_proposal()).await, + atomically_or_err(|| provider.next_proposal()) + .await + .unwrap(), Some(IPCParentFinality { height: 107, block_hash: vec![7; 32] diff --git a/fendermint/vm/topdown/src/lib.rs b/fendermint/vm/topdown/src/lib.rs index 4655e883b..1ffb768f9 100644 --- a/fendermint/vm/topdown/src/lib.rs +++ b/fendermint/vm/topdown/src/lib.rs @@ -12,7 +12,7 @@ pub mod proxy; mod toggle; pub mod voting; -use async_stm::Stm; +use async_stm::StmResult; use async_trait::async_trait; use ethers::utils::hex; use fvm_shared::clock::ChainEpoch; @@ -156,15 +156,15 @@ pub trait ParentViewProvider { pub trait ParentFinalityProvider: ParentViewProvider { /// Latest proposal for parent finality - fn next_proposal(&self) -> Stm>; + fn next_proposal(&self) -> StmResult, Error>; /// Check if the target proposal is valid - fn check_proposal(&self, proposal: &IPCParentFinality) -> Stm; + fn check_proposal(&self, proposal: &IPCParentFinality) -> StmResult; /// Called when finality is committed fn set_new_finality( &self, finality: IPCParentFinality, previous_finality: Option, - ) -> Stm<()>; + ) -> StmResult<(), Error>; } /// If res is null round error, returns the default value from f() diff --git a/fendermint/vm/topdown/src/sync/mod.rs b/fendermint/vm/topdown/src/sync/mod.rs index 03d98f129..613c6825d 100644 --- a/fendermint/vm/topdown/src/sync/mod.rs +++ b/fendermint/vm/topdown/src/sync/mod.rs @@ -11,7 +11,7 @@ use crate::sync::tendermint::TendermintAwareSyncer; use crate::voting::VoteTally; use crate::{CachedFinalityProvider, Config, IPCParentFinality, ParentFinalityProvider, Toggle}; use anyhow::anyhow; -use async_stm::atomically; +use async_stm::atomically_or_err; use ethers::utils::hex; use ipc_ipld_resolver::ValidatorKey; use std::sync::Arc; @@ -136,13 +136,13 @@ where }) .collect::>(); - atomically(|| { + atomically_or_err(|| { view_provider.set_new_finality(finality.clone(), None)?; vote_tally.set_finalized(finality.height, finality.block_hash.clone())?; vote_tally.set_power_table(power_table.clone())?; Ok(()) }) - .await; + .await?; tracing::info!( finality = finality.to_string(), diff --git a/fendermint/vm/topdown/src/sync/syncer.rs b/fendermint/vm/topdown/src/sync/syncer.rs index b2b1726a1..da3f19137 100644 --- a/fendermint/vm/topdown/src/sync/syncer.rs +++ b/fendermint/vm/topdown/src/sync/syncer.rs @@ -10,7 +10,7 @@ use crate::{ is_null_round_str, BlockHash, BlockHeight, CachedFinalityProvider, Config, Error, Toggle, }; use anyhow::anyhow; -use async_stm::{atomically, atomically_or_err, StmError}; +use async_stm::{atomically_or_err, StmError}; use ethers::utils::hex; use libp2p::futures::TryFutureExt; use std::sync::Arc; @@ -68,7 +68,7 @@ where }; let (mut latest_height_fetched, mut first_non_null_parent_hash) = - self.latest_cached_data().await; + self.latest_cached_data().await?; tracing::debug!(chain_head, latest_height_fetched, "syncing heights"); if latest_height_fetched > chain_head { @@ -90,7 +90,7 @@ where } loop { - if self.exceed_cache_size_limit().await { + if self.exceed_cache_size_limit().await? { tracing::debug!("exceeded cache size limit"); break; } @@ -127,18 +127,18 @@ where T: ParentFinalityStateQuery + Send + Sync + 'static, P: ParentQueryProxy + Send + Sync + 'static, { - async fn exceed_cache_size_limit(&self) -> bool { + async fn exceed_cache_size_limit(&self) -> Result { let max_cache_blocks = self.config.max_cache_blocks(); - atomically(|| self.provider.cached_blocks()).await > max_cache_blocks + Ok(atomically_or_err(|| self.provider.cached_blocks()).await? > max_cache_blocks) } /// Get the latest data stored in the cache to pull the next block - async fn latest_cached_data(&self) -> (BlockHeight, BlockHash) { + async fn latest_cached_data(&self) -> Result<(BlockHeight, BlockHash), Error> { // we are getting the latest height fetched in cache along with the first non null block // that is stored in cache. // we are doing two fetches in one `atomically` as if we get the data in two `atomically`, // the cache might be updated in between the two calls. `atomically` should guarantee atomicity. - atomically(|| { + atomically_or_err(|| { let latest_height = if let Some(h) = self.provider.latest_height()? { h } else { @@ -292,7 +292,7 @@ where /// Reset the cache in the face of a reorg async fn reset(&self) -> anyhow::Result<()> { let finality = query_starting_finality(&self.query, &self.parent_proxy).await?; - atomically(|| self.provider.reset(finality.clone())).await; + atomically_or_err::<_, Error, _>(|| self.provider.reset(finality.clone())).await?; Ok(()) } } @@ -398,7 +398,7 @@ mod tests { SequentialKeyCache, Toggle, NULL_ROUND_ERR_MSG, }; use anyhow::anyhow; - use async_stm::atomically; + use async_stm::atomically_or_err; use async_trait::async_trait; use fendermint_vm_genesis::{Power, Validator}; use ipc_api::cross::IpcEnvelope; @@ -558,7 +558,9 @@ mod tests { for h in 101..=104 { syncer.sync().await.unwrap(); - let p = atomically(|| syncer.provider.latest_height()).await; + let p = atomically_or_err(|| syncer.provider.latest_height()) + .await + .unwrap(); assert_eq!(p, Some(h)); } } @@ -585,7 +587,9 @@ mod tests { for h in 101..=109 { syncer.sync().await.unwrap(); assert_eq!( - atomically(|| syncer.provider.latest_height()).await, + atomically_or_err(|| syncer.provider.latest_height()) + .await + .unwrap(), Some(h) ); } diff --git a/fendermint/vm/topdown/src/toggle.rs b/fendermint/vm/topdown/src/toggle.rs index c7dd10065..06e70545b 100644 --- a/fendermint/vm/topdown/src/toggle.rs +++ b/fendermint/vm/topdown/src/toggle.rs @@ -74,11 +74,11 @@ impl ParentViewProvider for Toggl } impl ParentFinalityProvider for Toggle

{ - fn next_proposal(&self) -> Stm> { + fn next_proposal(&self) -> StmResult, Error> { self.perform_or_else(|p| p.next_proposal(), None) } - fn check_proposal(&self, proposal: &IPCParentFinality) -> Stm { + fn check_proposal(&self, proposal: &IPCParentFinality) -> StmResult { self.perform_or_else(|p| p.check_proposal(proposal), false) } @@ -86,21 +86,21 @@ impl ParentFinalityProvider f &self, finality: IPCParentFinality, previous_finality: Option, - ) -> Stm<()> { + ) -> StmResult<(), Error> { self.perform_or_else(|p| p.set_new_finality(finality, previous_finality), ()) } } impl

Toggle> { - pub fn block_hash(&self, height: BlockHeight) -> Stm> { + pub fn block_hash(&self, height: BlockHeight) -> StmResult, Error> { self.perform_or_else(|p| p.block_hash(height), None) } - pub fn latest_height_in_cache(&self) -> Stm> { + pub fn latest_height_in_cache(&self) -> StmResult, Error> { self.perform_or_else(|p| p.latest_height_in_cache(), None) } - pub fn latest_height(&self) -> Stm> { + pub fn latest_height(&self) -> StmResult, Error> { self.perform_or_else(|p| p.latest_height(), None) } @@ -116,15 +116,18 @@ impl

Toggle> { self.perform_or_else(|p| p.new_parent_view(height, maybe_payload), ()) } - pub fn reset(&self, finality: IPCParentFinality) -> Stm<()> { + pub fn reset(&self, finality: IPCParentFinality) -> StmResult<(), Error> { self.perform_or_else(|p| p.reset(finality), ()) } - pub fn cached_blocks(&self) -> Stm { + pub fn cached_blocks(&self) -> StmResult { self.perform_or_else(|p| p.cached_blocks(), BlockHeight::MAX) } - pub fn first_non_null_block(&self, height: BlockHeight) -> Stm> { + pub fn first_non_null_block( + &self, + height: BlockHeight, + ) -> StmResult, Error> { self.perform_or_else(|p| p.first_non_null_block(height), None) } } From 612fb9ae61bb203ea93d8e22cfc3a39cf099d40f Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 7 May 2024 16:14:53 +0000 Subject: [PATCH 4/6] Replace in mem cache with cache store --- fendermint/vm/topdown/src/cache.rs | 2 - fendermint/vm/topdown/src/cache_store.rs | 5 +- fendermint/vm/topdown/src/finality/null.rs | 174 +++------------------ 3 files changed, 27 insertions(+), 154 deletions(-) diff --git a/fendermint/vm/topdown/src/cache.rs b/fendermint/vm/topdown/src/cache.rs index ae002c28b..c1e170e5c 100644 --- a/fendermint/vm/topdown/src/cache.rs +++ b/fendermint/vm/topdown/src/cache.rs @@ -158,8 +158,6 @@ impl SequentialKeyCache { /// Insert the key and value pair only if the key is upper_bound + 1 pub fn append(&mut self, key: K, val: V) -> Result<(), SequentialAppendError> { - tracing::info!("CACHE appending block at height {:?}", key); - let expected_next_key = if let Some(upper) = self.upper_bound() { upper.add(self.increment) } else { diff --git a/fendermint/vm/topdown/src/cache_store.rs b/fendermint/vm/topdown/src/cache_store.rs index 491a90392..ca72aa6f6 100644 --- a/fendermint/vm/topdown/src/cache_store.rs +++ b/fendermint/vm/topdown/src/cache_store.rs @@ -19,10 +19,7 @@ impl CacheStore { if !db.has_cf_handle(&ns) { Err(anyhow!("namespace {ns} does not exist!")) } else { - let store = Self { db: db.db, ns }; - store.delete_all()?; - Ok(store) - //Ok(Self { db: db.db, ns }) + Ok(Self { db: db.db, ns }) } } diff --git a/fendermint/vm/topdown/src/finality/null.rs b/fendermint/vm/topdown/src/finality/null.rs index da59f43b0..d6212e052 100644 --- a/fendermint/vm/topdown/src/finality/null.rs +++ b/fendermint/vm/topdown/src/finality/null.rs @@ -4,9 +4,7 @@ use crate::finality::{ ensure_sequential, topdown_cross_msgs, validator_changes, ParentViewPayload, }; -use crate::{ - BlockHash, BlockHeight, CacheStore, Config, Error, IPCParentFinality, SequentialKeyCache, -}; +use crate::{BlockHash, BlockHeight, CacheStore, Config, Error, IPCParentFinality}; use async_stm::{abort, atomically_or_err, Stm, StmError, StmResult, TVar}; use ipc_api::cross::IpcEnvelope; use ipc_api::staking::StakingChangeRequest; @@ -21,11 +19,10 @@ pub struct FinalityWithNull { config: Config, genesis_epoch: BlockHeight, /// Cached data that always syncs with the latest parent chain proactively - cached_data: TVar>>, + cache_store: CacheStore, /// This is a in memory view of the committed parent finality. We need this as a starting point /// for populating the cache last_committed_finality: TVar>, - cache_store: CacheStore, } impl FinalityWithNull { @@ -38,9 +35,8 @@ impl FinalityWithNull { Self { config, genesis_epoch, - cached_data: TVar::new(SequentialKeyCache::sequential()), - last_committed_finality: TVar::new(committed_finality), cache_store, + last_committed_finality: TVar::new(committed_finality), } } @@ -72,11 +68,9 @@ impl FinalityWithNull { /// Clear the cache and set the committed finality to the provided value pub fn reset(&self, finality: IPCParentFinality) -> StmResult<(), Error> { - self.cached_data.write(SequentialKeyCache::sequential())?; self.cache_store .delete_all() .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; - tracing::info!("cache cleared"); Ok(self.last_committed_finality.write(Some(finality))?) } @@ -124,15 +118,10 @@ impl FinalityWithNull { // the height to clear let height = finality.height; - self.cached_data.update(|mut cache| { - // only remove cache below height, but not at height, as we have delayed execution - cache.remove_key_below(height); - cache - })?; + // only remove cache below height, but not at height, as we have delayed execution self.cache_store .delete_key_below(height) .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; - tracing::info!(height, "cache cleared below height"); let hash = hex::encode(&finality.block_hash); @@ -151,18 +140,12 @@ impl FinalityWithNull { impl FinalityWithNull { /// Returns the number of blocks cached. pub(crate) fn cached_blocks(&self) -> StmResult { - let cache_size = self.cached_data.read()?.size(); let store_size = self .cache_store .size() .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; - tracing::info!(cache_size, store_size, "COMPARE cached_blocks"); - - if cache_size != store_size { - panic!("cached_blocks mismatch: {} != {}", cache_size, store_size); - } - Ok(cache_size as BlockHeight) + Ok(store_size as BlockHeight) } pub(crate) fn block_hash_at_height( @@ -179,23 +162,12 @@ impl FinalityWithNull { } pub(crate) fn latest_height_in_cache(&self) -> StmResult, Error> { - let cache_upper_bound = self.cached_data.read()?.upper_bound(); let store_upper_bound = self .cache_store .upper_bound() .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))?; - tracing::info!( - cache_upper_bound, - store_upper_bound, - "COMPARE latest_height_in_cache" - ); - if cache_upper_bound != store_upper_bound { - panic!( - "latest_height_in_cache mismatch: {:?} != {:?}", - cache_upper_bound, store_upper_bound - ); - } - Ok(cache_upper_bound) + + Ok(store_upper_bound) } /// Get the latest height tracked in the provider, includes both cache and last committed finality @@ -215,23 +187,7 @@ impl FinalityWithNull { &self, height: BlockHeight, ) -> StmResult, Error> { - let cache = self.cached_data.read()?; - - let mut cached_height = 0; - - let res = Ok(cache.lower_bound().and_then(|lower_bound| { - for h in (lower_bound..=height).rev() { - if let Some(Some(_)) = cache.get_value(h) { - cached_height = h; - return Some(h); - } - } - None - })); - - let mut stored_height = 0; - - let _res2: std::result::Result, Vec> = Ok(self + Ok(self .cache_store .lower_bound() .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? @@ -242,23 +198,11 @@ impl FinalityWithNull { .get_value(h) .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string()))) { - stored_height = h; return Some(h); } } None - })); - - tracing::info!(cached_height, stored_height, "COMPARE first_non_null_block"); - - if cached_height != stored_height { - panic!( - "first_non_null_block mismatch: {} != {}", - cached_height, stored_height - ); - } - - res + })) } } @@ -323,44 +267,18 @@ impl FinalityWithNull { f: F, d: D, ) -> StmResult, Error> { - let cache = self.cached_data.read()?; - - let mut cache_value = None; - - let res = Ok(cache.get_value(height).map(|v| { - if let Some(i) = v.as_ref() { - cache_value = Some(i.clone()); - f(i) - } else { - tracing::debug!(height, "a null round detected, return default"); - d() - } - })); - - let mut stored_value = None; - let _ = self + Ok(self .cache_store .get_value(height) .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? .map(|v| { if let Some(i) = v.as_ref() { - stored_value = Some(i.clone()); f(i) } else { tracing::debug!(height, "a null round detected, return default"); d() } - }); - - tracing::info!(?cache_value, ?stored_value, "COMPARE handle_null_block"); - - if cache_value.is_some() || stored_value.is_some() { - if cache_value.unwrap().2 != stored_value.unwrap().2 { - panic!("handle_null_block mismatch"); - } - } - - res + })) } fn get_at_height T>( @@ -368,28 +286,17 @@ impl FinalityWithNull { height: BlockHeight, f: F, ) -> StmResult, Error> { - let cache = self.cached_data.read()?; - - let mut cache_value = None; - let res = Ok(if let Some(Some(v)) = cache.get_value(height) { - cache_value = Some(v.clone()); - Some(f(v)) - } else { - None - }); - - let mut stored_value = None; - if let Some(Some(v)) = self - .cache_store - .get_value(height) - .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? - { - stored_value = Some(v.clone()); - } - - tracing::info!(?cache_value, ?stored_value, "COMPARE get_at_height"); - - res + Ok( + if let Some(Some(v)) = self + .cache_store + .get_value(height) + .map_err(|e| StmError::Abort(Error::CacheStoreError(e.to_string())))? + { + Some(f(&v)) + } else { + None + }, + ) } fn parent_block_filled( @@ -409,29 +316,11 @@ impl FinalityWithNull { ensure_sequential(&validator_changes, |change| change.configuration_number)?; } - let r = self.cached_data.modify(|mut cache| { - let r = cache - .append( - height, - Some(( - block_hash.clone(), - validator_changes.clone(), - top_down_msgs.clone(), - )), - ) - .map_err(Error::NonSequentialParentViewInsert); - (cache, r) - })?; - - if let Err(e) = r { - return abort(e); - } - - let r2 = self + let r = self .cache_store .append(height, Some((block_hash, validator_changes, top_down_msgs))) .map_err(|e| Error::CacheStoreError(e.to_string())); - if let Err(e) = r2 { + if let Err(e) = r { return abort(e); } @@ -440,22 +329,11 @@ impl FinalityWithNull { /// When there is a new parent view, but it is actually a null round, call this function. fn parent_null_round(&self, height: BlockHeight) -> StmResult<(), Error> { - let r = self.cached_data.modify(|mut cache| { - let r = cache - .append(height, None) - .map_err(Error::NonSequentialParentViewInsert); - (cache, r) - })?; - - if let Err(e) = r { - return abort(e); - } - - let r2 = self + let r = self .cache_store .append(height, None) .map_err(|e| Error::CacheStoreError(e.to_string())); - if let Err(e) = r2 { + if let Err(e) = r { return abort(e); } From 39bbff5c8b01f47bf0b5c0d8cf936cf673f4514b Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 7 May 2024 17:40:46 +0000 Subject: [PATCH 5/6] Add cli command for deleting all k/v in cache store --- fendermint/app/options/src/debug.rs | 9 +++++++++ fendermint/app/src/cmd/debug.rs | 15 +++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/fendermint/app/options/src/debug.rs b/fendermint/app/options/src/debug.rs index c6a95f817..043b30dba 100644 --- a/fendermint/app/options/src/debug.rs +++ b/fendermint/app/options/src/debug.rs @@ -21,6 +21,15 @@ pub enum DebugCommands { #[command(subcommand)] command: DebugIpcCommands, }, + + /// Deletes all the key/values in the finality cache store. + DeleteCacheStore(DebugDeleteCacheStoreArgs), +} + +#[derive(Args, Debug)] +pub struct DebugDeleteCacheStoreArgs { + #[arg(long, default_value = "~/.fendermint/data", env = "FM_DATA_DIR")] + pub data_dir: PathBuf, } #[derive(Subcommand, Debug, Clone)] diff --git a/fendermint/app/src/cmd/debug.rs b/fendermint/app/src/cmd/debug.rs index 364fd36c1..de08b4615 100644 --- a/fendermint/app/src/cmd/debug.rs +++ b/fendermint/app/src/cmd/debug.rs @@ -3,9 +3,10 @@ use anyhow::{anyhow, Context}; use fendermint_app_options::debug::{ - DebugArgs, DebugCommands, DebugExportTopDownEventsArgs, DebugIpcCommands, + DebugArgs, DebugCommands, DebugDeleteCacheStoreArgs, DebugExportTopDownEventsArgs, + DebugIpcCommands, }; -use fendermint_vm_topdown::proxy::IPCProviderProxy; +use fendermint_vm_topdown::{proxy::IPCProviderProxy, CacheStore}; use ipc_provider::{ config::subnet::{EVMSubnet, SubnetConfig}, IpcProvider, @@ -17,6 +18,7 @@ cmd! { DebugArgs(self) { match &self.command { DebugCommands::Ipc { command } => command.exec(()).await, + DebugCommands::DeleteCacheStore(args) => delete_cache_store(args) } } } @@ -30,6 +32,15 @@ cmd! { } } +fn delete_cache_store(args: &DebugDeleteCacheStoreArgs) -> anyhow::Result<()> { + let db = fendermint_rocksdb::RocksDb::open( + args.data_dir.join("rocksdb"), + &fendermint_rocksdb::RocksDbConfig::default(), + )?; + + CacheStore::new(db, "finality_cache".to_owned())?.delete_all() +} + async fn export_topdown_events(args: &DebugExportTopDownEventsArgs) -> anyhow::Result<()> { // Configuration for the child subnet on the parent network, // based on how it's done in `run.rs` and the `genesis ipc from-parent` command. From 876009193548767864e0974f22af2707e6e23dd2 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 7 May 2024 17:55:45 +0000 Subject: [PATCH 6/6] fix lint --- fendermint/vm/topdown/src/cache_store.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fendermint/vm/topdown/src/cache_store.rs b/fendermint/vm/topdown/src/cache_store.rs index ca72aa6f6..4291d4b02 100644 --- a/fendermint/vm/topdown/src/cache_store.rs +++ b/fendermint/vm/topdown/src/cache_store.rs @@ -1,3 +1,6 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + use anyhow::{anyhow, Ok}; use fendermint_rocksdb::RocksDb; use rocksdb::{BoundColumnFamily, IteratorMode, OptimisticTransactionDB}; @@ -130,8 +133,6 @@ impl CacheStore { height: BlockHeight, block: Option, ) -> anyhow::Result<()> { - tracing::info!("STORE appending block at height {}", height); - let expected_next_key = if let Some(upper) = self.upper_bound()? { upper + 1 } else {