From 4f5749e0d76f4c03e523f20a831fa306930edd2e Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 11:58:45 +0200 Subject: [PATCH 01/11] node-data: remove ArchivedEvents from ArchivalData - update doc comments for transaction ID --- node-data/src/archive.rs | 11 ++--------- node-data/src/ledger/transaction.rs | 4 ++-- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/node-data/src/archive.rs b/node-data/src/archive.rs index eef06af56a..951ac66f1d 100644 --- a/node-data/src/archive.rs +++ b/node-data/src/archive.rs @@ -4,22 +4,15 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use crate::events::contract::ContractTxEvent; -use crate::ledger::Hash; - type HexHash = String; /// Defined data, that the archivist will store. /// /// This is also the type of the mpsc channel where the archivist listens for -/// data to archive. -/// -/// Any data that archive nodes can store must be defined here +/// data to archive. ContractEvents together with an unfinalized Block are +/// archived directly and not stored here. #[derive(Debug)] pub enum ArchivalData { - /// List of contract events from one block together with the block height - /// and block hash. - ArchivedEvents(u64, Hash, Vec), FinalizedBlock(u64, HexHash), DeletedBlock(u64, HexHash), } diff --git a/node-data/src/ledger/transaction.rs b/node-data/src/ledger/transaction.rs index e2e51019cb..b9ef349692 100644 --- a/node-data/src/ledger/transaction.rs +++ b/node-data/src/ledger/transaction.rs @@ -71,8 +71,8 @@ impl Transaction { /// Computes the transaction ID. /// /// The transaction ID is a unique identifier for the transaction. 
- /// Unlike the [`hash()`](#method.hash) method, which is computed over the - /// entire transaction, the transaction ID is derived from specific + /// Unlike the [`digest()`](#method.digest) method, which is computed over + /// the entire transaction, the transaction ID is derived from specific /// fields of the transaction and serves as a unique identifier of the /// transaction itself. /// From 9a1b1a7acacca2732161d0ed131a9c5e4d421fab Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 12:10:33 +0200 Subject: [PATCH 02/11] node: change accept in VMExecution - Change VMExecution `accept` to return ContractTxEvent instead of ContractEvent --- node/src/vm.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/vm.rs b/node/src/vm.rs index bc104b0a4b..dfccc23a45 100644 --- a/node/src/vm.rs +++ b/node/src/vm.rs @@ -10,7 +10,7 @@ use dusk_consensus::user::provisioners::Provisioners; use dusk_consensus::user::stake::Stake; use dusk_core::signatures::bls::PublicKey as BlsPublicKey; use dusk_core::transfer::moonlight::AccountData; -use node_data::events::contract::ContractEvent; +use node_data::events::contract::ContractTxEvent; use node_data::ledger::{Block, SpentTransaction, Transaction}; #[derive(Default)] @@ -42,7 +42,7 @@ pub trait VMExecution: Send + Sync + 'static { ) -> anyhow::Result<( Vec, VerificationOutput, - Vec, + Vec, )>; fn finalize_state( From fbea58782d55d86366cddd742aa5f1cb8934581e Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 12:13:50 +0200 Subject: [PATCH 03/11] rusk: adjust Rusk VMExecution `accept` to trait changes - Return all contract events as ContractTxEvent instead of only stake events --- rusk/src/lib/node/vm.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rusk/src/lib/node/vm.rs b/rusk/src/lib/node/vm.rs index 8b89f52812..720e10067e 100644 --- 
a/rusk/src/lib/node/vm.rs +++ b/rusk/src/lib/node/vm.rs @@ -8,7 +8,7 @@ mod config; mod query; use dusk_consensus::errors::VstError; -use node_data::events::contract::ContractEvent; +use node_data::events::contract::ContractTxEvent; use tracing::{debug, info}; use dusk_bytes::DeserializableSlice; @@ -90,7 +90,7 @@ impl VMExecution for Rusk { ) -> anyhow::Result<( Vec, VerificationOutput, - Vec, + Vec, )> { debug!("Received accept request"); let generator = blk.header().generator_bls_pubkey; @@ -99,7 +99,7 @@ impl VMExecution for Rusk { let slashing = Slash::from_block(blk)?; - let (txs, verification_output, stake_events) = self + let (txs, verification_output, all_txs_events) = self .accept_transactions( prev_root, blk.header().height, @@ -116,7 +116,7 @@ impl VMExecution for Rusk { ) .map_err(|inner| anyhow::anyhow!("Cannot accept txs: {inner}!!"))?; - Ok((txs, verification_output, stake_events)) + Ok((txs, verification_output, all_txs_events)) } fn move_to_commit(&self, commit: [u8; 32]) -> anyhow::Result<()> { From 453b034da752748d43c567e2a30465bd951a1e67 Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 12:17:44 +0200 Subject: [PATCH 04/11] rusk: prepare synchronize archived events with acceptor - Replace archive mpsc channel & Archivist - Update comments - Change `accept_transactions` to return all contract events as ContractTxEvent instead of only stake events --- rusk/src/lib/builder/node.rs | 19 ++++++++++++------ rusk/src/lib/node.rs | 6 ++---- rusk/src/lib/node/rusk.rs | 39 +++++++++++++++--------------------- 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/rusk/src/lib/builder/node.rs b/rusk/src/lib/builder/node.rs index 81b73a9ff6..d030783cb7 100644 --- a/rusk/src/lib/builder/node.rs +++ b/rusk/src/lib/builder/node.rs @@ -201,6 +201,8 @@ impl RuskNodeBuilder { #[cfg(feature = "archive")] let (archive_sender, archive_receiver) = mpsc::channel(10000); + #[cfg(feature = 
"archive")] + let archive = Archive::create_or_open(self.db_path.clone()).await; let min_gas_limit = self.min_gas_limit.unwrap_or(DEFAULT_MIN_GAS_LIMIT); @@ -212,14 +214,11 @@ impl RuskNodeBuilder { self.feeder_call_gas, rues_sender.clone(), #[cfg(feature = "archive")] - archive_sender.clone(), + archive.clone(), ) .map_err(|e| anyhow::anyhow!("Cannot instantiate VM {e}"))?; info!("Rusk VM loaded"); - #[cfg(feature = "archive")] - let archive = Archive::create_or_open(self.db_path.clone()).await; - let node = { let db = rocksdb::Backend::create_or_open( self.db_path.clone(), @@ -246,6 +245,8 @@ impl RuskNodeBuilder { node.inner().network(), node.inner().database(), node.inner().vm_handler(), + #[cfg(feature = "archive")] + archive, ) .await?; return chain_srv.revert_last_final().await; @@ -297,10 +298,16 @@ impl RuskNodeBuilder { #[cfg(feature = "archive")] service_list.push(Box::new(ArchivistSrv { archive_receiver, - archivist: archive, + archivist: archive.clone(), })); - node.inner().initialize(&mut service_list).await?; + node.inner() + .initialize( + &mut service_list, + #[cfg(feature = "archive")] + archive, + ) + .await?; node.inner().spawn_all(service_list).await?; Ok(()) diff --git a/rusk/src/lib/node.rs b/rusk/src/lib/node.rs index 1dc636a2e0..0d1a08d232 100644 --- a/rusk/src/lib/node.rs +++ b/rusk/src/lib/node.rs @@ -24,9 +24,7 @@ pub use vm::RuskVmConfig; use crate::http::RuesEvent; pub(crate) use events::ChainEventStreamer; #[cfg(feature = "archive")] -use { - node::archive::Archive, node_data::archive::ArchivalData, tokio::sync::mpsc, -}; +use node::archive::Archive; #[derive(Debug, Clone, Copy)] pub struct RuskTip { @@ -45,7 +43,7 @@ pub struct Rusk { pub(crate) feeder_gas_limit: u64, pub(crate) event_sender: broadcast::Sender, #[cfg(feature = "archive")] - pub(crate) archive_sender: mpsc::Sender, + pub archive: Archive, } pub(crate) type Services = diff --git a/rusk/src/lib/node/rusk.rs b/rusk/src/lib/node/rusk.rs index dea2fa6c3d..2f28f683c2 100644 
--- a/rusk/src/lib/node/rusk.rs +++ b/rusk/src/lib/node/rusk.rs @@ -28,16 +28,15 @@ use dusk_core::{BlsScalar, Dusk}; use dusk_vm::{ execute, CallReceipt, Error as VMError, ExecutionConfig, Session, VM, }; -use node_data::events::contract::{ContractEvent, ContractTxEvent}; +#[cfg(feature = "archive")] +use node::archive::Archive; +use node_data::events::contract::ContractTxEvent; use node_data::ledger::{Hash, Slash, SpentTransaction, Transaction}; use parking_lot::RwLock; use rusk_profile::to_rusk_state_id_path; use tokio::sync::broadcast; use tracing::info; -#[cfg(feature = "archive")] -use {node_data::archive::ArchivalData, tokio::sync::mpsc::Sender}; - use super::RuskVmConfig; use crate::bloom::Bloom; use crate::http::RuesEvent; @@ -53,7 +52,7 @@ impl Rusk { min_gas_limit: u64, feeder_gas_limit: u64, event_sender: broadcast::Sender, - #[cfg(feature = "archive")] archive_sender: Sender, + #[cfg(feature = "archive")] archive: Archive, ) -> Result { let dir = dir.as_ref(); info!("Using state from {dir:?}"); @@ -91,7 +90,7 @@ impl Rusk { feeder_gas_limit, event_sender, #[cfg(feature = "archive")] - archive_sender, + archive, }) } @@ -273,6 +272,12 @@ impl Rusk { /// * `consistency_check` - represents a state_root, the caller expects to /// be returned on successful transactions execution. Passing a None /// value disables the check. + /// + /// # Returns + /// - Vec - The transactions that were spent. + /// - VerificationOutput - The verification output. + /// - Vec - All contract events that were emitted from the + /// given transactions. 
#[allow(clippy::too_many_arguments)] pub fn accept_transactions( &self, @@ -288,7 +293,7 @@ impl Rusk { ) -> Result<( Vec, VerificationOutput, - Vec, + Vec, )> { let session = self.new_block_session(block_height, prev_commit)?; @@ -318,27 +323,15 @@ impl Rusk { self.set_current_commit(session.commit()?); - // Sent all events from this block to the archivist - #[cfg(feature = "archive")] - { - let _ = self.archive_sender.try_send(ArchivalData::ArchivedEvents( - block_height, - block_hash, - events.clone(), - )); - } - - let mut stake_events = vec![]; + let all_txs_events = events.clone(); for event in events { - if event.event.target.0 == STAKE_CONTRACT { - stake_events.push(event.event.clone()); - } // Send VM event to RUES let event = RuesEvent::from(event); let _ = self.event_sender.send(event); - } + } // TODO: move this also in acceptor (async fn try_accept_block) where + // stake events are filtered, to avoid looping twice? - Ok((spent_txs, verification_output, stake_events)) + Ok((spent_txs, verification_output, all_txs_events)) } pub fn finalize_state( From 3ae5f8bae201fb3342514455c1fdb1a163fc86bc Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 12:18:29 +0200 Subject: [PATCH 05/11] node: synchronize events to be archived with acceptor - Replace mpsc channel with Archive directly - Make Archive available in Acceptor - Make `store_unfinalized_events` public - Remove ArchivedEvents logic in ArchivistSrv - Add ArchivedEvents logic to `try_accept_block` - Move stake event filtering from Rusk `accept_transactions` to Node Acceptor `try_accept_block` --- node/src/archive/archivist.rs | 18 -------- node/src/archive/sqlite.rs | 2 +- node/src/chain.rs | 5 +++ node/src/chain/acceptor.rs | 79 ++++++++++++++++++++++++++--------- node/src/lib.rs | 6 +++ 5 files changed, 71 insertions(+), 39 deletions(-) diff --git a/node/src/archive/archivist.rs b/node/src/archive/archivist.rs index d1febf1ffa..09667ab7d9 
100644 --- a/node/src/archive/archivist.rs +++ b/node/src/archive/archivist.rs @@ -33,24 +33,6 @@ impl loop { if let Some(msg) = self.archive_receiver.recv().await { match msg { - ArchivalData::ArchivedEvents( - blk_height, - blk_hash, - events, - ) => { - if let Err(e) = self - .archivist - .store_unfinalized_events( - blk_height, blk_hash, events, - ) - .await - { - error!( - "Failed to archive block vm events: {:?}", - e - ); - } - } ArchivalData::DeletedBlock(blk_height, hex_blk_hash) => { if let Err(e) = self .archivist diff --git a/node/src/archive/sqlite.rs b/node/src/archive/sqlite.rs index cb176e1579..ebdc5b3cc4 100644 --- a/node/src/archive/sqlite.rs +++ b/node/src/archive/sqlite.rs @@ -226,7 +226,7 @@ impl Archive { impl Archive { /// Store the list of **all** unfinalized vm events from the block of the /// given height. - pub(super) async fn store_unfinalized_events( + pub(crate) async fn store_unfinalized_events( &self, block_height: u64, block_hash: Hash, diff --git a/node/src/chain.rs b/node/src/chain.rs index 51f53bd939..98e25a005c 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -34,6 +34,8 @@ use tracing::{debug, error, info, warn}; use self::acceptor::Acceptor; use self::fsm::SimpleFSM; +#[cfg(feature = "archive")] +use crate::archive::Archive; use crate::database::rocksdb::MD_HASH_KEY; use crate::database::{Ledger, Metadata}; use crate::{database, vm, LongLivedService, Message, Network}; @@ -70,6 +72,7 @@ impl network: Arc>, db: Arc>, vm: Arc>, + #[cfg(feature = "archive")] archive: Archive, ) -> anyhow::Result<()> { let tip = Self::load_tip( db.read().await.deref(), @@ -89,6 +92,8 @@ impl db, network, vm, + #[cfg(feature = "archive")] + archive, self.max_consensus_queue_size, self.event_sender.clone(), self.dusk_key, diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 54661f8a93..178d0a537e 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -21,6 +21,7 @@ use 
dusk_consensus::operations::Voter; use dusk_consensus::user::provisioners::{ContextProvisioners, Provisioners}; use dusk_consensus::user::stake::Stake; use dusk_core::signatures::bls; +use dusk_core::stake::STAKE_CONTRACT; use dusk_core::stake::{SlashEvent, StakeAmount, StakeEvent}; use metrics::{counter, gauge, histogram}; use node_data::bls::PublicKey; @@ -38,6 +39,8 @@ use tokio::sync::{RwLock, RwLockReadGuard}; use tracing::{debug, error, info, trace, warn}; use super::consensus::Task; +#[cfg(feature = "archive")] +use crate::archive::Archive; use crate::chain::header_validation::{verify_att, verify_faults, Validator}; use crate::chain::metrics::AverageElapsedTime; use crate::database::rocksdb::{ @@ -78,6 +81,8 @@ pub(crate) struct Acceptor { pub(crate) db: Arc>, pub(crate) vm: Arc>, pub(crate) network: Arc>, + #[cfg(feature = "archive")] + pub(crate) archive: Archive, /// Sender channel for sending out RUES events event_sender: Sender, @@ -181,6 +186,7 @@ impl Acceptor { db: Arc>, network: Arc>, vm: Arc>, + #[cfg(feature = "archive")] archive: Archive, max_queue_size: usize, event_sender: Sender, dusk_key: bls::PublicKey, @@ -202,6 +208,8 @@ impl Acceptor { db: db.clone(), vm: vm.clone(), network: network.clone(), + #[cfg(feature = "archive")] + archive, task: RwLock::new(Task::new_with_keys( keys_path.to_string(), max_queue_size, @@ -696,30 +704,61 @@ impl Acceptor { let vm = self.vm.write().await; - let (stakes, finality) = self.db.read().await.update(|db| { - let (txs, verification_output, stake_events) = vm.accept( - prev_header.state_hash, - blk, - &prev_block_voters[..], - )?; - for spent_tx in txs.iter() { - events.push(TransactionEvent::Executed(spent_tx).into()); - } - est_elapsed_time = start.elapsed(); + let (all_txs_events, finality) = + self.db.read().await.update(|db| { + let (txs, verification_output, all_txs_events) = vm + .accept( + prev_header.state_hash, + blk, + &prev_block_voters[..], + )?; - assert_eq!(header.state_hash, 
verification_output.state_root); - assert_eq!(header.event_bloom, verification_output.event_bloom); + for spent_tx in txs.iter() { + events + .push(TransactionEvent::Executed(spent_tx).into()); + } + est_elapsed_time = start.elapsed(); - let finality = - self.rolling_finality::(pni, blk, db, &mut events)?; + assert_eq!( + header.state_hash, + verification_output.state_root + ); + assert_eq!( + header.event_bloom, + verification_output.event_bloom + ); - let label = finality.0; - // Store block with updated transactions with Error and GasSpent - block_size_on_disk = - db.store_block(header, &txs, blk.faults(), label)?; + let finality = + self.rolling_finality::(pni, blk, db, &mut events)?; + + let label = finality.0; + // Store block with updated transactions with Error and + // GasSpent + block_size_on_disk = + db.store_block(header, &txs, blk.faults(), label)?; + + Ok((all_txs_events, finality)) + })?; + + // Store all events from this current block in the archive + #[cfg(feature = "archive")] + self.archive + .store_unfinalized_events( + header.height, + header.hash, + all_txs_events.clone(), + ) + .await + .expect( + "Storing unfinalized events in archive should never fail", + ); - Ok((stake_events, finality)) - })?; + let mut stakes = vec![]; + for event in &all_txs_events { + if event.event.target.0 == STAKE_CONTRACT { + stakes.push(event.event.clone()); + } + } self.log_missing_iterations( provisioners_list.current(), diff --git a/node/src/lib.rs b/node/src/lib.rs index afb9b84d26..678231c656 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -22,6 +22,8 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; +#[cfg(feature = "archive")] +use archive::Archive; use async_trait::async_trait; use node_data::message::payload::Inv; use node_data::message::{AsyncQueue, Message}; @@ -118,6 +120,7 @@ pub trait LongLivedService: network: Arc>, database: Arc>, vm: Arc>, + #[cfg(feature = "archive")] achive: Archive, ) -> 
anyhow::Result<()> { Ok(()) } @@ -189,6 +192,7 @@ impl Node { pub async fn initialize( &self, services: &mut [Box>], + #[cfg(feature = "archive")] archive: Archive, ) -> anyhow::Result<()> { // Run lazy-initialization of all registered services for service in services.iter_mut() { @@ -198,6 +202,8 @@ impl Node { self.network.clone(), self.database.clone(), self.vm_handler.clone(), + #[cfg(feature = "archive")] + archive.clone(), ) .await?; } From 8684cefcb301100388645be7cf82e279293d5715 Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 15:23:29 +0200 Subject: [PATCH 06/11] rusk: remove archive event forwarding from ChainEventStreamer --- rusk/src/lib/builder/node.rs | 4 +-- rusk/src/lib/node/events.rs | 63 ------------------------------------ 2 files changed, 1 insertion(+), 66 deletions(-) diff --git a/rusk/src/lib/builder/node.rs b/rusk/src/lib/builder/node.rs index d030783cb7..c5a31ff0d0 100644 --- a/rusk/src/lib/builder/node.rs +++ b/rusk/src/lib/builder/node.rs @@ -200,7 +200,7 @@ impl RuskNodeBuilder { let (node_sender, node_receiver) = mpsc::channel(1000); #[cfg(feature = "archive")] - let (archive_sender, archive_receiver) = mpsc::channel(10000); + let (_, archive_receiver) = mpsc::channel(10000); #[cfg(feature = "archive")] let archive = Archive::create_or_open(self.db_path.clone()).await; @@ -266,8 +266,6 @@ impl RuskNodeBuilder { service_list.push(Box::new(ChainEventStreamer { node_receiver, rues_sender, - #[cfg(feature = "archive")] - archivist_sender: archive_sender, })); let mut handler = DataSources::default(); diff --git a/rusk/src/lib/node/events.rs b/rusk/src/lib/node/events.rs index 5c39ea55ff..a70028cf1d 100644 --- a/rusk/src/lib/node/events.rs +++ b/rusk/src/lib/node/events.rs @@ -13,19 +13,12 @@ use node_data::events::Event as ChainEvent; use tokio::sync::broadcast; use tokio::sync::mpsc::Receiver; use tracing::error; -#[cfg(feature = "archive")] -use { - 
node_data::archive::ArchivalData, node_data::events::BlockState, - serde_json::Value, tokio::sync::mpsc::Sender, -}; use crate::http::RuesEvent; pub(crate) struct ChainEventStreamer { pub node_receiver: Receiver, pub rues_sender: broadcast::Sender, - #[cfg(feature = "archive")] - pub archivist_sender: Sender, } #[async_trait] @@ -43,62 +36,6 @@ impl if let Err(e) = self.rues_sender.send(msg.clone().into()) { error!("Cannot send to rues {e:?}"); } - - #[cfg(feature = "archive")] - { - // NB: This is a temporary solution to send finalized and - // deleted blocks to the archivist in a decoupled way. - // We can remove this once the consensus acceptor can send - // these events directly to the archivist service. - match msg.topic { - // "statechange" & "deleted" are only in msg.component - // == "blocks" - "statechange" => { - if let Some(json_val) = msg.data { - let state = json_val - .get("state") - .and_then(Value::as_str) - .unwrap_or_default(); - let at_height = json_val - .get("atHeight") - .and_then(Value::as_u64) - .unwrap_or_default(); - - if state == BlockState::Finalized.as_str() { - if let Err(e) = self - .archivist_sender - .try_send(ArchivalData::FinalizedBlock( - at_height, - msg.entity.clone(), - )) - { - error!( - "Cannot send to archivist {e:?}" - ); - }; - } - }; - } - "deleted" => { - if let Some(json_val) = msg.data { - let at_height = json_val - .get("atHeight") - .and_then(Value::as_u64) - .unwrap_or_default(); - - if let Err(e) = self.archivist_sender.try_send( - ArchivalData::DeletedBlock( - at_height, - msg.entity.clone(), - ), - ) { - error!("Cannot send to archivist {e:?}"); - }; - }; - } - _ => (), - } - } } } } From ca2474f56f6c49bd3cfac95e5fd0a09e3a4286e6 Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:45:13 +0200 Subject: [PATCH 07/11] node: remove ArchivistSrv --- node/src/archive/archivist.rs | 77 ----------------------------------- 1 file changed, 77 deletions(-) 
delete mode 100644 node/src/archive/archivist.rs diff --git a/node/src/archive/archivist.rs b/node/src/archive/archivist.rs deleted file mode 100644 index 09667ab7d9..0000000000 --- a/node/src/archive/archivist.rs +++ /dev/null @@ -1,77 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// -// Copyright (c) DUSK NETWORK. All rights reserved. - -use std::sync::Arc; - -use async_trait::async_trait; -use node_data::archive::ArchivalData; -use tokio::sync::mpsc::Receiver; -use tokio::sync::RwLock; -use tracing::error; - -use crate::archive::Archive; -use crate::{database, vm, LongLivedService, Network}; - -pub struct ArchivistSrv { - pub archive_receiver: Receiver, - pub archivist: Archive, -} - -#[async_trait] -impl - LongLivedService for ArchivistSrv -{ - async fn execute( - &mut self, - _: Arc>, - _: Arc>, - _: Arc>, - ) -> anyhow::Result { - loop { - if let Some(msg) = self.archive_receiver.recv().await { - match msg { - ArchivalData::DeletedBlock(blk_height, hex_blk_hash) => { - if let Err(e) = self - .archivist - .remove_block_and_events(blk_height, &hex_blk_hash) - .await - { - error!( - "Failed to delete block in archive: {:?}", - e - ); - } - } - ArchivalData::FinalizedBlock(blk_height, hex_blk_hash) => { - if let Err(e) = self - .archivist - .finalize_archive_data(blk_height, &hex_blk_hash) - .await - { - error!( - "Failed to finalize block in archive: {:?}", - e - ); - } - } - } - } else { - error!( - "Sending side of the archive data channel has been closed" - ); - - break; - } - } - - Ok(0) - } - - /// Returns service name. 
- fn name(&self) -> &'static str { - "archivist" - } -} From df6920287929a7c24c63cb26bf328ac0182b9af0 Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:46:06 +0200 Subject: [PATCH 08/11] node-data: remove ArchivalData enum - Add PartialEq & Eq to BlockState for later - Add CHANGELOG entries --- node-data/CHANGELOG.md | 9 +++++++++ node-data/src/archive.rs | 18 ------------------ node-data/src/events/blocks.rs | 2 +- node-data/src/lib.rs | 1 - 4 files changed, 10 insertions(+), 20 deletions(-) delete mode 100644 node-data/src/archive.rs diff --git a/node-data/CHANGELOG.md b/node-data/CHANGELOG.md index fc2a8fbf78..31448f2b9c 100644 --- a/node-data/CHANGELOG.md +++ b/node-data/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add PartialEq, Eq to `BlockState` [#3359] + +### Removed + +- Removed `ArchivalData` together with archive module [#3359] + [1.0.1] - 2025-01-23 ### Changed @@ -20,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add Types used for interacting with Dusk node +[#3359]: https://github.com/dusk-network/rusk/issues/3359 [#3405]: https://github.com/dusk-network/rusk/issues/3405 [Unreleased]: https://github.com/dusk-network/rusk/compare/dusk-node-data-1.0.1...HEAD diff --git a/node-data/src/archive.rs b/node-data/src/archive.rs deleted file mode 100644 index 951ac66f1d..0000000000 --- a/node-data/src/archive.rs +++ /dev/null @@ -1,18 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// -// Copyright (c) DUSK NETWORK. All rights reserved. - -type HexHash = String; - -/// Defined data, that the archivist will store.
-/// -/// This is also the type of the mpsc channel where the archivist listens for -/// data to archive. ContractEvents together with an unfinalized Block are -/// archived directly and not stored here. -#[derive(Debug)] -pub enum ArchivalData { - FinalizedBlock(u64, HexHash), - DeletedBlock(u64, HexHash), -} diff --git a/node-data/src/events/blocks.rs b/node-data/src/events/blocks.rs index 8083d98336..bd7e4ff052 100644 --- a/node-data/src/events/blocks.rs +++ b/node-data/src/events/blocks.rs @@ -19,7 +19,7 @@ use crate::ledger::{Block, Hash}; /// /// - `as_str() -> &'static str` - Returns the string representation of the /// block state. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum BlockState { Confirmed, Finalized, diff --git a/node-data/src/lib.rs b/node-data/src/lib.rs index 9615f0d7b6..ea685697fb 100644 --- a/node-data/src/lib.rs +++ b/node-data/src/lib.rs @@ -7,7 +7,6 @@ #![deny(unused_crate_dependencies)] #![deny(unused_extern_crates)] -pub mod archive; pub mod bls; pub mod encoding; pub mod events; From 8cd558d1c10e042e0dcd9e08ba88f071a6c2758c Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 18:13:43 +0200 Subject: [PATCH 09/11] node: use archive directly for block revert & finalization - Add CHANGELOG entries --- node/CHANGELOG.md | 9 ++++++++ node/src/archive.rs | 2 -- node/src/archive/sqlite.rs | 4 ++-- node/src/chain.rs | 8 ++++++-- node/src/chain/acceptor.rs | 42 ++++++++++++++++++++++++++++++++++---- node/src/lib.rs | 6 ------ 6 files changed, 55 insertions(+), 16 deletions(-) diff --git a/node/CHANGELOG.md b/node/CHANGELOG.md index f56c617e40..e9d9f531e8 100644 --- a/node/CHANGELOG.md +++ b/node/CHANGELOG.md @@ -7,10 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Change the way the archive synchronizes with the node Acceptor [#3359] + ### Changed - Change deprecated `tempdir` with 
`tempfile` dependency [#3407] +### Removed + +- Removed ArchivistSrv & archivist module [#3359] + ## [1.0.1] - 2025-01-23 ### Changed @@ -22,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - First `dusk-node` release +[#3359]: https://github.com/dusk-network/rusk/issues/3359 [#3407]: https://github.com/dusk-network/rusk/issues/3407 [#3405]: https://github.com/dusk-network/rusk/issues/3405 diff --git a/node/src/archive.rs b/node/src/archive.rs index 6a4ed620c0..7d3868ff65 100644 --- a/node/src/archive.rs +++ b/node/src/archive.rs @@ -12,12 +12,10 @@ use rocksdb::OptimisticTransactionDB; use sqlx::sqlite::SqlitePool; use tracing::debug; -mod archivist; mod moonlight; mod sqlite; mod transformer; -pub use archivist::ArchivistSrv; pub use moonlight::{MoonlightGroup, Order}; // Archive folder containing the sqlite database and the moonlight database diff --git a/node/src/archive/sqlite.rs b/node/src/archive/sqlite.rs index ebdc5b3cc4..58e3190fa7 100644 --- a/node/src/archive/sqlite.rs +++ b/node/src/archive/sqlite.rs @@ -276,7 +276,7 @@ impl Archive { /// This also triggers the loading of the MoonlightTxEvents into the /// moonlight db. This also updates the last finalized block height /// attribute. - pub(super) async fn finalize_archive_data( + pub(crate) async fn finalize_archive_data( &mut self, current_block_height: u64, hex_block_hash: &str, @@ -375,7 +375,7 @@ impl Archive { /// Remove the unfinalized block together with the unfinalized events of the /// given hash from the archive. 
- pub(super) async fn remove_block_and_events( + pub(crate) async fn remove_block_and_events( &self, current_block_height: u64, hex_block_hash: &str, diff --git a/node/src/chain.rs b/node/src/chain.rs index 98e25a005c..a5a4476616 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -61,6 +61,8 @@ pub struct ChainSrv { event_sender: Sender, genesis_timestamp: u64, dusk_key: BlsPublicKey, + #[cfg(feature = "archive")] + archive: Archive, } #[async_trait] @@ -72,7 +74,6 @@ impl network: Arc>, db: Arc>, vm: Arc>, - #[cfg(feature = "archive")] archive: Archive, ) -> anyhow::Result<()> { let tip = Self::load_tip( db.read().await.deref(), @@ -93,7 +94,7 @@ impl network, vm, #[cfg(feature = "archive")] - archive, + self.archive.clone(), self.max_consensus_queue_size, self.event_sender.clone(), self.dusk_key, @@ -256,6 +257,7 @@ impl ChainSrv { event_sender: Sender, genesis_timestamp: u64, dusk_key: BlsPublicKey, + #[cfg(feature = "archive")] archive: Archive, ) -> Self { info!( "ChainSrv::new with keys_path: {}, max_inbound_size: {}", @@ -270,6 +272,8 @@ impl ChainSrv { event_sender, genesis_timestamp, dusk_key, + #[cfg(feature = "archive")] + archive, } } diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 178d0a537e..e0e16c5cb1 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -740,9 +740,23 @@ impl Acceptor { Ok((all_txs_events, finality)) })?; - // Store all events from this current block in the archive + // use rolling_finality_events for archive #[cfg(feature = "archive")] - self.archive + { + if let Some((_, new_finals)) = &finality.1 { + for (height, hash) in new_finals.iter() { + if let Err(e) = self + .archive + .finalize_archive_data(*height, &hex::encode(hash)) + .await + { + error!("Failed to finalize block in archive: {e:?}") + } + } + } + + // Store all events from this current block in the archive + self.archive .store_unfinalized_events( header.height, header.hash, @@ -752,11 +766,12 @@ impl Acceptor { 
.expect( "Storing unfinalized events in archive should never fail", ); + } let mut stakes = vec![]; - for event in &all_txs_events { + for event in all_txs_events { if event.event.target.0 == STAKE_CONTRACT { - stakes.push(event.event.clone()); + stakes.push(event.event); } } @@ -1106,6 +1121,8 @@ impl Acceptor { // VM was reverted to. // The blockchain tip after reverting + #[cfg(feature = "archive")] + let mut archive_revert_info: Vec<(u64, String)> = vec![]; let (blk, label) = self.db.read().await.update(|db| { let mut height = curr_height; loop { @@ -1137,6 +1154,10 @@ impl Acceptor { warn!("cannot notify event {e}") }; + // Temporary store the reverted block info for archive + #[cfg(feature = "archive")] + archive_revert_info.push((h.height, hex::encode(h.hash))); + info!( event = "block reverted", height = h.height, @@ -1175,6 +1196,19 @@ impl Acceptor { state_root = hex::encode(blk.header().state_hash) ); + // Remove the block and event entries for this block from the + // archive + #[cfg(feature = "archive")] + for (height, hex_hash) in archive_revert_info { + if let Err(e) = self + .archive + .remove_block_and_events(height, &hex_hash) + .await + { + error!("Failed to delete block & events in archive: {:?}", e); + } + } + self.update_tip(&blk, label).await } diff --git a/node/src/lib.rs b/node/src/lib.rs index 678231c656..afb9b84d26 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -22,8 +22,6 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; -#[cfg(feature = "archive")] -use archive::Archive; use async_trait::async_trait; use node_data::message::payload::Inv; use node_data::message::{AsyncQueue, Message}; @@ -120,7 +118,6 @@ pub trait LongLivedService: network: Arc>, database: Arc>, vm: Arc>, - #[cfg(feature = "archive")] achive: Archive, ) -> anyhow::Result<()> { Ok(()) } @@ -192,7 +189,6 @@ impl Node { pub async fn initialize( &self, services: &mut [Box>], - #[cfg(feature = "archive")] archive: Archive, ) -> 
anyhow::Result<()> { // Run lazy-initialization of all registered services for service in services.iter_mut() { @@ -202,8 +198,6 @@ impl Node { self.network.clone(), self.database.clone(), self.vm_handler.clone(), - #[cfg(feature = "archive")] - archive.clone(), ) .await?; } From a5500512a3e222e41ebe524ed69a07b8b9223d4f Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Tue, 28 Jan 2025 18:25:38 +0200 Subject: [PATCH 10/11] rusk: remove archive channel & service - Add CHANGELOG entries --- node/src/chain/acceptor.rs | 10 +++++----- rusk/CHANGELOG.md | 3 +++ rusk/src/lib/builder/node.rs | 24 +++++------------------- rusk/src/lib/node/rusk.rs | 6 +++--- rusk/src/lib/node/vm.rs | 4 ++-- 5 files changed, 18 insertions(+), 29 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index e0e16c5cb1..ac3a703f67 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -704,9 +704,9 @@ impl Acceptor { let vm = self.vm.write().await; - let (all_txs_events, finality) = + let (contract_events, finality) = self.db.read().await.update(|db| { - let (txs, verification_output, all_txs_events) = vm + let (txs, verification_output, contract_events) = vm .accept( prev_header.state_hash, blk, @@ -737,7 +737,7 @@ impl Acceptor { block_size_on_disk = db.store_block(header, &txs, blk.faults(), label)?; - Ok((all_txs_events, finality)) + Ok((contract_events, finality)) })?; // use rolling_finality_events for archive @@ -760,7 +760,7 @@ impl Acceptor { .store_unfinalized_events( header.height, header.hash, - all_txs_events.clone(), + contract_events.clone(), ) .await .expect( @@ -769,7 +769,7 @@ impl Acceptor { } let mut stakes = vec![]; - for event in all_txs_events { + for event in contract_events { if event.event.target.0 == STAKE_CONTRACT { stakes.push(event.event); } diff --git a/rusk/CHANGELOG.md b/rusk/CHANGELOG.md index 50db6f6b7e..1a3609b715 100644 --- a/rusk/CHANGELOG.md +++ 
b/rusk/CHANGELOG.md @@ -19,10 +19,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Deprecate `[chain].generation_timeout` config [#3341] - Deprecate `[chain].min_deploy_points` config [#3341] - Deprecate `[chain].block_gas_limit` config [#3341] +- Change how Rusk controls the archive for synchronization [#3359] ### Removed - Remove legacy event system +- Remove archive mpsc channel & archive event forwarding [#3359] ## [1.0.2] - 2025-01-27 @@ -319,6 +321,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add build system that generates keys for circuits and caches them. +[#3359]: https://github.com/dusk-network/rusk/issues/3359 [#3422]: https://github.com/dusk-network/rusk/issues/3422 [#3405]: https://github.com/dusk-network/rusk/issues/3405 [#3341]: https://github.com/dusk-network/rusk/issues/3341 diff --git a/rusk/src/lib/builder/node.rs b/rusk/src/lib/builder/node.rs index c5a31ff0d0..68a89d027e 100644 --- a/rusk/src/lib/builder/node.rs +++ b/rusk/src/lib/builder/node.rs @@ -19,10 +19,10 @@ use node::network::Kadcast; use node::telemetry::TelemetrySrv; use node::{LongLivedService, Node}; +#[cfg(feature = "archive")] +use node::archive::Archive; use tokio::sync::{broadcast, mpsc}; use tracing::info; -#[cfg(feature = "archive")] -use {node::archive::Archive, node::archive::ArchivistSrv}; use crate::http::{DataSources, HttpServer, HttpServerConfig}; use crate::node::{ChainEventStreamer, RuskNode, RuskVmConfig, Services}; @@ -199,8 +199,6 @@ impl RuskNodeBuilder { let (rues_sender, rues_receiver) = broadcast::channel(channel_cap); let (node_sender, node_receiver) = mpsc::channel(1000); - #[cfg(feature = "archive")] - let (_, archive_receiver) = mpsc::channel(10000); #[cfg(feature = "archive")] let archive = Archive::create_or_open(self.db_path.clone()).await; @@ -238,6 +236,8 @@ impl RuskNodeBuilder { node_sender.clone(), self.genesis_timestamp, *crate::DUSK_CONSENSUS_KEY, + #[cfg(feature = 
"archive")] + archive.clone(), ); if self.command_revert { chain_srv @@ -245,8 +245,6 @@ impl RuskNodeBuilder { node.inner().network(), node.inner().database(), node.inner().vm_handler(), - #[cfg(feature = "archive")] - archive, ) .await?; return chain_srv.revert_last_final().await; @@ -293,19 +291,7 @@ impl RuskNodeBuilder { ); } - #[cfg(feature = "archive")] - service_list.push(Box::new(ArchivistSrv { - archive_receiver, - archivist: archive.clone(), - })); - - node.inner() - .initialize( - &mut service_list, - #[cfg(feature = "archive")] - archive, - ) - .await?; + node.inner().initialize(&mut service_list).await?; node.inner().spawn_all(service_list).await?; Ok(()) diff --git a/rusk/src/lib/node/rusk.rs b/rusk/src/lib/node/rusk.rs index 2f28f683c2..3e06f8d4fa 100644 --- a/rusk/src/lib/node/rusk.rs +++ b/rusk/src/lib/node/rusk.rs @@ -276,7 +276,7 @@ impl Rusk { /// # Returns /// - Vec - The transactions that were spent. /// - VerificationOutput - The verification output. - /// - Vec - All contract events that were emitted from the + /// - Vec - All contract events that were emitted from the /// given transactions. #[allow(clippy::too_many_arguments)] pub fn accept_transactions( @@ -323,7 +323,7 @@ impl Rusk { self.set_current_commit(session.commit()?); - let all_txs_events = events.clone(); + let contract_events = events.clone(); for event in events { // Send VM event to RUES let event = RuesEvent::from(event); @@ -331,7 +331,7 @@ impl Rusk { } // TODO: move this also in acceptor (async fn try_accept_block) where // stake events are filtered, to avoid looping twice? 
- Ok((spent_txs, verification_output, all_txs_events)) + Ok((spent_txs, verification_output, contract_events)) } pub fn finalize_state( diff --git a/rusk/src/lib/node/vm.rs b/rusk/src/lib/node/vm.rs index 720e10067e..aa8987b089 100644 --- a/rusk/src/lib/node/vm.rs +++ b/rusk/src/lib/node/vm.rs @@ -99,7 +99,7 @@ impl VMExecution for Rusk { let slashing = Slash::from_block(blk)?; - let (txs, verification_output, all_txs_events) = self + let (txs, verification_output, contract_events) = self .accept_transactions( prev_root, blk.header().height, @@ -116,7 +116,7 @@ impl VMExecution for Rusk { ) .map_err(|inner| anyhow::anyhow!("Cannot accept txs: {inner}!!"))?; - Ok((txs, verification_output, all_txs_events)) + Ok((txs, verification_output, contract_events)) } fn move_to_commit(&self, commit: [u8; 32]) -> anyhow::Result<()> { From 5b3895b6c66838eca5ef7e69e1a4b7fc194b55ce Mon Sep 17 00:00:00 2001 From: Neotamandua <107320179+Neotamandua@users.noreply.github.com> Date: Mon, 3 Feb 2025 17:22:17 +0200 Subject: [PATCH 11/11] node: add block hash to rolling_finality return val --- node/src/chain/acceptor.rs | 65 +++++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index ac3a703f67..716339b2a3 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -56,7 +56,19 @@ const CANDIDATES_DELETION_OFFSET: u64 = 10; /// future message. 
const OFFSET_FUTURE_MSGS: u64 = 5; -pub type RollingFinalityResult = ([u8; 32], BTreeMap); +struct Identifiers { + /// Block hash of the newly finalized block + block_hash: [u8; 32], + /// State root of the newly finalized block + state_root: [u8; 32], +} + +struct RollingFinalityResult { + /// State root of the last finalized block + prev_final_state_root: [u8; 32], + /// New finalized blocks + new_finals: BTreeMap, +} #[allow(dead_code)] pub(crate) enum RevertTarget { @@ -743,11 +755,18 @@ impl Acceptor { // use rolling_finality_events for archive #[cfg(feature = "archive")] { - if let Some((_, new_finals)) = &finality.1 { - for (height, hash) in new_finals.iter() { + if let Some(RollingFinalityResult { new_finals, .. }) = + &finality.1 + { + for (height, Identifiers { block_hash, .. }) in + new_finals.iter() + { if let Err(e) = self .archive - .finalize_archive_data(*height, &hex::encode(hash)) + .finalize_archive_data( + *height, + &hex::encode(block_hash), + ) .await { error!("Failed to finalize block in archive: {e:?}") @@ -811,14 +830,21 @@ impl Acceptor { let finalized = final_results.is_some(); - if let Some((prev_final_state, mut new_finals)) = final_results { + if let Some(RollingFinalityResult { + prev_final_state_root, + mut new_finals, + }) = final_results + { let (_, new_final_state) = new_finals.pop_last().expect("new_finals to be not empty"); - let old_finals_to_merge = new_finals + let new_final_state_root = new_final_state.state_root; + // old final state roots to merge too + let old_final_state_roots = new_finals .into_values() - .chain([prev_final_state]) + .map(|finalized_info| finalized_info.state_root) + .chain([prev_final_state_root]) .collect::>(); - vm.finalize_state(new_final_state, old_finals_to_merge)?; + vm.finalize_state(new_final_state_root, old_final_state_roots)?; } anyhow::Ok((label, finalized)) @@ -944,7 +970,8 @@ impl Acceptor { /// Returns /// - Current accepted block label /// - Previous last finalized state root - /// - List 
of the new finalized state root + /// - List of the new finalized state root together with the respective + /// block hash fn rolling_finality( &self, pni: u8, // Previous Non-Attested Iterations @@ -980,7 +1007,7 @@ impl Acceptor { } let lfb_hash = lfb_hash.expect("Unable to find last finalized block hash"); - let lfb_state_root = db + let prev_final_state_root = db .block_header(&lfb_hash)? .ok_or(anyhow!( "Cannot get header for last finalized block hash {}", @@ -1052,23 +1079,28 @@ impl Acceptor { events.push(event.into()); db.store_block_label(height, &hash, label)?; - let state_hash = db + let state_root = db .block_header(&hash)? .map(|h| h.state_hash) .ok_or(anyhow!( "Cannot get header for hash {}", to_str(&hash) ))?; + let finalized = Identifiers { + block_hash: hash, + state_root, + }; info!( event = "block finalized", src = "rolling_finality", current_height, height, finalized_after, - hash = to_str(&hash), - state_root = to_str(&state_hash), + hash = to_str(&finalized.block_hash), + state_root = to_str(&finalized.state_root), ); - finalized_blocks.insert(height, state_hash); + + finalized_blocks.insert(height, finalized); } } } @@ -1076,7 +1108,10 @@ impl Acceptor { let finalized_result = if finalized_blocks.is_empty() { None } else { - Some((lfb_state_root, finalized_blocks)) + Some(RollingFinalityResult { + prev_final_state_root, + new_finals: finalized_blocks, + }) }; Ok((block_label, finalized_result))