Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistent top-down finality cache #897

Open
wants to merge 6 commits into
base: external-materializer
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions fendermint/app/options/src/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ pub enum DebugCommands {
#[command(subcommand)]
command: DebugIpcCommands,
},

/// Deletes all the key/values in the finality cache store.
DeleteCacheStore(DebugDeleteCacheStoreArgs),
}

#[derive(Args, Debug)]
pub struct DebugDeleteCacheStoreArgs {
#[arg(long, default_value = "~/.fendermint/data", env = "FM_DATA_DIR")]
pub data_dir: PathBuf,
}

#[derive(Subcommand, Debug, Clone)]
Expand Down
15 changes: 13 additions & 2 deletions fendermint/app/src/cmd/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use anyhow::{anyhow, Context};
use fendermint_app_options::debug::{
DebugArgs, DebugCommands, DebugExportTopDownEventsArgs, DebugIpcCommands,
DebugArgs, DebugCommands, DebugDeleteCacheStoreArgs, DebugExportTopDownEventsArgs,
DebugIpcCommands,
};
use fendermint_vm_topdown::proxy::IPCProviderProxy;
use fendermint_vm_topdown::{proxy::IPCProviderProxy, CacheStore};
use ipc_provider::{
config::subnet::{EVMSubnet, SubnetConfig},
IpcProvider,
Expand All @@ -17,6 +18,7 @@ cmd! {
DebugArgs(self) {
match &self.command {
DebugCommands::Ipc { command } => command.exec(()).await,
DebugCommands::DeleteCacheStore(args) => delete_cache_store(args)
}
}
}
Expand All @@ -30,6 +32,15 @@ cmd! {
}
}

fn delete_cache_store(args: &DebugDeleteCacheStoreArgs) -> anyhow::Result<()> {
let db = fendermint_rocksdb::RocksDb::open(
args.data_dir.join("rocksdb"),
&fendermint_rocksdb::RocksDbConfig::default(),
)?;

CacheStore::new(db, "finality_cache".to_owned())?.delete_all()
}

async fn export_topdown_events(args: &DebugExportTopDownEventsArgs) -> anyhow::Result<()> {
// Configuration for the child subnet on the parent network,
// based on how it's done in `run.rs` and the `genesis ipc from-parent` command.
Expand Down
16 changes: 12 additions & 4 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use fendermint_vm_snapshot::{SnapshotManager, SnapshotParams};
use fendermint_vm_topdown::proxy::IPCProviderProxy;
use fendermint_vm_topdown::sync::launch_polling_syncer;
use fendermint_vm_topdown::voting::{publish_vote_loop, Error as VoteError, VoteTally};
use fendermint_vm_topdown::{CachedFinalityProvider, IPCParentFinality, Toggle};
use fendermint_vm_topdown::{CacheStore, CachedFinalityProvider, IPCParentFinality, Toggle};
use fvm_shared::address::{current_network, Address, Network};
use ipc_ipld_resolver::{Event as ResolverEvent, VoteRecord};
use ipc_provider::config::subnet::{EVMSubnet, SubnetConfig};
Expand All @@ -52,7 +52,8 @@ namespaces! {
app,
state_hist,
state_store,
bit_store
bit_store,
finality_cache
}
}

Expand Down Expand Up @@ -152,6 +153,9 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
let ns = Namespaces::default();
let db = open_db(&settings, &ns).context("error opening DB")?;

let finality_store =
CacheStore::new(db.clone(), ns.finality_cache).context("error creating cache store DB")?;

// Blockstore for actors.
let state_store =
NamespaceBlockstore::new(db.clone(), ns.state_store).context("error creating state DB")?;
Expand Down Expand Up @@ -250,8 +254,12 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
}

