Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consensus: Fix reorgs #384

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 27 additions & 6 deletions binaries/cuprated/src/blockchain/manager/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,12 @@ impl super::BlockchainManager {
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from or if the incoming batch contains no blocks.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
let Ok(prepped_blocks) =
batch_prepare_main_chain_blocks(batch.blocks, &mut self.blockchain_context_service)
.await
let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks(
batch.blocks,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
)
.await
else {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
Expand All @@ -181,6 +184,7 @@ impl super::BlockchainManager {
txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
Some(&mut output_cache),
)
.await
else {
Expand Down Expand Up @@ -267,12 +271,28 @@ impl super::BlockchainManager {
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<AddAltBlock, anyhow::Error> {
// Check if this block already exists in the blockchain (main chain or an alt chain).
let BlockchainResponse::FindBlock(chain) = self
.blockchain_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainReadRequest::FindBlock(block.hash()))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!();
};

if chain.is_some() {
// The block could also be in the main-chain here under some circumstances.
return Ok(AddAltBlock::Cached);
}

let alt_block_info =
sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
.await?;

// TODO: check in consensus crate if alt block with this hash already exists.

// If this alt chain has more cumulative difficulty, reorg.
if alt_block_info.cumulative_difficulty
> self
Expand Down Expand Up @@ -404,6 +424,7 @@ impl super::BlockchainManager {
prepped_txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
None,
)
.await?;

Expand Down Expand Up @@ -474,7 +495,7 @@ impl super::BlockchainManager {

/// The result from successfully adding an alt-block.
enum AddAltBlock {
/// The alt-block was cached.
/// The alt-block was cached or was already present in the DB.
Cached,
/// The chain was reorged.
Reorged,
Expand Down
2 changes: 1 addition & 1 deletion binaries/cuprated/src/blockchain/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use tokio::{
sync::{mpsc, Notify},
sync::{mpsc, oneshot, Notify},
time::interval,
};
use tower::{Service, ServiceExt};
Expand Down
5 changes: 3 additions & 2 deletions binaries/cuprated/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
reason = "TODO: remove after v1.0.0"
)]

use std::mem;
use std::sync::Arc;
use std::{mem, sync::Arc};

use tokio::sync::mpsc;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
Expand All @@ -27,6 +27,7 @@ use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::time::secs_to_hms;
use cuprate_types::blockchain::BlockchainWriteRequest;

use crate::{
config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR, logging::CupratedTracingFilter,
Expand Down
6 changes: 4 additions & 2 deletions binaries/cuprated/src/rpc/request/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use std::{
};

use anyhow::Error;
use indexmap::{IndexMap, IndexSet};
use monero_serai::block::Block;
use tower::{Service, ServiceExt};

use cuprate_blockchain::{service::BlockchainReadHandle, types::AltChainInfo};
use cuprate_helper::cast::{u64_to_usize, usize_to_u64};
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
output_cache::OutputCache,
Chain, ChainInfo, CoinbaseTxSum, ExtendedBlockHeader, HardFork, MinerData,
OutputHistogramEntry, OutputHistogramInput, OutputOnChain,
};
Expand Down Expand Up @@ -185,8 +187,8 @@ pub(crate) async fn generated_coins(
/// [`BlockchainReadRequest::Outputs`]
pub(crate) async fn outputs(
blockchain_read: &mut BlockchainReadHandle,
outputs: HashMap<u64, HashSet<u64>>,
) -> Result<HashMap<u64, HashMap<u64, OutputOnChain>>, Error> {
outputs: IndexMap<u64, IndexSet<u64>>,
) -> Result<OutputCache, Error> {
let BlockchainResponse::Outputs(outputs) = blockchain_read
.ready()
.await?
Expand Down
1 change: 1 addition & 0 deletions binaries/cuprated/src/txpool/incoming_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async fn handle_incoming_txs(
context.current_adjusted_timestamp_for_time_lock(),
context.current_hf,
blockchain_read_handle,
None,
)
.verify()
.await
Expand Down
2 changes: 1 addition & 1 deletion books/architecture/src/resources/cap/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ There are "synchronization primitives" that help with this, common ones being:
- [Channels](https://en.wikipedia.org/wiki/Channel_(programming))
- [Atomics](https://en.wikipedia.org/wiki/Linearizability#Primitive_atomic_instructions)

These tools are relatively easy to use in isolation, but trickier to do so when considering the entire system. It is not uncommon for _the_ bottleneck to be the [poor orchastration](https://en.wikipedia.org/wiki/Starvation_(computer_science)) of these primitives.
These tools are relatively easy to use in isolation, but trickier to do so when considering the entire system. It is not uncommon for _the_ bottleneck to be the [poor orchestration](https://en.wikipedia.org/wiki/Starvation_(computer_science)) of these primitives.

## Analogy
A common analogy for a parallel system is an intersection.
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ monero-serai = { workspace = true, features = ["std"] }
rayon = { workspace = true }
thread_local = { workspace = true }

indexmap = { workspace = true, features = ["std"] }
hex = { workspace = true }
rand = { workspace = true }

Expand Down
10 changes: 3 additions & 7 deletions consensus/context/src/alt_chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,8 @@ impl AltChainMap {
}

/// Add an alt chain cache to the map.
pub(crate) fn add_alt_cache(
&mut self,
prev_id: [u8; 32],
alt_cache: Box<AltChainContextCache>,
) {
self.alt_cache_map.insert(prev_id, alt_cache);
pub(crate) fn add_alt_cache(&mut self, alt_cache: Box<AltChainContextCache>) {
self.alt_cache_map.insert(alt_cache.top_hash, alt_cache);
}

/// Attempts to take an [`AltChainContextCache`] from the map, returning [`None`] if no cache is
Expand Down Expand Up @@ -119,7 +115,7 @@ impl AltChainMap {
weight_cache: None,
difficulty_cache: None,
cached_rx_vm: None,
chain_height: top_height,
chain_height: top_height + 1,
top_hash: prev_id,
chain_id: None,
parent_chain,
Expand Down
2 changes: 0 additions & 2 deletions consensus/context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,6 @@ pub enum BlockChainContextRequest {
/// This variant is private and is not callable from outside this crate, the block verifier service will
/// handle returning the alt cache to the context service.
AddAltChainContextCache {
/// The previous block field in a [`BlockHeader`](monero_serai::block::BlockHeader).
prev_id: [u8; 32],
/// The cache.
cache: Box<AltChainContextCache>,
/// An internal token to prevent external crates calling this request.
Expand Down
8 changes: 2 additions & 6 deletions consensus/context/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,8 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
.get_alt_vm(height, chain, &mut self.database)
.await?,
),
BlockChainContextRequest::AddAltChainContextCache {
prev_id,
cache,
_token,
} => {
self.alt_chain_cache_map.add_alt_cache(prev_id, cache);
BlockChainContextRequest::AddAltChainContextCache { cache, _token } => {
self.alt_chain_cache_map.add_alt_cache(cache);
BlockChainContextResponse::Ok
}
BlockChainContextRequest::HardForkInfo(_)
Expand Down
1 change: 1 addition & 0 deletions consensus/rules/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ curve25519-dalek = { workspace = true, features = ["alloc", "zeroize", "precompu

rand = { workspace = true, features = ["std", "std_rng"] }

indexmap = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] }
hex-literal = { workspace = true }
crypto-bigint = { workspace = true }
Expand Down
8 changes: 3 additions & 5 deletions consensus/rules/src/transactions/contextual_data.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::{
cmp::{max, min},
collections::{HashMap, HashSet},
};
use std::cmp::{max, min};

use curve25519_dalek::EdwardsPoint;
use indexmap::{IndexMap, IndexSet};
use monero_serai::transaction::{Input, Timelock};

use crate::{transactions::TransactionError, HardFork};
Expand Down Expand Up @@ -33,7 +31,7 @@ pub fn get_absolute_offsets(relative_offsets: &[u64]) -> Result<Vec<u64>, Transa
///
pub fn insert_ring_member_ids(
inputs: &[Input],
output_ids: &mut HashMap<u64, HashSet<u64>>,
output_ids: &mut IndexMap<u64, IndexSet<u64>>,
) -> Result<(), TransactionError> {
if inputs.is_empty() {
return Err(TransactionError::NoInputs);
Expand Down
16 changes: 12 additions & 4 deletions consensus/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod batch_prepare;
mod free;

pub use alt_block::sanity_check_alt_block;
pub use batch_prepare::batch_prepare_main_chain_blocks;
pub use batch_prepare::{batch_prepare_main_chain_blocks, BatchPrepareCache};
use free::pull_ordered_transactions;

/// A pre-prepared block with all data needed to verify it, except the block's proof of work.
Expand Down Expand Up @@ -243,7 +243,7 @@ where
// Check that the txs included are what we need and that there are not any extra.
let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?;

verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database).await
verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database, None).await
}

/// Fully verify a block that has already been prepared using [`batch_prepare_main_chain_blocks`].
Expand All @@ -252,6 +252,7 @@ pub async fn verify_prepped_main_chain_block<D>(
mut txs: Vec<TransactionVerificationData>,
context_svc: &mut BlockchainContextService,
database: D,
batch_prep_cache: Option<&mut BatchPrepareCache>,
) -> Result<VerifiedBlockInformation, ExtendedConsensusError>
where
D: Database + Clone + Send + 'static,
Expand Down Expand Up @@ -283,6 +284,7 @@ where
context.current_adjusted_timestamp_for_time_lock(),
context.current_hf,
database,
batch_prep_cache.as_deref(),
)
.verify()
.await?;
Expand All @@ -304,7 +306,7 @@ where
)
.map_err(ConsensusError::Block)?;

Ok(VerifiedBlockInformation {
let block = VerifiedBlockInformation {
block_hash: prepped_block.block_hash,
block: prepped_block.block,
block_blob: prepped_block.block_blob,
Expand All @@ -324,5 +326,11 @@ where
height: context.chain_height,
long_term_weight: context.next_block_long_term_weight(block_weight),
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
})
};

if let Some(batch_prep_cache) = batch_prep_cache {
batch_prep_cache.output_cache.add_block_to_cache(&block);
}

Ok(block)
}
1 change: 0 additions & 1 deletion consensus/src/block/alt_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ where
// Add this alt cache back to the context service.
context_svc
.oneshot(BlockChainContextRequest::AddAltChainContextCache {
prev_id: block_info.block.header.previous,
cache: alt_context_cache,
_token: AltChainRequestToken,
})
Expand Down
41 changes: 36 additions & 5 deletions consensus/src/block/batch_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,41 @@ use cuprate_consensus_rules::{
ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::TransactionVerificationData;
use cuprate_types::{output_cache::OutputCache, TransactionVerificationData};

use crate::{
batch_verifier::MultiThreadedBatchVerifier,
block::{free::order_transactions, PreparedBlock, PreparedBlockExPow},
transactions::start_tx_verification,
transactions::{check_kis_unique, contextual_data::get_output_cache, start_tx_verification},
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
__private::Database,
};

/// Cached state created when batch preparing a group of blocks.
///
/// This cache is only valid for the set of blocks it was created with; it must not be used for
/// other blocks.
pub struct BatchPrepareCache {
pub(crate) output_cache: OutputCache,
/// [`true`] if all the key images in the batch have been checked for double spends in the batch and
/// the whole chain.
pub(crate) key_images_spent_checked: bool,
}

/// Batch prepares a list of blocks for verification.
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
pub async fn batch_prepare_main_chain_blocks(
#[expect(clippy::type_complexity)]
pub async fn batch_prepare_main_chain_blocks<D: Database>(
blocks: Vec<(Block, Vec<Transaction>)>,
context_svc: &mut BlockchainContextService,
) -> Result<Vec<(PreparedBlock, Vec<TransactionVerificationData>)>, ExtendedConsensusError> {
mut database: D,
) -> Result<
(
Vec<(PreparedBlock, Vec<TransactionVerificationData>)>,
BatchPrepareCache,
),
ExtendedConsensusError,
> {
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();

tracing::debug!("Calculating block hashes.");
Expand Down Expand Up @@ -189,5 +209,16 @@ pub async fn batch_prepare_main_chain_blocks(
})
.await?;

Ok(blocks)
check_kis_unique(blocks.iter().flat_map(|(_, txs)| txs.iter()), &mut database).await?;

let output_cache =
get_output_cache(blocks.iter().flat_map(|(_, txs)| txs.iter()), database).await?;

Ok((
blocks,
BatchPrepareCache {
output_cache,
key_images_spent_checked: true,
},
))
}
Loading