diff --git a/libtonode-tests/Cargo.toml b/libtonode-tests/Cargo.toml index 8308f020d..3b8a9bf40 100644 --- a/libtonode-tests/Cargo.toml +++ b/libtonode-tests/Cargo.toml @@ -6,14 +6,13 @@ edition = "2021" [features] chain_generic_tests = [] ci = ["zingolib/ci"] -sync = ["dep:zingo-sync"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] zingolib = { path = "../zingolib", features = [ "deprecations", "test-elevation" ] } zingo-status = { path = "../zingo-status" } zingo-netutils = { path = "../zingo-netutils" } -zingo-sync = { path = "../zingo-sync", optional = true } +zingo-sync = { path = "../zingo-sync" } testvectors = { path = "../testvectors" } bip0039.workspace = true diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index 6c1eb4326..823a06e51 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -1,7 +1,9 @@ +use std::time::Duration; + use tempfile::TempDir; use testvectors::seeds::HOSPITAL_MUSEUM_SEED; use zingo_netutils::GrpcConnector; -use zingo_sync::sync::sync; +use zingo_sync::sync::{self, sync}; use zingolib::{ config::{construct_lightwalletd_uri, load_clientconfig, DEFAULT_LIGHTWALLETD_SERVER}, get_base_address_macro, @@ -10,7 +12,7 @@ use zingolib::{ wallet::WalletBase, }; -#[ignore = "too slow, and flakey"] +#[ignore = "temporary mainnet test for sync development"] #[tokio::test] async fn sync_mainnet_test() { rustls::crypto::ring::default_provider() @@ -49,6 +51,54 @@ async fn sync_mainnet_test() { dbg!(&wallet.sync_state); } +#[ignore = "mainnet test for large chain"] +#[tokio::test] +async fn sync_status() { + rustls::crypto::ring::default_provider() + .install_default() + .expect("Ring to work as a default"); + tracing_subscriber::fmt().init(); + + let uri = construct_lightwalletd_uri(Some(DEFAULT_LIGHTWALLETD_SERVER.to_string())); + let temp_dir = TempDir::new().unwrap(); + let temp_path = temp_dir.path().to_path_buf(); + let config = load_clientconfig( + uri.clone(), + Some(temp_path), + zingolib::config::ChainType::Mainnet, + true, + ) + .unwrap(); + let lightclient = LightClient::create_from_wallet_base_async( + WalletBase::from_string(HOSPITAL_MUSEUM_SEED.to_string()), + &config, + 2_700_000, + true, + ) + .await + .unwrap(); + + let client = GrpcConnector::new(uri).get_client().await.unwrap(); + + let wallet = lightclient.wallet.clone(); + let sync_handle = tokio::spawn(async move { + sync(client, &config.chain, wallet).await.unwrap(); + }); + + let wallet = lightclient.wallet.clone(); + tokio::spawn(async move { + loop { + let wallet = wallet.clone(); + let sync_status = sync::sync_status(wallet).await; + dbg!(sync_status); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + sync_handle.await.unwrap(); +} + +// temporary test for sync development #[ignore = "hangs"] #[tokio::test] async fn sync_test() { diff --git a/zingo-sync/src/client.rs b/zingo-sync/src/client.rs index add75abf7..30d083198 100644 --- a/zingo-sync/src/client.rs +++ b/zingo-sync/src/client.rs @@ -188,7 +188,7 @@ pub async fn get_transparent_address_transactions( pub async fn get_mempool_transaction_stream( client: &mut CompactTxStreamerClient, ) -> Result, ()> { - tracing::info!("Fetching mempool stream"); + tracing::debug!("Fetching mempool stream"); let mempool_stream = fetch::get_mempool_stream(client).await.unwrap(); Ok(mempool_stream) diff --git a/zingo-sync/src/client/fetch.rs b/zingo-sync/src/client/fetch.rs index 7770fab3c..0b8339227 100644 --- a/zingo-sync/src/client/fetch.rs +++ b/zingo-sync/src/client/fetch.rs @@ -103,17 +103,17 @@ async fn fetch_from_server( ) -> Result<(), ()> { match fetch_request { FetchRequest::ChainTip(sender) => { - tracing::info!("Fetching chain tip."); + tracing::debug!("Fetching chain tip."); let block_id = get_latest_block(client).await.unwrap(); sender.send(block_id).unwrap(); } FetchRequest::CompactBlockRange(sender, block_range) => { - tracing::info!("Fetching compact blocks. {:?}", &block_range); + tracing::debug!("Fetching compact blocks. {:?}", &block_range); let compact_blocks = get_block_range(client, block_range).await.unwrap(); sender.send(compact_blocks).unwrap(); } FetchRequest::GetSubtreeRoots(sender, start_index, shielded_protocol, max_entries) => { - tracing::info!( + tracing::debug!( "Fetching subtree roots. start index: {}. shielded protocol: {}", start_index, shielded_protocol @@ -124,19 +124,19 @@ async fn fetch_from_server( sender.send(shards).unwrap(); } FetchRequest::TreeState(sender, block_height) => { - tracing::info!("Fetching tree state. {:?}", &block_height); + tracing::debug!("Fetching tree state. {:?}", &block_height); let tree_state = get_tree_state(client, block_height).await.unwrap(); sender.send(tree_state).unwrap(); } FetchRequest::Transaction(sender, txid) => { - tracing::info!("Fetching transaction. {:?}", txid); + tracing::debug!("Fetching transaction. {:?}", txid); let transaction = get_transaction(client, consensus_parameters, txid) .await .unwrap(); sender.send(transaction).unwrap(); } FetchRequest::UtxoMetadata(sender, (addresses, start_height)) => { - tracing::info!( + tracing::debug!( "Fetching unspent transparent output metadata from {:?} for addresses:\n{:?}", &start_height, &addresses @@ -147,7 +147,7 @@ async fn fetch_from_server( sender.send(utxo_metadata).unwrap(); } FetchRequest::TransparentAddressTxs(sender, (address, block_range)) => { - tracing::info!( + tracing::debug!( "Fetching raw transactions in block range {:?} for address {:?}", &block_range, &address diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs index 8cb75e046..4f584f967 100644 --- a/zingo-sync/src/primitives.rs +++ b/zingo-sync/src/primitives.rs @@ -25,8 +25,50 @@ use crate::{ /// detections or transparent output discovery. pub type Locator = (BlockHeight, TxId); +/// Initial sync state. +/// +/// All fields will be reset when a new sync session starts. +#[derive(Debug, Clone, CopyGetters, Setters)] +#[getset(get_copy = "pub", set = "pub")] +pub struct InitialSyncState { + /// One block above the fully scanned wallet height at start of sync session. + sync_start_height: BlockHeight, + /// The tree sizes of the fully scanned height and chain tip at start of sync session. + sync_tree_boundaries: TreeBoundaries, + /// Total number of blocks to scan. + total_blocks_to_scan: u32, + /// Total number of sapling outputs to scan. + total_sapling_outputs_to_scan: u32, + /// Total number of orchard outputs to scan. + total_orchard_outputs_to_scan: u32, +} + +impl InitialSyncState { + /// Create new InitialSyncState + pub fn new() -> Self { + InitialSyncState { + sync_start_height: 0.into(), + sync_tree_boundaries: TreeBoundaries { + sapling_initial_tree_size: 0, + sapling_final_tree_size: 0, + orchard_initial_tree_size: 0, + orchard_final_tree_size: 0, + }, + total_blocks_to_scan: 0, + total_sapling_outputs_to_scan: 0, + total_orchard_outputs_to_scan: 0, + } + } +} + +impl Default for InitialSyncState { + fn default() -> Self { + Self::new() + } +} + /// Encapsulates the current state of sync -#[derive(Debug, Getters, MutGetters)] +#[derive(Debug, Clone, Getters, MutGetters, CopyGetters, Setters)] #[getset(get = "pub", get_mut = "pub")] pub struct SyncState { /// A vec of block ranges with scan priorities from wallet birthday to chain tip. @@ -34,6 +76,8 @@ pub struct SyncState { scan_ranges: Vec, /// Locators for relevant transactions to the wallet. locators: BTreeSet, + /// Initial sync state. + initial_sync_state: InitialSyncState, } impl SyncState { @@ -42,6 +86,7 @@ impl SyncState { SyncState { scan_ranges: Vec::new(), locators: BTreeSet::new(), + initial_sync_state: InitialSyncState::new(), } } @@ -76,6 +121,30 @@ impl Default for SyncState { } } +#[derive(Debug, Clone, Copy)] +pub struct TreeBoundaries { + pub sapling_initial_tree_size: u32, + pub sapling_final_tree_size: u32, + pub orchard_initial_tree_size: u32, + pub orchard_final_tree_size: u32, +} + +/// A snapshot of the current state of sync. Useful for displaying the status of sync to a user / consumer. +/// +/// `percentage_outputs_scanned` is a much more accurate indicator of sync completion than `percentage_blocks_scanned`. +#[derive(Debug, Clone, Getters)] +pub struct SyncStatus { + pub scan_ranges: Vec, + pub scanned_blocks: u32, + pub unscanned_blocks: u32, + pub percentage_blocks_scanned: f32, + pub scanned_sapling_outputs: u32, + pub unscanned_sapling_outputs: u32, + pub scanned_orchard_outputs: u32, + pub unscanned_orchard_outputs: u32, + pub percentage_outputs_scanned: f32, +} + /// Output ID for a given pool type #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, CopyGetters)] #[getset(get_copy = "pub")] @@ -150,8 +219,7 @@ pub struct WalletBlock { time: u32, #[getset(skip)] txids: Vec, - sapling_commitment_tree_size: u32, - orchard_commitment_tree_size: u32, + tree_boundaries: TreeBoundaries, } impl WalletBlock { @@ -161,8 +229,7 @@ impl WalletBlock { prev_hash: BlockHash, time: u32, txids: Vec, - sapling_commitment_tree_size: u32, - orchard_commitment_tree_size: u32, + tree_boundaries: TreeBoundaries, ) -> Self { Self { block_height, @@ -170,8 +237,7 @@ impl WalletBlock { prev_hash, time, txids, - sapling_commitment_tree_size, - orchard_commitment_tree_size, + tree_boundaries, } } diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index 1a3c8e4b4..0abf75e07 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -50,59 +50,68 @@ impl InitialScanData { // gets initial tree size from previous block if available // otherwise, from first block if available // otherwise, fetches frontiers from server - let (sapling_initial_tree_size, orchard_initial_tree_size) = - if let Some(prev) = &previous_wallet_block { - ( - prev.sapling_commitment_tree_size(), - prev.orchard_commitment_tree_size(), - ) - } else if let Some(chain_metadata) = &first_block.chain_metadata { - // calculate initial tree size by subtracting number of outputs in block from the blocks final tree size - let sapling_output_count: u32 = first_block - .vtx - .iter() - .map(|tx| tx.outputs.len()) - .sum::() - .try_into() - .expect("Sapling output count cannot exceed a u32"); - let orchard_output_count: u32 = first_block - .vtx - .iter() - .map(|tx| tx.actions.len()) - .sum::() - .try_into() - .expect("Sapling output count cannot exceed a u32"); - - ( - chain_metadata - .sapling_commitment_tree_size - .checked_sub(sapling_output_count) - .unwrap(), - chain_metadata - .orchard_commitment_tree_size - .checked_sub(orchard_output_count) - .unwrap(), - ) - } else { - let sapling_activation_height = consensus_parameters - .activation_height(NetworkUpgrade::Sapling) - .expect("should have some sapling activation height"); - - match first_block.height().cmp(&sapling_activation_height) { - cmp::Ordering::Greater => { - let frontiers = - client::get_frontiers(fetch_request_sender, first_block.height() - 1) - .await - .unwrap(); - ( - frontiers.final_sapling_tree().tree_size() as u32, - frontiers.final_orchard_tree().tree_size() as u32, - ) - } - cmp::Ordering::Equal => (0, 0), - cmp::Ordering::Less => panic!("pre-sapling not supported!"), + let (sapling_initial_tree_size, orchard_initial_tree_size) = if let Some(prev) = + &previous_wallet_block + { + ( + prev.tree_boundaries().sapling_final_tree_size, + prev.tree_boundaries().orchard_final_tree_size, + ) + } else if let Some(chain_metadata) = &first_block.chain_metadata { + // calculate initial tree size by subtracting number of outputs in block from the blocks final tree size + let sapling_output_count: u32 = first_block + .vtx + .iter() + .map(|tx| tx.outputs.len()) + .sum::() + .try_into() + .expect("Sapling output count cannot exceed a u32"); + let orchard_output_count: u32 = first_block + .vtx + .iter() + .map(|tx| tx.actions.len()) + .sum::() + .try_into() + .expect("Sapling output count cannot exceed a u32"); + + ( + chain_metadata + .sapling_commitment_tree_size + .checked_sub(sapling_output_count) + .unwrap(), + chain_metadata + .orchard_commitment_tree_size + .checked_sub(orchard_output_count) + .unwrap(), + ) + } else { + let sapling_activation_height = consensus_parameters + .activation_height(NetworkUpgrade::Sapling) + .expect("should have some sapling activation height"); + + match first_block.height().cmp(&sapling_activation_height) { + cmp::Ordering::Greater => { + let frontiers = + client::get_frontiers(fetch_request_sender, first_block.height() - 1) + .await + .unwrap(); + ( + frontiers + .final_sapling_tree() + .tree_size() + .try_into() + .expect("should not be more than 2^32 note commitments in the tree!"), + frontiers + .final_orchard_tree() + .tree_size() + .try_into() + .expect("should not be more than 2^32 note commitments in the tree!"), + ) } - }; + cmp::Ordering::Equal => (0, 0), + cmp::Ordering::Less => panic!("pre-sapling not supported!"), + } + }; Ok(InitialScanData { previous_block: previous_wallet_block, diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index 2d724c394..fcd2497ff 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -17,7 +17,7 @@ use zcash_primitives::{ use crate::{ keys::{KeyId, ScanningKeyOps, ScanningKeys}, - primitives::{NullifierMap, OutputId, WalletBlock}, + primitives::{NullifierMap, OutputId, TreeBoundaries, WalletBlock}, witness::WitnessData, }; @@ -55,9 +55,14 @@ where Position::from(u64::from(initial_scan_data.sapling_initial_tree_size)), Position::from(u64::from(initial_scan_data.orchard_initial_tree_size)), ); - let mut sapling_tree_size = initial_scan_data.sapling_initial_tree_size; - let mut orchard_tree_size = initial_scan_data.orchard_initial_tree_size; + let mut sapling_initial_tree_size; + let mut orchard_initial_tree_size; + let mut sapling_final_tree_size = initial_scan_data.sapling_initial_tree_size; + let mut orchard_final_tree_size = initial_scan_data.orchard_initial_tree_size; for block in &compact_blocks { + sapling_initial_tree_size = sapling_final_tree_size; + orchard_initial_tree_size = orchard_final_tree_size; + let mut transactions = block.vtx.iter().peekable(); while let Some(transaction) = transactions.next() { // collect trial decryption results by transaction @@ -100,21 +105,21 @@ where ); calculate_nullifiers_and_positions( - sapling_tree_size, + sapling_final_tree_size, scanning_keys.sapling(), &incoming_sapling_outputs, &mut decrypted_note_data.sapling_nullifiers_and_positions, ); calculate_nullifiers_and_positions( - orchard_tree_size, + orchard_final_tree_size, scanning_keys.orchard(), &incoming_orchard_outputs, &mut decrypted_note_data.orchard_nullifiers_and_positions, ); - sapling_tree_size += u32::try_from(transaction.outputs.len()) + sapling_final_tree_size += u32::try_from(transaction.outputs.len()) .expect("should not be more than 2^32 outputs in a transaction"); - orchard_tree_size += u32::try_from(transaction.actions.len()) + orchard_final_tree_size += u32::try_from(transaction.actions.len()) .expect("should not be more than 2^32 outputs in a transaction"); } @@ -124,8 +129,12 @@ where block.prev_hash(), block.time, block.vtx.iter().map(|tx| tx.txid()).collect(), - sapling_tree_size, - orchard_tree_size, + TreeBoundaries { + sapling_initial_tree_size, + sapling_final_tree_size, + orchard_initial_tree_size, + orchard_final_tree_size, + }, ); check_tree_size(block, &wallet_block).unwrap(); @@ -204,12 +213,12 @@ fn check_continuity( fn check_tree_size(compact_block: &CompactBlock, wallet_block: &WalletBlock) -> Result<(), ()> { if let Some(chain_metadata) = &compact_block.chain_metadata { if chain_metadata.sapling_commitment_tree_size - != wallet_block.sapling_commitment_tree_size() + != wallet_block.tree_boundaries().sapling_final_tree_size { panic!("sapling tree size is incorrect!") } if chain_metadata.orchard_commitment_tree_size - != wallet_block.orchard_commitment_tree_size() + != wallet_block.tree_boundaries().orchard_final_tree_size { panic!("orchard tree size is incorrect!") } diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index ef3cecfdb..e4d6f8866 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -88,7 +88,7 @@ where /// /// When the worker is running it will wait for a scan task. pub(crate) fn spawn_worker(&mut self) { - tracing::info!("Spawning worker {}", self.unique_id); + tracing::debug!("Spawning worker {}", self.unique_id); let mut worker = ScanWorker::new( self.unique_id, self.consensus_parameters.clone(), @@ -286,7 +286,7 @@ where } fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { - tracing::info!("Adding scan task to worker {}:\n{:#?}", self.id, &scan_task); + tracing::debug!("Adding scan task to worker {}:\n{:#?}", self.id, &scan_task); self.scan_task_sender .clone() .unwrap() @@ -301,7 +301,7 @@ where /// /// This should always be called in the context of the scanner as it must be also be removed from the worker pool. async fn shutdown(&mut self) -> Result<(), JoinError> { - tracing::info!("Shutting down worker {}", self.id); + tracing::debug!("Shutting down worker {}", self.id); if let Some(sender) = self.scan_task_sender.take() { drop(sender); } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 62cbc7abf..69f5f4ada 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -8,7 +8,7 @@ use std::time::Duration; use crate::client::{self, FetchRequest}; use crate::error::SyncError; use crate::keys::transparent::TransparentAddressId; -use crate::primitives::{NullifierMap, OutPointMap}; +use crate::primitives::{NullifierMap, OutPointMap, SyncStatus}; use crate::scan::error::{ContinuityError, ScanError}; use crate::scan::task::Scanner; use crate::scan::transactions::scan_transaction; @@ -66,8 +66,18 @@ where .await }); - let mut wallet_lock = wallet.lock().await; - let wallet_height = state::get_wallet_height(consensus_parameters, &*wallet_lock).unwrap(); + // create channel for receiving mempool transactions and launch mempool monitor + let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(10); + let shutdown_mempool = Arc::new(AtomicBool::new(false)); + let shutdown_mempool_clone = shutdown_mempool.clone(); + let mempool_handle = tokio::spawn(async move { + mempool_monitor(client, mempool_transaction_sender, shutdown_mempool_clone).await + }); + + // pre-scan initialisation + let mut wallet_guard = wallet.lock().await; + + let mut wallet_height = state::get_wallet_height(consensus_parameters, &*wallet_guard).unwrap(); let chain_height = client::get_chain_height(fetch_request_sender.clone()) .await .unwrap(); @@ -78,21 +88,15 @@ where MAX_VERIFICATION_WINDOW ); } - truncate_wallet_data(&mut *wallet_lock, chain_height).unwrap(); + truncate_wallet_data(&mut *wallet_guard, chain_height).unwrap(); + wallet_height = chain_height; } - let ufvks = wallet_lock.get_unified_full_viewing_keys().unwrap(); - // create channel for receiving mempool transactions and launch mempool monitor - let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(10); - let shutdown_mempool = Arc::new(AtomicBool::new(false)); - let shutdown_mempool_clone = shutdown_mempool.clone(); - let mempool_handle = tokio::spawn(async move { - mempool_monitor(client, mempool_transaction_sender, shutdown_mempool_clone).await - }); + let ufvks = wallet_guard.get_unified_full_viewing_keys().unwrap(); transparent::update_addresses_and_locators( consensus_parameters, - &mut *wallet_lock, + &mut *wallet_guard, fetch_request_sender.clone(), &ufvks, wallet_height, @@ -103,14 +107,22 @@ where state::update_scan_ranges( wallet_height, chain_height, - wallet_lock.get_sync_state_mut().unwrap(), + wallet_guard.get_sync_state_mut().unwrap(), ) .await .unwrap(); - update_subtree_roots(fetch_request_sender.clone(), &mut *wallet_lock).await; + state::set_initial_state( + consensus_parameters, + fetch_request_sender.clone(), + &mut *wallet_guard, + chain_height, + ) + .await; - drop(wallet_lock); + update_subtree_roots(fetch_request_sender.clone(), &mut *wallet_guard).await; + + drop(wallet_guard); // create channel for receiving scan results and launch scanner let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel(); @@ -122,18 +134,17 @@ where ); scanner.spawn_workers(); - // TODO: consider what happens when there is no verification range i.e. all ranges already scanned // TODO: invalidate any pending transactions after eviction height (40 below best chain height?) // TODO: implement an option for continuous scanning where it doesnt exit when complete - let mut wallet_lock = wallet.lock().await; + let mut wallet_guard = wallet.lock().await; let mut interval = tokio::time::interval(Duration::from_millis(30)); loop { tokio::select! { Some((scan_range, scan_results)) = scan_results_receiver.recv() => { process_scan_results( consensus_parameters, - &mut *wallet_lock, + &mut *wallet_guard, fetch_request_sender.clone(), &ufvks, scan_range, @@ -143,24 +154,24 @@ where .unwrap(); // allow tasks outside the sync engine access to the wallet data - drop(wallet_lock); - wallet_lock = wallet.lock().await; + drop(wallet_guard); + wallet_guard = wallet.lock().await; } Some(raw_transaction) = mempool_transaction_receiver.recv() => { process_mempool_transaction( consensus_parameters, &ufvks, - &mut *wallet_lock, + &mut *wallet_guard, raw_transaction, ) .await; } _update_scanner = interval.tick() => { - scanner.update(&mut *wallet_lock, shutdown_mempool.clone()).await; + scanner.update(&mut *wallet_guard, shutdown_mempool.clone()).await; - if sync_complete(&scanner, &scan_results_receiver, &*wallet_lock) { + if sync_complete(&scanner, &scan_results_receiver, &*wallet_guard) { tracing::info!("Sync complete."); break; } @@ -168,7 +179,9 @@ where } } - drop(wallet_lock); + // TODO: clear locators + + drop(wallet_guard); drop(scanner); drop(fetch_request_sender); mempool_handle.await.unwrap().unwrap(); @@ -177,6 +190,65 @@ where Ok(()) } +/// Obtains the mutex guard to the wallet and creates a [`crate::primitives::SyncStatus`] from the wallet's current +/// [`crate::primitives::SyncState`]. +/// +/// Designed to be called during the sync process with minimal interruption. +pub async fn sync_status(wallet: Arc>) -> SyncStatus +where + W: SyncWallet + SyncBlocks, +{ + let wallet_guard = wallet.lock().await; + let sync_state = wallet_guard.get_sync_state().unwrap().clone(); + + let unscanned_blocks = sync_state + .scan_ranges() + .iter() + .filter(|scan_range| scan_range.priority() != ScanPriority::Scanned) + .map(|scan_range| scan_range.block_range()) + .fold(0, |acc, block_range| { + acc + (block_range.end - block_range.start) + }); + let scanned_blocks = sync_state + .initial_sync_state() + .total_blocks_to_scan() + .saturating_sub(unscanned_blocks); + let percentage_blocks_scanned = (scanned_blocks as f32 + / sync_state.initial_sync_state().total_blocks_to_scan() as f32) + * 100.0; + + let (unscanned_sapling_outputs, unscanned_orchard_outputs) = + state::calculate_unscanned_outputs(&*wallet_guard); + let scanned_sapling_outputs = sync_state + .initial_sync_state() + .total_sapling_outputs_to_scan() + .saturating_sub(unscanned_sapling_outputs); + let scanned_orchard_outputs = sync_state + .initial_sync_state() + .total_orchard_outputs_to_scan() + .saturating_sub(unscanned_orchard_outputs); + let percentage_outputs_scanned = ((scanned_sapling_outputs + scanned_orchard_outputs) as f32 + / (sync_state + .initial_sync_state() + .total_sapling_outputs_to_scan() + + sync_state + .initial_sync_state() + .total_orchard_outputs_to_scan()) as f32) + * 100.0; + + SyncStatus { + scan_ranges: sync_state.scan_ranges().clone(), + scanned_blocks, + unscanned_blocks, + percentage_blocks_scanned, + scanned_sapling_outputs, + unscanned_sapling_outputs, + scanned_orchard_outputs, + unscanned_orchard_outputs, + percentage_outputs_scanned, + } +} + async fn update_subtree_roots( fetch_request_sender: mpsc::UnboundedSender, wallet: &mut W, @@ -300,7 +372,7 @@ where ScanPriority::Scanned, ) .unwrap(); - tracing::info!("Scan results processed."); + tracing::debug!("Scan results processed."); } Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => { tracing::info!("Re-org detected."); @@ -347,7 +419,7 @@ async fn process_mempool_transaction( ) .unwrap(); - tracing::info!( + tracing::debug!( "mempool received txid {} at height {}", transaction.txid(), block_height diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs index 4a2f80afc..783c0e4e1 100644 --- a/zingo-sync/src/sync/state.rs +++ b/zingo-sync/src/sync/state.rs @@ -2,15 +2,17 @@ use std::{cmp, collections::HashMap, ops::Range}; +use tokio::sync::mpsc; use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; use zcash_primitives::{ - consensus::{self, BlockHeight}, + consensus::{self, BlockHeight, NetworkUpgrade}, transaction::TxId, }; use crate::{ + client::{self, FetchRequest}, keys::transparent::TransparentAddressId, - primitives::{Locator, SyncState}, + primitives::{Locator, SyncState, TreeBoundaries}, scan::task::ScanTask, traits::{SyncBlocks, SyncWallet}, }; @@ -427,3 +429,176 @@ where Ok(None) } } + +/// Sets the `initial_sync_state` field at the start of the sync session +pub(super) async fn set_initial_state( + consensus_parameters: &impl consensus::Parameters, + fetch_request_sender: mpsc::UnboundedSender, + wallet: &mut W, + chain_height: BlockHeight, +) where + W: SyncWallet + SyncBlocks, +{ + let fully_scanned_height = wallet.get_sync_state().unwrap().fully_scanned_height(); + let (sync_start_sapling_tree_size, sync_start_orchard_tree_size) = final_tree_sizes( + consensus_parameters, + fetch_request_sender.clone(), + wallet, + fully_scanned_height, + ) + .await; + let (chain_tip_sapling_tree_size, chain_tip_orchard_tree_size) = final_tree_sizes( + consensus_parameters, + fetch_request_sender.clone(), + wallet, + chain_height, + ) + .await; + + let initial_sync_state = wallet + .get_sync_state_mut() + .unwrap() + .initial_sync_state_mut(); + initial_sync_state.set_sync_start_height(fully_scanned_height + 1); + initial_sync_state.set_sync_tree_boundaries(TreeBoundaries { + sapling_initial_tree_size: sync_start_sapling_tree_size, + sapling_final_tree_size: chain_tip_sapling_tree_size, + orchard_initial_tree_size: sync_start_orchard_tree_size, + orchard_final_tree_size: chain_tip_orchard_tree_size, + }); + + let (total_sapling_outputs_to_scan, total_orchard_outputs_to_scan) = + calculate_unscanned_outputs(wallet); + + let sync_state = wallet.get_sync_state_mut().unwrap(); + let total_blocks_to_scan = sync_state + .scan_ranges() + .iter() + .filter(|scan_range| scan_range.priority() != ScanPriority::Scanned) + .map(|scan_range| scan_range.block_range()) + .fold(0, |acc, block_range| { + acc + (block_range.end - block_range.start) + }); + + let initial_sync_state = sync_state.initial_sync_state_mut(); + initial_sync_state.set_total_blocks_to_scan(total_blocks_to_scan); + initial_sync_state.set_total_sapling_outputs_to_scan(total_sapling_outputs_to_scan); + initial_sync_state.set_total_orchard_outputs_to_scan(total_orchard_outputs_to_scan); +} + +pub(super) fn calculate_unscanned_outputs(wallet: &W) -> (u32, u32) +where + W: SyncWallet + SyncBlocks, +{ + let sync_state = wallet.get_sync_state().unwrap(); + let sync_start_height = sync_state.initial_sync_state().sync_start_height(); + + let nonlinear_scanned_block_ranges = sync_state + .scan_ranges() + .iter() + .filter(|scan_range| { + scan_range.priority() == ScanPriority::Scanned + && scan_range.block_range().start >= sync_start_height + }) + .map(|scan_range| scan_range.block_range().clone()) + .collect::>(); + let (nonlinear_scanned_sapling_outputs, nonlinear_scanned_orchard_outputs) = + nonlinear_scanned_block_ranges + .iter() + .map(|block_range| scanned_range_tree_boundaries(wallet, block_range.clone())) + .fold((0, 0), |acc, tree_boundaries| { + ( + acc.0 + + (tree_boundaries.sapling_final_tree_size + - tree_boundaries.sapling_initial_tree_size), + acc.1 + + (tree_boundaries.orchard_final_tree_size + - tree_boundaries.orchard_initial_tree_size), + ) + }); + + let initial_sync_state = wallet.get_sync_state().unwrap().initial_sync_state(); + let unscanned_sapling_outputs = initial_sync_state + .sync_tree_boundaries() + .sapling_final_tree_size + - initial_sync_state + .sync_tree_boundaries() + .sapling_initial_tree_size + - nonlinear_scanned_sapling_outputs; + let unscanned_orchard_outputs = initial_sync_state + .sync_tree_boundaries() + .orchard_final_tree_size + - initial_sync_state + .sync_tree_boundaries() + .orchard_initial_tree_size + - nonlinear_scanned_orchard_outputs; + + (unscanned_sapling_outputs, unscanned_orchard_outputs) +} + +/// Gets `block_height` final tree sizes from wallet block if it exists, otherwise from frontiers fetched from server. +async fn final_tree_sizes( + consensus_parameters: &impl consensus::Parameters, + fetch_request_sender: mpsc::UnboundedSender, + wallet: &mut W, + block_height: BlockHeight, +) -> (u32, u32) +where + W: SyncBlocks, +{ + if let Ok(block) = wallet.get_wallet_block(block_height) { + ( + block.tree_boundaries().sapling_final_tree_size, + block.tree_boundaries().orchard_final_tree_size, + ) + } else { + // TODO: move this whole block into `client::get_frontiers` + let sapling_activation_height = consensus_parameters + .activation_height(NetworkUpgrade::Sapling) + .expect("should have some sapling activation height"); + + match block_height.cmp(&(sapling_activation_height - 1)) { + cmp::Ordering::Greater => { + let frontiers = client::get_frontiers(fetch_request_sender.clone(), block_height) + .await + .unwrap(); + ( + frontiers + .final_sapling_tree() + .tree_size() + .try_into() + .expect("should not be more than 2^32 note commitments in the tree!"), + frontiers + .final_orchard_tree() + .tree_size() + .try_into() + .expect("should not be more than 2^32 note commitments in the tree!"), + ) + } + cmp::Ordering::Equal => (0, 0), + cmp::Ordering::Less => panic!("pre-sapling not supported!"), + } + } +} + +/// Gets the initial and final tree sizes of a `scanned_range`. +/// +/// Panics if `scanned_range` boundary wallet blocks are not found in the wallet. +fn scanned_range_tree_boundaries(wallet: &W, scanned_range: Range) -> TreeBoundaries +where + W: SyncBlocks, +{ + let start_block = wallet + .get_wallet_block(scanned_range.start) + .expect("scanned range boundary blocks should be retained in the wallet"); + let end_block = wallet + .get_wallet_block(scanned_range.end - 1) + .expect("scanned range boundary blocks should be retained in the wallet"); + + TreeBoundaries { + sapling_initial_tree_size: start_block.tree_boundaries().sapling_initial_tree_size, + sapling_final_tree_size: end_block.tree_boundaries().sapling_final_tree_size, + orchard_initial_tree_size: start_block.tree_boundaries().orchard_initial_tree_size, + orchard_final_tree_size: end_block.tree_boundaries().orchard_final_tree_size, + } +}