let ipc_provider = Arc::new(make_ipc_provider_proxy(&settings)?);
let finality_provider =
CachedFinalityProvider::uninitialized(config.clone(), ipc_provider.clone()).await?;
let finality_provider = CachedFinalityProvider::uninitialized(
config.clone(),
ipc_provider.clone(),
finality_store.clone(),
)
.await?;
let p = Arc::new(Toggle::enabled(finality_provider));
(p, Some((ipc_provider, config)))
} else {
Expand Down
13 changes: 7 additions & 6 deletions fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
CheckInterpreter, ExecInterpreter, GenesisInterpreter, ProposalInterpreter, QueryInterpreter,
};
use anyhow::{bail, Context};
use async_stm::atomically;
use async_stm::{atomically, atomically_or_err};
use async_trait::async_trait;
use fendermint_tracing::emit;
use fendermint_vm_actor_interface::ipc;
Expand Down Expand Up @@ -129,7 +129,7 @@ where
// The pre-requisite for proposal is that there is a quorum of gossiped votes at that height.
// The final proposal can be at most as high as the quorum, but can be less if we have already,
// hit some limits such as how many blocks we can propose in a single step.
let finalities = atomically(|| {
let finalities = atomically_or_err(|| {
let parent = state.parent_finality_provider.next_proposal()?;
let quorum = state
.parent_finality_votes
Expand All @@ -138,7 +138,7 @@ where

Ok((parent, quorum))
})
.await;
.await?;

let maybe_finality = match finalities {
(Some(parent), Some(quorum)) => Some(if parent.height <= quorum.height {
Expand Down Expand Up @@ -206,7 +206,8 @@ where
block_hash,
};
let is_final =
atomically(|| env.parent_finality_provider.check_proposal(&prop)).await;
atomically_or_err(|| env.parent_finality_provider.check_proposal(&prop))
.await?;
if !is_final {
return Ok(false);
}
Expand Down Expand Up @@ -362,7 +363,7 @@ where

tracing::debug!("chain interpreter applied topdown msgs");

atomically(|| {
atomically_or_err(|| {
env.parent_finality_provider
.set_new_finality(finality.clone(), prev_finality.clone())?;

Expand All @@ -371,7 +372,7 @@ where

Ok(())
})
.await;
.await?;

tracing::debug!(
finality = finality.to_string(),
Expand Down
3 changes: 3 additions & 0 deletions fendermint/vm/topdown/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ tracing = { workspace = true }
fendermint_vm_genesis = { path = "../genesis" }
fendermint_vm_event = { path = "../event" }
fendermint_tracing = { path = "../../tracing" }
fendermint_rocksdb = { path = "../../rocksdb" }
rocksdb = { version = "0.21", features = ["multi-threaded-cf"] }

[dev-dependencies]
arbitrary = { workspace = true }
clap = { workspace = true }
rand = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }

fendermint_crypto = { path = "../../crypto" }
fendermint_testing = { path = "../../testing", features = ["smt"] }
205 changes: 205 additions & 0 deletions fendermint/vm/topdown/src/cache_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use anyhow::{anyhow, Ok};
use fendermint_rocksdb::RocksDb;
use rocksdb::{BoundColumnFamily, IteratorMode, OptimisticTransactionDB};
use std::sync::Arc;

use crate::{BlockHeight, ParentViewPayload};

/// A cache k/v implementation for storing ParentViewPayload for a specific height
/// in rocksdb with a specific namespace.
#[derive(Clone)]
pub struct CacheStore {
db: Arc<OptimisticTransactionDB>,
ns: String,
}

impl CacheStore {
pub fn new(db: RocksDb, ns: String) -> anyhow::Result<Self> {
// All namespaces are pre-created during open.
if !db.has_cf_handle(&ns) {
Err(anyhow!("namespace {ns} does not exist!"))
} else {
Ok(Self { db: db.db, ns })
}
}

// creates a new instance of the cache store for testing purposes
#[cfg(test)]
pub fn new_test(ns: String) -> anyhow::Result<Self> {
use fendermint_rocksdb::RocksDbConfig;
let dir = tempfile::Builder::new().prefix(&ns).tempdir()?;
let db = RocksDb::open(dir.path().join("rocksdb"), &RocksDbConfig::default())?;
let _ = db.new_cf_handle(&ns)?;
Ok(Self { db: db.db, ns })
}

// Unfortunately there doesn't seem to be a way to avoid having to
// clone another instance for each operation :(
fn cf(&self) -> anyhow::Result<Arc<BoundColumnFamily>> {
self.db
.cf_handle(&self.ns)
.ok_or_else(|| anyhow!("namespace {} does not exist!", self.ns))
}
}

impl CacheStore {
fn put(
&self,
height: BlockHeight,
value: Option<Option<ParentViewPayload>>,
) -> anyhow::Result<()> {
let bytes = fvm_ipld_encoding::to_vec(&value)?;

Ok(self.db.put_cf(&self.cf()?, height.to_be_bytes(), bytes)?)
}

pub fn delete(&self, height: BlockHeight) -> anyhow::Result<()> {
Ok(self.db.delete_cf(&self.cf()?, height.to_be_bytes())?)
}

pub fn delete_all(&self) -> anyhow::Result<()> {
let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start);
for item in iter {
let (key, _) = item?;
self.db.delete_cf(&self.cf()?, key)?;
}

Ok(())
}

pub fn delete_key_below(&self, height: BlockHeight) -> anyhow::Result<()> {
let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start);
for item in iter {
let (key, _) = item?;
let key = BlockHeight::from_be_bytes(key[0..8].try_into().unwrap());
if key < height {
self.db.delete_cf(&self.cf()?, key.to_be_bytes())?;
}
}

Ok(())
}

pub fn size(&self) -> anyhow::Result<usize> {
let mut count = 0;
let iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start);
for _ in iter {
count += 1;
}

Ok(count)
}

pub fn upper_bound(&self) -> anyhow::Result<Option<BlockHeight>> {
let mut iter = self.db.iterator_cf(&self.cf()?, IteratorMode::End);
if let Some(item) = iter.next() {
let (key, _) = item?;
Ok(Some(BlockHeight::from_be_bytes(
key[0..8].try_into().unwrap(),
)))
} else {
Ok(None)
}
}

pub fn lower_bound(&self) -> anyhow::Result<Option<BlockHeight>> {
let mut iter = self.db.iterator_cf(&self.cf()?, IteratorMode::Start);
if let Some(item) = iter.next() {
let (key, _) = item?;
Ok(Some(BlockHeight::from_be_bytes(
key[0..8].try_into().unwrap(),
)))
} else {
Ok(None)
}
}

pub fn get_value(
&self,
height: BlockHeight,
) -> anyhow::Result<Option<Option<ParentViewPayload>>> {
let value = self.db.get_cf(&self.cf()?, height.to_be_bytes())?;
match value {
Some(value) => Ok(Some(fvm_ipld_encoding::from_slice(&value)?)),
None => Ok(None),
}
}

pub fn append(
&self,
height: BlockHeight,
block: Option<ParentViewPayload>,
) -> anyhow::Result<()> {
let expected_next_key = if let Some(upper) = self.upper_bound()? {
upper + 1
} else {
self.put(height, Some(block))?;
return Ok(());
};

if height != expected_next_key {
return Err(anyhow!(
"expected next key to be {}, but got {}",
expected_next_key,
height
));
}

self.put(height, Some(block))
}
}

#[cfg(test)]
mod tests {
use crate::BlockHeight;
use crate::CacheStore;
use crate::ParentViewPayload;

fn build_payload(height: BlockHeight) -> ParentViewPayload {
let mut p = ParentViewPayload::default();
p.0 = height.to_be_bytes().to_vec();
p
}

#[test]
fn insert_works() {
let cache_store = CacheStore::new_test("test".to_string()).unwrap();
for height in 9..100 {
cache_store
.append(height, Some(build_payload(height)))
.unwrap();
}

for height in 9..100 {
let value = cache_store.get_value(height).unwrap().unwrap().unwrap();
let cache_height = BlockHeight::from_be_bytes(value.0[0..8].try_into().unwrap());
assert_eq!(height, cache_height);
}

assert!(cache_store.get_value(100).unwrap().is_none());
assert_eq!(cache_store.lower_bound().unwrap(), Some(9));
assert_eq!(cache_store.upper_bound().unwrap(), Some(99));
}

#[test]
fn delete_works() {
let cache_store = CacheStore::new_test("test".to_string()).unwrap();

for height in 0..100 {
cache_store
.append(height, Some(build_payload(height)))
.unwrap();
}

cache_store.delete_key_below(10).unwrap();
assert!(cache_store.size().unwrap() == 90);
assert_eq!(cache_store.lower_bound().unwrap(), Some(10));

cache_store.delete_all().unwrap();
assert!(cache_store.size().unwrap() == 0);
assert_eq!(cache_store.lower_bound().unwrap(), None);
}
}
2 changes: 2 additions & 0 deletions fendermint/vm/topdown/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ pub enum Error {
ParentChainReorgDetected,
#[error("Cannot query parent at height {1}: {0}")]
CannotQueryParent(String, BlockHeight),
#[error("Error in cache store: {0}")]
CacheStoreError(String),
}
Loading
Loading