Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache store generic #914

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,234 changes: 619 additions & 615 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ anyhow = "1"
arbitrary = { version = "1", features = ["derive"] }
arbtest = "0.2"
async-recursion = "1"
async-stm = "0.4"
async-stm = "0.5"
async-trait = "0.1"
async-channel = "1.8.0"
axum = { version = "0.6", features = ["ws"] }
Expand Down
97 changes: 39 additions & 58 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use async_trait::async_trait;
use cid::Cid;
use fendermint_abci::util::take_until_max_size;
use fendermint_abci::{AbciResult, Application};
use fendermint_storage::{
Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite,
};
use fendermint_storage::{KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite};
use fendermint_tracing::emit;
use fendermint_vm_core::Timestamp;
use fendermint_vm_interpreter::bytes::{
Expand Down Expand Up @@ -44,9 +42,9 @@ use tendermint::abci::{request, response};
use tracing::instrument;

use crate::events::{NewBlock, ProposalProcessed};
use crate::AppExitCode;
use crate::BlockHeight;
use crate::{tmconv::*, VERSION};
use crate::{AppExitCode, AppStore};

#[derive(Serialize)]
#[repr(u8)]
Expand Down Expand Up @@ -99,11 +97,11 @@ impl AppState {
}
}

pub struct AppConfig<S: KVStore> {
pub struct AppConfig {
/// Namespace to store the current app state.
pub app_namespace: S::Namespace,
pub app_namespace: <AppStore as KVStore>::Namespace,
/// Namespace to store the app state history.
pub state_hist_namespace: S::Namespace,
pub state_hist_namespace: <AppStore as KVStore>::Namespace,
/// Size of state history to keep; 0 means unlimited.
pub state_hist_size: u64,
/// Path to the Wasm bundle.
Expand All @@ -118,10 +116,9 @@ pub struct AppConfig<S: KVStore> {

/// Handle ABCI requests.
#[derive(Clone)]
pub struct App<DB, SS, S, I>
pub struct App<DB, BS, I>
where
SS: Blockstore + Clone + 'static,
S: KVStore,
BS: Blockstore + Clone + 'static,
{
/// Database backing all key-value operations.
db: Arc<DB>,
Expand All @@ -130,7 +127,7 @@ where
/// Must be kept separate from storage that can be influenced by network operations such as Bitswap;
/// nodes must be able to run transactions deterministically. By contrast the Bitswap store should
/// be able to read its own storage area as well as state storage, to serve content from both.
state_store: Arc<SS>,
state_store: Arc<BS>,
/// Wasm engine cache.
multi_engine: Arc<MultiEngine>,
/// Path to the Wasm bundle.
Expand All @@ -142,7 +139,7 @@ where
/// Block height where we should gracefully stop the node
halt_height: i64,
/// Namespace to store app state.
namespace: S::Namespace,
namespace: <AppStore as KVStore>::Namespace,
/// Collection of past state parameters.
///
/// We store the state hash for the height of the block where it was committed,
Expand All @@ -153,39 +150,34 @@ where
/// The state also contains things like timestamp and the network version,
/// so that we can retrospectively execute FVM messages at past block heights
/// in read-only mode.
state_hist: KVCollection<S, BlockHeight, FvmStateParams>,
state_hist: KVCollection<AppStore, BlockHeight, FvmStateParams>,
/// Interpreter for block lifecycle events.
interpreter: Arc<I>,
/// Environment-like dependencies for the interpreter.
chain_env: ChainEnv,
chain_env: ChainEnv<AppStore>,
/// Interface to the snapshotter, if enabled.
snapshots: Option<SnapshotClient>,
/// State accumulating changes during block execution.
exec_state: Arc<tokio::sync::Mutex<Option<FvmExecState<SS>>>>,
exec_state: Arc<tokio::sync::Mutex<Option<FvmExecState<BS>>>>,
/// Projected (partial) state accumulating during transaction checks.
check_state: CheckStateRef<SS>,
check_state: CheckStateRef<BS>,
/// How much history to keep.
///
/// Zero means unlimited.
state_hist_size: u64,
}

impl<DB, SS, S, I> App<DB, SS, S, I>
impl<DB, BS, I> App<DB, BS, I>
where
S: KVStore
+ Codec<AppState>
+ Encode<AppStoreKey>
+ Encode<BlockHeight>
+ Codec<FvmStateParams>,
DB: KVWritable<S> + KVReadable<S> + Clone + 'static,
SS: Blockstore + Clone + 'static,
DB: KVWritable<AppStore> + KVReadable<AppStore> + Clone + 'static,
BS: Blockstore + Clone + 'static,
{
pub fn new(
config: AppConfig<S>,
config: AppConfig,
db: DB,
state_store: SS,
state_store: BS,
interpreter: I,
chain_env: ChainEnv,
chain_env: ChainEnv<AppStore>,
snapshots: Option<SnapshotClient>,
) -> Result<Self> {
let app = Self {
Expand All @@ -209,18 +201,13 @@ where
}
}

impl<DB, SS, S, I> App<DB, SS, S, I>
impl<DB, BS, I> App<DB, BS, I>
where
S: KVStore
+ Codec<AppState>
+ Encode<AppStoreKey>
+ Encode<BlockHeight>
+ Codec<FvmStateParams>,
DB: KVWritable<S> + KVReadable<S> + 'static + Clone,
SS: Blockstore + 'static + Clone,
DB: KVWritable<AppStore> + KVReadable<AppStore> + 'static + Clone,
BS: Blockstore + 'static + Clone,
{
/// Get an owned clone of the state store.
fn state_store_clone(&self) -> SS {
fn state_store_clone(&self) -> BS {
self.state_store.as_ref().clone()
}

Expand Down Expand Up @@ -294,23 +281,23 @@ where
}

/// Put the execution state during block execution. Has to be empty.
async fn put_exec_state(&self, state: FvmExecState<SS>) {
async fn put_exec_state(&self, state: FvmExecState<BS>) {
let mut guard = self.exec_state.lock().await;
assert!(guard.is_none(), "exec state not empty");
*guard = Some(state);
}

/// Take the execution state during block execution. Has to be non-empty.
async fn take_exec_state(&self) -> FvmExecState<SS> {
async fn take_exec_state(&self) -> FvmExecState<BS> {
let mut guard = self.exec_state.lock().await;
guard.take().expect("exec state empty")
}

/// Take the execution state, update it, put it back, return the output.
async fn modify_exec_state<T, F, R>(&self, f: F) -> Result<T>
where
F: FnOnce((ChainEnv, FvmExecState<SS>)) -> R,
R: Future<Output = Result<((ChainEnv, FvmExecState<SS>), T)>>,
F: FnOnce((ChainEnv<AppStore>, FvmExecState<BS>)) -> R,
R: Future<Output = Result<((ChainEnv<AppStore>, FvmExecState<BS>), T)>>,
{
let mut guard = self.exec_state.lock().await;
let state = guard.take().expect("exec state empty");
Expand All @@ -327,7 +314,7 @@ where
/// the latest state.
pub fn new_read_only_exec_state(
&self,
) -> Result<Option<FvmExecState<ReadOnlyBlockstore<Arc<SS>>>>> {
) -> Result<Option<FvmExecState<ReadOnlyBlockstore<Arc<BS>>>>> {
let maybe_app_state = self.get_committed_state()?;

Ok(if let Some(app_state) = maybe_app_state {
Expand Down Expand Up @@ -395,36 +382,30 @@ where
// the `tower-abci` library would throw an exception when it tried to convert a
// `Response::Exception` into a `ConsensusResponse` for example.
#[async_trait]
impl<DB, SS, S, I> Application for App<DB, SS, S, I>
impl<DB, BS, I> Application for App<DB, BS, I>
where
S: KVStore
+ Codec<AppState>
+ Encode<AppStoreKey>
+ Encode<BlockHeight>
+ Codec<FvmStateParams>,
S::Namespace: Sync + Send,
DB: KVWritable<S> + KVReadable<S> + Clone + Send + Sync + 'static,
SS: Blockstore + Clone + Send + Sync + 'static,
DB: KVWritable<AppStore> + KVReadable<AppStore> + Clone + Send + Sync + 'static,
BS: Blockstore + Clone + Send + Sync + 'static,
I: GenesisInterpreter<
State = FvmGenesisState<SS>,
State = FvmGenesisState<BS>,
Genesis = Vec<u8>,
Output = FvmGenesisOutput,
>,
I: ProposalInterpreter<State = ChainEnv, Message = Vec<u8>>,
I: ProposalInterpreter<State = ChainEnv<AppStore>, Message = Vec<u8>>,
I: ExecInterpreter<
State = (ChainEnv, FvmExecState<SS>),
State = (ChainEnv<AppStore>, FvmExecState<BS>),
Message = Vec<u8>,
BeginOutput = FvmApplyRet,
DeliverOutput = BytesMessageApplyRes,
EndOutput = PowerUpdates,
>,
I: CheckInterpreter<
State = FvmExecState<ReadOnlyBlockstore<SS>>,
State = FvmExecState<ReadOnlyBlockstore<BS>>,
Message = Vec<u8>,
Output = BytesMessageCheckRes,
>,
I: QueryInterpreter<
State = FvmQueryState<SS>,
State = FvmQueryState<BS>,
Query = BytesMessageQuery,
Output = BytesMessageQueryRes,
>,
Expand Down Expand Up @@ -832,7 +813,7 @@ where
);

// TODO: We can defer committing changes the resolution pool to this point.
// For example if a checkpoint is successfully executed, that's when we want to remove
// For example if a checkpoint is succeBSfully executed, that's when we want to remove
// that checkpoint from the pool, and not propose it to other validators again.
// However, once Tendermint starts delivering the transactions, the commit will surely
// follow at the end, so we can also remove these checkpoints from memory at the time
Expand All @@ -841,8 +822,8 @@ where
// commit in case the block execution fails somewhere in the middle for uknown reasons.
// But if that happened, we will have to restart the application again anyway, and
// repopulate the in-memory checkpoints based on the last committed ledger.
// So, while the pool is theoretically part of the evolving state and we can pass
// it in and out, unless we want to defer commit to here (which the interpreters aren't
// So, while the pool is theoretically part of the evolving state and we can paBS
// it in and out, unleBS we want to defer commit to here (which the interpreters aren't
// notified about), we could add it to the `ChainMessageInterpreter` as a constructor argument,
// a sort of "ambient state", and not worry about in in the `App` at all.

Expand Down
23 changes: 17 additions & 6 deletions fendermint/app/src/cmd/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use fendermint_app::{App, AppConfig, AppStore, BitswapBlockstore};
use fendermint_app_settings::AccountKind;
use fendermint_crypto::SecretKey;
use fendermint_rocksdb::{blockstore::NamespaceBlockstore, namespaces, RocksDb, RocksDbConfig};
use fendermint_storage::KVCollection;
use fendermint_tracing::emit;
use fendermint_vm_actor_interface::eam::EthAddress;
use fendermint_vm_interpreter::chain::ChainEnv;
Expand Down Expand Up @@ -52,7 +53,8 @@ namespaces! {
app,
state_hist,
state_store,
bit_store
bit_store,
parent_view_store
}
}

Expand Down Expand Up @@ -141,7 +143,8 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
.with_push_chain_meta(testing_settings.map_or(true, |t| t.push_chain_meta));

let interpreter = SignedMessageInterpreter::new(interpreter);
let interpreter = ChainMessageInterpreter::<_, NamespaceBlockstore>::new(interpreter);
let interpreter =
ChainMessageInterpreter::<_, NamespaceBlockstore, AppStore, RocksDb>::new(interpreter);
let interpreter = BytesMessageInterpreter::new(
interpreter,
ProposalPrepareMode::PrependOnly,
Expand Down Expand Up @@ -249,9 +252,15 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
config = config.with_max_cache_blocks(v);
}

let parent_view_store = KVCollection::new(ns.parent_view_store);

let ipc_provider = Arc::new(make_ipc_provider_proxy(&settings)?);
let finality_provider =
CachedFinalityProvider::uninitialized(config.clone(), ipc_provider.clone()).await?;
let finality_provider = CachedFinalityProvider::uninitialized(
config.clone(),
ipc_provider.clone(),
parent_view_store,
)
.await?;
let p = Arc::new(Toggle::enabled(finality_provider));
(p, Some((ipc_provider, config)))
} else {
Expand Down Expand Up @@ -285,7 +294,7 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
None
};

let app: App<_, _, AppStore, _> = App::new(
let app: App<RocksDb, NamespaceBlockstore, _> = App::new(
AppConfig {
app_namespace: ns.app,
state_hist_namespace: ns.state_hist,
Expand All @@ -294,7 +303,7 @@ async fn run(settings: Settings) -> anyhow::Result<()> {
custom_actors_bundle: settings.custom_actors_bundle(),
halt_height: settings.halt_height,
},
db,
db.clone(),
state_store,
interpreter,
ChainEnv {
Expand All @@ -307,8 +316,10 @@ async fn run(settings: Settings) -> anyhow::Result<()> {

if let Some((agent_proxy, config)) = ipc_tuple {
let app_parent_finality_query = AppParentFinalityQuery::new(app.clone());
let db = db.clone();
tokio::spawn(async move {
match launch_polling_syncer(
db,
app_parent_finality_query,
config,
parent_finality_provider,
Expand Down
36 changes: 12 additions & 24 deletions fendermint/app/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
// SPDX-License-Identifier: Apache-2.0, MIT
//! IPC related execution

use crate::app::{AppState, AppStoreKey};
use crate::{App, BlockHeight};
use fendermint_storage::{Codec, Encode, KVReadable, KVStore, KVWritable};
use crate::{App, AppStore};
use fendermint_storage::{KVReadable, KVWritable};
use fendermint_vm_genesis::{Power, Validator};
use fendermint_vm_interpreter::fvm::state::ipc::GatewayCaller;
use fendermint_vm_interpreter::fvm::state::{FvmExecState, FvmStateParams};
use fendermint_vm_interpreter::fvm::state::FvmExecState;
use fendermint_vm_interpreter::fvm::store::ReadOnlyBlockstore;
use fendermint_vm_topdown::sync::ParentFinalityStateQuery;
use fendermint_vm_topdown::IPCParentFinality;
Expand All @@ -24,27 +23,21 @@ pub enum AppVote {
}

/// Queries the LATEST COMMITTED parent finality from the storage
pub struct AppParentFinalityQuery<DB, SS, S, I>
pub struct AppParentFinalityQuery<DB, BS, I>
where
SS: Blockstore + Clone + 'static,
S: KVStore,
BS: Blockstore + Clone + 'static,
{
/// The app to get state
app: App<DB, SS, S, I>,
gateway_caller: GatewayCaller<ReadOnlyBlockstore<Arc<SS>>>,
app: App<DB, BS, I>,
gateway_caller: GatewayCaller<ReadOnlyBlockstore<Arc<BS>>>,
}

impl<DB, SS, S, I> AppParentFinalityQuery<DB, SS, S, I>
impl<DB, SS, I> AppParentFinalityQuery<DB, SS, I>
where
S: KVStore
+ Codec<AppState>
+ Encode<AppStoreKey>
+ Encode<BlockHeight>
+ Codec<FvmStateParams>,
DB: KVWritable<S> + KVReadable<S> + 'static + Clone,
DB: KVWritable<AppStore> + KVReadable<AppStore> + 'static + Clone,
SS: Blockstore + 'static + Clone,
{
pub fn new(app: App<DB, SS, S, I>) -> Self {
pub fn new(app: App<DB, SS, I>) -> Self {
Self {
app,
gateway_caller: GatewayCaller::default(),
Expand All @@ -62,14 +55,9 @@ where
}
}

impl<DB, SS, S, I> ParentFinalityStateQuery for AppParentFinalityQuery<DB, SS, S, I>
impl<DB, SS, I> ParentFinalityStateQuery for AppParentFinalityQuery<DB, SS, I>
where
S: KVStore
+ Codec<AppState>
+ Encode<AppStoreKey>
+ Encode<BlockHeight>
+ Codec<FvmStateParams>,
DB: KVWritable<S> + KVReadable<S> + 'static + Clone,
DB: KVWritable<AppStore> + KVReadable<AppStore> + 'static + Clone,
SS: Blockstore + 'static + Clone,
{
fn get_latest_committed_finality(&self) -> anyhow::Result<Option<IPCParentFinality>> {
Expand Down
Loading
Loading