From 343656cb3c82e05349d8ace024fd507534dd464b Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 4 Feb 2025 23:24:37 -0300 Subject: [PATCH] fix(mining): Advertise mined blocks (#9176) * add a channel for submit_block notifications to gossip task * fix tests and gossip logic * remove the network discriminant and add a test * clippy suggestions * fix unused variable * attempt to fix the conditional compilation issues * fix default * Suggestions for "fix(mining): Advertise mined blocks" (#9183) * refactor error conversions in GetBlockTemplateRpcImpl and rewords documentation * Replaces polling mined block receiver with a select * Skip checking that Zebra is likely synced to the network tip before returning block templates on Testnet. * fixes a clippy lint and a concurrency bug --------- Co-authored-by: Arya --- .../src/methods/get_block_template_rpcs.rs | 21 ++- .../get_block_template.rs | 5 +- .../types/submit_block.rs | 36 ++++++ .../tests/snapshot/get_block_template_rpcs.rs | 3 + zebra-rpc/src/methods/tests/vectors.rs | 15 ++- zebra-rpc/src/server.rs | 5 +- zebra-rpc/src/server/error.rs | 23 ++++ zebra-rpc/src/server/tests/vectors.rs | 4 + zebrad/src/commands/start.rs | 17 +++ .../components/inbound/tests/fake_peer_set.rs | 23 +++- .../components/inbound/tests/real_peer_set.rs | 121 ++++++++++++++++++ zebrad/src/components/sync/gossip.rs | 69 +++++++--- zebrad/tests/acceptance.rs | 22 +++- 13 files changed, 327 insertions(+), 37 deletions(-) diff --git a/zebra-rpc/src/methods/get_block_template_rpcs.rs b/zebra-rpc/src/methods/get_block_template_rpcs.rs index 2bb9a0ca393..3cdb8ef079c 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs.rs @@ -6,6 +6,7 @@ use futures::{future::OptionFuture, TryFutureExt}; use jsonrpsee::core::{async_trait, RpcResult as Result}; use jsonrpsee_proc_macros::rpc; use jsonrpsee_types::ErrorObject; +use tokio::sync::watch; use tower::{Service, ServiceExt}; use zcash_address::{unified::Encoding, TryFromAddress}; @@ -63,7 +64,10 @@ use crate::{ hex_data::HexData, GetBlockHash, }, - server::{self, error::MapError}, + server::{ + self, + error::{MapError, OkOrError}, + }, }; pub mod constants; @@ -375,6 +379,10 @@ pub struct GetBlockTemplateRpcImpl< /// Address book of peers, used for `getpeerinfo`. address_book: AddressBook, + + /// A channel to send successful block submissions to the block gossip task, + /// so they can be advertised to peers. + mined_block_sender: watch::Sender<(block::Hash, block::Height)>, } impl Debug @@ -465,6 +473,7 @@ where block_verifier_router: BlockVerifierRouter, sync_status: SyncStatus, address_book: AddressBook, + mined_block_sender: Option>, ) -> Self { // Prevent loss of miner funds due to an unsupported or incorrect address type. if let Some(miner_address) = mining_config.miner_address.clone() { @@ -527,6 +536,8 @@ where block_verifier_router, sync_status, address_book, + mined_block_sender: mined_block_sender + .unwrap_or(submit_block::SubmitBlockChannel::default().sender()), } } } @@ -937,8 +948,7 @@ where let block_height = block .coinbase_height() - .map(|height| height.0.to_string()) - .unwrap_or_else(|| "invalid coinbase height".to_string()); + .ok_or_error(0, "coinbase height not found")?; let block_hash = block.hash(); let block_verifier_router_response = block_verifier_router @@ -957,6 +967,11 @@ where // The difference is important to miners, because they want to mine on the best chain. Ok(block_hash) => { tracing::info!(?block_hash, ?block_height, "submit block accepted"); + + self.mined_block_sender + .send((block_hash, block_height)) + .map_error_with_prefix(0, "failed to send mined block")?; + return Ok(submit_block::Response::Accepted); } diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs b/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs index baa0200db1f..2a8a9f60109 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/get_block_template.rs @@ -161,10 +161,7 @@ where Tip: ChainTip + Clone + Send + Sync + 'static, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, { - // TODO: - // - Add a `disable_peers` field to `Network` to check instead of `disable_pow()` (#8361) - // - Check the field in `sync_status` so it applies to the mempool as well. - if network.disable_pow() { + if network.is_a_test_network() { return Ok(()); } diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs index cec806901bb..54f593eb867 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs @@ -1,5 +1,9 @@ //! Parameter and response types for the `submitblock` RPC. +use tokio::sync::watch; + +use zebra_chain::{block, parameters::GENESIS_PREVIOUS_BLOCK_HASH}; + // Allow doc links to these imports. #[allow(unused_imports)] use crate::methods::get_block_template_rpcs::GetBlockTemplate; @@ -64,3 +68,35 @@ impl From for Response { Self::ErrorResponse(error_response) } } + +/// A submit block channel, used to inform the gossip task about mined blocks. +pub struct SubmitBlockChannel { + /// The channel sender + sender: watch::Sender<(block::Hash, block::Height)>, + /// The channel receiver + receiver: watch::Receiver<(block::Hash, block::Height)>, +} + +impl SubmitBlockChannel { + /// Create a new submit block channel + pub fn new() -> Self { + let (sender, receiver) = watch::channel((GENESIS_PREVIOUS_BLOCK_HASH, block::Height::MIN)); + Self { sender, receiver } + } + + /// Get the channel sender + pub fn sender(&self) -> watch::Sender<(block::Hash, block::Height)> { + self.sender.clone() + } + + /// Get the channel receiver + pub fn receiver(&self) -> watch::Receiver<(block::Hash, block::Height)> { + self.receiver.clone() + } +} + +impl Default for SubmitBlockChannel { + fn default() -> Self { + Self::new() + } +} diff --git a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs index a512faf7cfc..ec6d2fe4870 100644 --- a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs @@ -151,6 +151,7 @@ pub async fn test_responses( block_verifier_router.clone(), mock_sync_status.clone(), mock_address_book, + None, ); if network.is_a_test_network() && !network.is_default_testnet() { @@ -286,6 +287,7 @@ pub async fn test_responses( block_verifier_router, mock_sync_status.clone(), MockAddressBookPeers::default(), + None, ); // Basic variant (default mode and no extra features) @@ -395,6 +397,7 @@ pub async fn test_responses( mock_block_verifier_router.clone(), mock_sync_status, MockAddressBookPeers::default(), + None, ); let get_block_template_fut = get_block_template_rpc_mock_state_verifier.get_block_template( diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index e1f559b8e4f..7d2f88f1983 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -6,13 +6,12 @@ use std::sync::Arc; use futures::FutureExt; use tower::buffer::Buffer; -use zebra_chain::serialization::ZcashSerialize; use zebra_chain::{ amount::Amount, block::Block, chain_tip::{mock::MockChainTip, NoChainTip}, parameters::Network::*, - serialization::ZcashDeserializeInto, + serialization::{ZcashDeserializeInto, ZcashSerialize}, transaction::UnminedTxId, }; use zebra_node_services::BoxError; @@ -1195,6 +1194,7 @@ async fn rpc_getblockcount() { block_verifier_router, MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); // Get the tip height using RPC method `get_block_count` @@ -1244,6 +1244,7 @@ async fn rpc_getblockcount_empty_state() { block_verifier_router, MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); // Get the tip height using RPC method `get_block_count @@ -1312,6 +1313,7 @@ async fn rpc_getpeerinfo() { block_verifier_router, MockSyncStatus::default(), mock_address_book, + None, ); // Call `get_peer_info` @@ -1372,6 +1374,7 @@ async fn rpc_getblockhash() { tower::ServiceBuilder::new().service(block_verifier_router), MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); // Query the hashes using positive indexes @@ -1428,6 +1431,7 @@ async fn rpc_getmininginfo() { MockService::build().for_unit_tests(), MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); get_block_template_rpc @@ -1464,6 +1468,7 @@ async fn rpc_getnetworksolps() { MockService::build().for_unit_tests(), MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); let get_network_sol_ps_inputs = [ @@ -1595,6 +1600,7 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) { block_verifier_router, mock_sync_status.clone(), MockAddressBookPeers::default(), + None, ); // Fake the ChainInfo response @@ -1870,6 +1876,7 @@ async fn rpc_submitblock_errors() { block_verifier_router, MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); // Try to submit pre-populated blocks and assert that it responds with duplicate. @@ -1922,6 +1929,7 @@ async fn rpc_validateaddress() { MockService::build().for_unit_tests(), MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); let validate_address = get_block_template_rpc @@ -1967,6 +1975,7 @@ async fn rpc_z_validateaddress() { MockService::build().for_unit_tests(), MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); let z_validate_address = get_block_template_rpc @@ -2055,6 +2064,7 @@ async fn rpc_getdifficulty() { block_verifier_router, mock_sync_status.clone(), MockAddressBookPeers::default(), + None, ); // Fake the ChainInfo response: smallest numeric difficulty @@ -2176,6 +2186,7 @@ async fn rpc_z_listunifiedreceivers() { MockService::build().for_unit_tests(), MockSyncStatus::default(), MockAddressBookPeers::default(), + None, ); // invalid address diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 87c565f4a3e..3d54ab8ea54 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -12,7 +12,7 @@ use std::{fmt, panic}; use cookie::Cookie; use jsonrpsee::server::middleware::rpc::RpcServiceBuilder; use jsonrpsee::server::{Server, ServerHandle}; -use tokio::task::JoinHandle; +use tokio::{sync::watch, task::JoinHandle}; use tower::Service; use tracing::*; @@ -120,6 +120,8 @@ impl RpcServer { address_book: AddressBook, latest_chain_tip: Tip, network: Network, + #[cfg_attr(not(feature = "getblocktemplate-rpcs"), allow(unused_variables))] + mined_block_sender: Option>, ) -> Result<(ServerTask, JoinHandle<()>), tower::BoxError> where VersionString: ToString + Clone + Send + 'static, @@ -170,6 +172,7 @@ impl RpcServer { block_verifier_router, sync_status, address_book, + mined_block_sender, ); // Initialize the rpc methods with the zebra version diff --git a/zebra-rpc/src/server/error.rs b/zebra-rpc/src/server/error.rs index 835e3c4581c..cf54de4e8b2 100644 --- a/zebra-rpc/src/server/error.rs +++ b/zebra-rpc/src/server/error.rs @@ -69,6 +69,14 @@ pub(crate) trait MapError: Sized { /// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a specific error code. fn map_error(self, code: impl Into) -> std::result::Result; + /// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a prefixed message and a specific error code. + #[cfg(feature = "getblocktemplate-rpcs")] + fn map_error_with_prefix( + self, + code: impl Into, + msg_prefix: impl ToString, + ) -> Result; + /// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a [`LegacyCode::Misc`] error code. fn map_misc_error(self) -> std::result::Result { self.map_error(LegacyCode::Misc) @@ -98,6 +106,21 @@ where fn map_error(self, code: impl Into) -> Result { self.map_err(|error| ErrorObject::owned(code.into().code(), error.to_string(), None::<()>)) } + + #[cfg(feature = "getblocktemplate-rpcs")] + fn map_error_with_prefix( + self, + code: impl Into, + msg_prefix: impl ToString, + ) -> Result { + self.map_err(|error| { + ErrorObject::owned( + code.into().code(), + format!("{}: {}", msg_prefix.to_string(), error.to_string()), + None::<()>, + ) + }) + } } impl OkOrError for Option { diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs index bf850661a09..6cb83f98326 100644 --- a/zebra-rpc/src/server/tests/vectors.rs +++ b/zebra-rpc/src/server/tests/vectors.rs @@ -56,6 +56,7 @@ async fn rpc_server_spawn() { MockAddressBookPeers::default(), NoChainTip, Mainnet, + None, ); info!("spawned RPC server, checking services..."); @@ -115,6 +116,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) { MockAddressBookPeers::default(), NoChainTip, Mainnet, + None, ) .await .expect(""); @@ -170,6 +172,7 @@ async fn rpc_server_spawn_port_conflict() { MockAddressBookPeers::default(), NoChainTip, Mainnet, + None, ) .await; @@ -189,6 +192,7 @@ async fn rpc_server_spawn_port_conflict() { MockAddressBookPeers::default(), NoChainTip, Mainnet, + None, ) .await; diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 5d346e42c77..ba6ddce82c9 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -86,6 +86,9 @@ use zebra_chain::block::genesis::regtest_genesis_block; use zebra_consensus::{router::BackgroundTaskHandles, ParameterCheckpoint}; use zebra_rpc::server::RpcServer; +#[cfg(feature = "getblocktemplate-rpcs")] +use zebra_rpc::methods::get_block_template_rpcs::types::submit_block::SubmitBlockChannel; + use crate::{ application::{build_version, user_agent}, components::{ @@ -242,6 +245,10 @@ impl StartCmd { ); } + #[cfg(feature = "getblocktemplate-rpcs")] + // Create a channel to send mined blocks to the gossip task + let submit_block_channel = SubmitBlockChannel::new(); + // Launch RPC server let (rpc_task_handle, mut rpc_tx_queue_task_handle) = if let Some(listen_addr) = config.rpc.listen_addr { @@ -259,6 +266,10 @@ impl StartCmd { address_book.clone(), latest_chain_tip.clone(), config.network.network.clone(), + #[cfg(feature = "getblocktemplate-rpcs")] + Some(submit_block_channel.sender()), + #[cfg(not(feature = "getblocktemplate-rpcs"))] + None, ); rpc_task_handle.await.unwrap() } else { @@ -301,6 +312,10 @@ impl StartCmd { sync_status.clone(), chain_tip_change.clone(), peer_set.clone(), + #[cfg(feature = "getblocktemplate-rpcs")] + Some(submit_block_channel.receiver()), + #[cfg(not(feature = "getblocktemplate-rpcs"))] + None, ) .in_current_span(), ); @@ -382,6 +397,7 @@ impl StartCmd { #[cfg(feature = "internal-miner")] let miner_task_handle = if config.mining.is_internal_miner_enabled() { info!("spawning Zcash miner"); + let rpc = zebra_rpc::methods::get_block_template_rpcs::GetBlockTemplateRpcImpl::new( &config.network.network, config.mining.clone(), @@ -391,6 +407,7 @@ impl StartCmd { block_verifier_router, sync_status, address_book, + Some(submit_block_channel.sender()), ); crate::components::miner::spawn_init(&config.network.network, &config.mining, rpc) diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 07402dafb50..f3c05315258 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -5,7 +5,7 @@ use std::{collections::HashSet, iter, net::SocketAddr, str::FromStr, sync::Arc, use futures::FutureExt; use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt}; -use tracing::Span; +use tracing::{Instrument, Span}; use zebra_chain::{ amount::Amount, @@ -24,6 +24,8 @@ use zebra_network::{ AddressBook, InventoryResponse, Request, Response, }; use zebra_node_services::mempool; +#[cfg(feature = "getblocktemplate-rpcs")] +use zebra_rpc::methods::get_block_template_rpcs::types::submit_block::SubmitBlockChannel; use zebra_state::{ChainTipChange, Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT}; use zebra_test::mock_service::{MockService, PanicAssertion}; @@ -974,11 +976,20 @@ async fn setup( // Pretend we're close to tip SyncStatus::sync_close_to_tip(&mut recent_syncs); - let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( - sync_status.clone(), - chain_tip_change.clone(), - peer_set.clone(), - )); + #[cfg(feature = "getblocktemplate-rpcs")] + let submitblock_channel = SubmitBlockChannel::new(); + let sync_gossip_task_handle = tokio::spawn( + sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change.clone(), + peer_set.clone(), + #[cfg(feature = "getblocktemplate-rpcs")] + Some(submitblock_channel.receiver()), + #[cfg(not(feature = "getblocktemplate-rpcs"))] + None, + ) + .in_current_span(), + ); let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id( transaction_receiver, diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index e4c0f08659e..11211ccabed 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -21,6 +21,8 @@ use zebra_network::{ Config as NetworkConfig, InventoryResponse, PeerError, Request, Response, SharedPeerError, }; use zebra_node_services::mempool; +#[cfg(feature = "getblocktemplate-rpcs")] +use zebra_rpc::methods::get_block_template_rpcs::types::submit_block::SubmitBlockChannel; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; @@ -725,10 +727,17 @@ async fn setup( // We can't expect or unwrap because the returned Result does not implement Debug assert!(r.is_ok(), "unexpected setup channel send failure"); + #[cfg(feature = "getblocktemplate-rpcs")] + let submitblock_channel = SubmitBlockChannel::new(); + let block_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( sync_status.clone(), chain_tip_change, peer_set.clone(), + #[cfg(feature = "getblocktemplate-rpcs")] + Some(submitblock_channel.receiver()), + #[cfg(not(feature = "getblocktemplate-rpcs"))] + None, )); let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id( @@ -782,3 +791,115 @@ async fn setup( listen_addr, ) } + +#[cfg(feature = "getblocktemplate-rpcs")] +mod submitblock_test { + use std::io; + use std::sync::{Arc, Mutex}; + use tracing::{Instrument, Level}; + use tracing_subscriber::fmt; + + use super::*; + + use crate::components::sync::PEER_GOSSIP_DELAY; + + // Custom in-memory writer to capture logs + struct TestWriter(Arc>>); + + impl io::Write for TestWriter { + #[allow(clippy::unwrap_in_result)] + fn write(&mut self, buf: &[u8]) -> io::Result { + let mut logs = self.0.lock().unwrap(); + logs.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn submitblock_channel() -> Result<(), crate::BoxError> { + let logs = Arc::new(Mutex::new(Vec::new())); + let log_sink = logs.clone(); + + // Set up a tracing subscriber with a custom writer + let subscriber = fmt() + .with_max_level(Level::INFO) + .with_writer(move || TestWriter(log_sink.clone())) // Write logs to an in-memory buffer + .finish(); + + let _guard = tracing::subscriber::set_default(subscriber); + + let (sync_status, _recent_syncs) = SyncStatus::new(); + + // State + let state_config = StateConfig::ephemeral(); + let (_state_service, _read_only_state_service, latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config, &Network::Mainnet, Height::MAX, 0); + + let config_listen_addr = "127.0.0.1:0".parse().unwrap(); + + // Network + let network_config = NetworkConfig { + network: Network::Mainnet, + listen_addr: config_listen_addr, + + // Stop Zebra making outbound connections + initial_mainnet_peers: IndexSet::new(), + initial_testnet_peers: IndexSet::new(), + cache_dir: CacheDir::disabled(), + + ..NetworkConfig::default() + }; + + // Inbound + let (_setup_tx, setup_rx) = oneshot::channel(); + let inbound_service = Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx); + let inbound_service = ServiceBuilder::new() + .load_shed() + .buffer(10) + .service(BoxService::new(inbound_service)); + + let (peer_set, _address_book) = zebra_network::init( + network_config, + inbound_service.clone(), + latest_chain_tip.clone(), + "Zebra user agent".to_string(), + ) + .await; + + // Start the block gossip task with a SubmitBlockChannel + let submitblock_channel = SubmitBlockChannel::new(); + let gossip_task_handle = tokio::spawn( + sync::gossip_best_tip_block_hashes( + sync_status.clone(), + chain_tip_change, + peer_set.clone(), + Some(submitblock_channel.receiver()), + ) + .in_current_span(), + ); + + // Send a block top the channel + submitblock_channel + .sender() + .send((block::Hash([1; 32]), block::Height(1))) + .unwrap(); + + // Wait for the block gossip task to process the block + tokio::time::sleep(PEER_GOSSIP_DELAY).await; + + // Check that the block was processed as a mnined block by the gossip task + let captured_logs = logs.lock().unwrap(); + let log_output = String::from_utf8(captured_logs.clone()).unwrap(); + + assert!(log_output.contains("initializing block gossip task")); + assert!(log_output.contains("sending mined block broadcast")); + + std::mem::drop(gossip_task_handle); + + Ok(()) + } +} diff --git a/zebrad/src/components/sync/gossip.rs b/zebrad/src/components/sync/gossip.rs index 9cb02c6529f..5fbbbbd65ec 100644 --- a/zebrad/src/components/sync/gossip.rs +++ b/zebrad/src/components/sync/gossip.rs @@ -2,10 +2,13 @@ //! //! [`block::Hash`]: zebra_chain::block::Hash +use futures::TryFutureExt; use thiserror::Error; use tokio::sync::watch; use tower::{timeout::Timeout, Service, ServiceExt}; +use tracing::Instrument; +use zebra_chain::block; use zebra_network as zn; use zebra_state::ChainTipChange; @@ -43,9 +46,10 @@ pub enum BlockGossipError { /// /// [`block::Hash`]: zebra_chain::block::Hash pub async fn gossip_best_tip_block_hashes( - mut sync_status: SyncStatus, + sync_status: SyncStatus, mut chain_state: ChainTipChange, broadcast_network: ZN, + mut mined_block_receiver: Option>, ) -> Result<(), BlockGossipError> where ZN: Service + Send + Clone + 'static, @@ -58,27 +62,56 @@ where let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT); loop { - // wait for at least one tip change, to make sure we have a new block hash to broadcast - let tip_action = chain_state.wait_for_tip_change().await.map_err(TipChange)?; - - // wait until we're close to the tip, because broadcasts are only useful for nodes near the tip - // (if they're a long way from the tip, they use the syncer and block locators) - sync_status - .wait_until_close_to_tip() - .await - .map_err(SyncStatus)?; - - // get the latest tip change - it might be different to the change we awaited, - // because the syncer might take a long time to reach the tip - let tip_action = chain_state.last_tip_change().unwrap_or(tip_action); + let mut sync_status = sync_status.clone(); + let mut chain_tip = chain_state.clone(); + let tip_change_close_to_network_tip_fut = async move { + // wait for at least one tip change, to make sure we have a new block hash to broadcast + let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?; + + // wait until we're close to the tip, because broadcasts are only useful for nodes near the tip + // (if they're a long way from the tip, they use the syncer and block locators), unless a mined block + // hash is received before `wait_until_close_to_tip()` is ready. + sync_status + .wait_until_close_to_tip() + .map_err(SyncStatus) + .await?; + + // get the latest tip change when close to tip - it might be different to the change we awaited, + // because the syncer might take a long time to reach the tip + let best_tip = chain_tip + .last_tip_change() + .unwrap_or(tip_action) + .best_tip_hash_and_height(); + + Ok((best_tip, "sending committed block broadcast", chain_tip)) + } + .in_current_span(); + + let ((hash, height), log_msg, updated_chain_state) = if let Some(mined_block_receiver) = + mined_block_receiver.as_mut() + { + tokio::select! { + tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => { + mined_block_receiver.mark_unchanged(); + tip_change_close_to_network_tip? + }, + + Ok(_) = mined_block_receiver.changed() => { + // we have a new block to broadcast from the `submitblock `RPC method, get block data and release the channel. + (*mined_block_receiver.borrow_and_update(), "sending mined block broadcast", chain_state) + } + } + } else { + tip_change_close_to_network_tip_fut.await? + }; + + chain_state = updated_chain_state; // block broadcasts inform other nodes about new blocks, // so our internal Grow or Reset state doesn't matter to them - let request = zn::Request::AdvertiseBlock(tip_action.best_tip_hash()); - - let height = tip_action.best_tip_height(); - debug!(?height, ?request, "sending committed block broadcast"); + let request = zn::Request::AdvertiseBlock(hash); + info!(?height, ?request, log_msg); // broadcast requests don't return errors, and we'd just want to ignore them anyway let _ = broadcast_network .ready() diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 584a28f7705..270a0e955c7 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -162,8 +162,8 @@ use color_eyre::{ }; use semver::Version; use serde_json::Value; - use tower::ServiceExt; + use zebra_chain::{ block::{self, genesis::regtest_genesis_block, Height}, parameters::Network::{self, *}, @@ -3270,8 +3270,10 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { fetch_state_tip_and_local_time, generate_coinbase_and_roots, proposal_block_from_template, GetBlockTemplate, GetBlockTemplateRequestMode, }, - types::get_block_template, - types::submit_block, + types::{ + get_block_template, + submit_block::{self, SubmitBlockChannel}, + }, }, hex_data::HexData, GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer, @@ -3340,6 +3342,8 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { let mut mock_sync_status = MockSyncStatus::default(); mock_sync_status.set_is_close_to_tip(true); + let submitblock_channel = SubmitBlockChannel::new(); + let get_block_template_rpc_impl = GetBlockTemplateRpcImpl::new( &network, mining_config, @@ -3349,6 +3353,7 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { block_verifier_router, mock_sync_status, MockAddressBookPeers::default(), + Some(submitblock_channel.sender()), ); let make_mock_mempool_request_handler = || async move { @@ -3406,6 +3411,17 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { "valid block should be accepted" ); + // Check that the submitblock channel received the submitted block + let submit_block_channel_data = *submitblock_channel.receiver().borrow_and_update(); + assert_eq!( + submit_block_channel_data, + ( + proposal_block.hash(), + proposal_block.coinbase_height().unwrap() + ), + "submitblock channel should receive the submitted block" + ); + // Use an invalid coinbase transaction (with an output value greater than the `block_subsidy + miner_fees - expected_lockbox_funding_stream`) let make_configured_recipients_with_lockbox_numerator = |numerator| {