diff --git a/zingo-sync/src/client.rs b/zingo-sync/src/client.rs index ce94ac0af..5acc2282e 100644 --- a/zingo-sync/src/client.rs +++ b/zingo-sync/src/client.rs @@ -29,7 +29,10 @@ pub enum FetchRequest { /// Gets the height of the blockchain from the server. ChainTip(oneshot::Sender), /// Gets the specified range of compact blocks from the server (end exclusive). - CompactBlockRange(oneshot::Sender>, Range), + CompactBlockRange( + oneshot::Sender>, + Range, + ), /// Gets the tree states for a specified block height. TreeState(oneshot::Sender, BlockHeight), /// Get a full transaction by txid. @@ -74,14 +77,14 @@ pub async fn get_chain_height( pub async fn get_compact_block_range( fetch_request_sender: UnboundedSender, block_range: Range, -) -> Result, ()> { +) -> Result, ()> { let (reply_sender, reply_receiver) = oneshot::channel(); fetch_request_sender .send(FetchRequest::CompactBlockRange(reply_sender, block_range)) .unwrap(); - let compact_blocks = reply_receiver.await.unwrap(); + let block_stream = reply_receiver.await.unwrap(); - Ok(compact_blocks) + Ok(block_stream) } /// Gets the stream of shards (subtree roots) diff --git a/zingo-sync/src/client/fetch.rs b/zingo-sync/src/client/fetch.rs index 0b8339227..61d9bdc08 100644 --- a/zingo-sync/src/client/fetch.rs +++ b/zingo-sync/src/client/fetch.rs @@ -109,8 +109,8 @@ async fn fetch_from_server( } FetchRequest::CompactBlockRange(sender, block_range) => { tracing::debug!("Fetching compact blocks. {:?}", &block_range); - let compact_blocks = get_block_range(client, block_range).await.unwrap(); - sender.send(compact_blocks).unwrap(); + let block_stream = get_block_range(client, block_range).await.unwrap(); + sender.send(block_stream).unwrap(); } FetchRequest::GetSubtreeRoots(sender, start_index, shielded_protocol, max_entries) => { tracing::debug!( @@ -169,13 +169,11 @@ async fn get_latest_block( Ok(client.get_latest_block(request).await.unwrap().into_inner()) } + async fn get_block_range( client: &mut CompactTxStreamerClient, block_range: Range, -) -> Result, ()> { - let mut compact_blocks: Vec = - Vec::with_capacity(u64::from(block_range.end - block_range.start) as usize); - +) -> Result, ()> { let request = tonic::Request::new(BlockRange { start: Some(BlockId { height: u64::from(block_range.start), @@ -186,13 +184,8 @@ async fn get_block_range( hash: vec![], }), }); - let mut block_stream = client.get_block_range(request).await.unwrap().into_inner(); - - while let Some(compact_block) = block_stream.message().await.unwrap() { - compact_blocks.push(compact_block); - } - Ok(compact_blocks) + Ok(client.get_block_range(request).await.unwrap().into_inner()) } async fn get_subtree_roots( @@ -206,12 +199,14 @@ async fn get_subtree_roots( shielded_protocol, max_entries, }; + Ok(client .get_subtree_roots(request) .await .unwrap() .into_inner()) } + async fn get_tree_state( client: &mut CompactTxStreamerClient, block_height: BlockHeight, diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs index 61009a7a5..463f3405b 100644 --- a/zingo-sync/src/primitives.rs +++ b/zingo-sync/src/primitives.rs @@ -131,7 +131,28 @@ impl SyncState { } } + /// Returns the highest block height that has been scanned. + /// + /// If no scan ranges have been scanned, returns the block below the wallet birthday. + /// Will panic if called before scan ranges are updated for the first time. 
+ pub fn highest_scanned_height(&self) -> BlockHeight { + if let Some(last_scanned_range) = self + .scan_ranges() + .iter() + .filter(|scan_range| scan_range.priority() == ScanPriority::Scanned) + .last() + { + last_scanned_range.block_range().end - 1 + } else { + self.wallet_birthday() + .expect("scan ranges always non-empty") + - 1 + } + } + /// Returns the wallet birthday or `None` if `self.scan_ranges` is empty. + /// + /// If the wallet birthday is below the sapling activation height, returns the sapling activation height instead. pub fn wallet_birthday(&self) -> Option { self.scan_ranges() .first() diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index 2d2b8cc40..1b6c68ffc 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -1,23 +1,20 @@ -use std::{ - cmp, - collections::{BTreeMap, BTreeSet, HashMap}, -}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use orchard::tree::MerkleHashOrchard; +use task::ScanTask; use tokio::sync::mpsc; use incrementalmerkletree::Position; -use zcash_client_backend::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock}; +use zcash_client_backend::proto::compact_formats::CompactBlock; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ - consensus::{BlockHeight, NetworkUpgrade, Parameters}, + consensus::{self, BlockHeight}, transaction::TxId, zip32::AccountId, }; use crate::{ - client::{self, FetchRequest}, - keys::transparent::TransparentAddressId, + client::FetchRequest, primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction}, witness::{self, LocatedTreeData, WitnessData}, }; @@ -32,7 +29,8 @@ pub(crate) mod task; pub(crate) mod transactions; struct InitialScanData { - previous_block: Option, + start_seam_block: Option, + end_seam_block: Option, sapling_initial_tree_size: u32, orchard_initial_tree_size: u32, } @@ -42,79 +40,35 @@ impl InitialScanData { fetch_request_sender: mpsc::UnboundedSender, consensus_parameters: &P, first_block: &CompactBlock, - previous_wallet_block: Option, + start_seam_block: Option, + end_seam_block: Option, ) -> Result where - P: Parameters + Sync + Send + 'static, + P: consensus::Parameters + Sync + Send + 'static, { - // gets initial tree size from previous block if available - // otherwise, from first block if available - // otherwise, fetches frontiers from server - let (sapling_initial_tree_size, orchard_initial_tree_size) = if let Some(prev) = - &previous_wallet_block - { - ( - prev.tree_boundaries().sapling_final_tree_size, - prev.tree_boundaries().orchard_final_tree_size, - ) - } else if let Some(chain_metadata) = &first_block.chain_metadata { - // calculate initial tree size by subtracting number of outputs in block from the blocks final tree size - let sapling_output_count: u32 = first_block - .vtx - .iter() - .map(|tx| tx.outputs.len()) - .sum::() - .try_into() - .expect("Sapling output count cannot exceed a u32"); - let orchard_output_count: u32 = first_block - .vtx - .iter() - .map(|tx| tx.actions.len()) - .sum::() - .try_into() - .expect("Sapling output count cannot exceed a u32"); - - ( - chain_metadata - .sapling_commitment_tree_size - .checked_sub(sapling_output_count) - .unwrap(), - chain_metadata - .orchard_commitment_tree_size - .checked_sub(orchard_output_count) - .unwrap(), - ) - } else { - let sapling_activation_height = consensus_parameters - .activation_height(NetworkUpgrade::Sapling) - .expect("should have some sapling activation height"); - - match 
first_block.height().cmp(&sapling_activation_height) { - cmp::Ordering::Greater => { - let frontiers = - client::get_frontiers(fetch_request_sender, first_block.height() - 1) - .await - .unwrap(); - ( - frontiers - .final_sapling_tree() - .tree_size() - .try_into() - .expect("should not be more than 2^32 note commitments in the tree!"), - frontiers - .final_orchard_tree() - .tree_size() - .try_into() - .expect("should not be more than 2^32 note commitments in the tree!"), - ) - } - cmp::Ordering::Equal => (0, 0), - cmp::Ordering::Less => panic!("pre-sapling not supported!"), - } - }; + let (sapling_initial_tree_size, orchard_initial_tree_size) = + if let Some(prev) = &start_seam_block { + ( + prev.tree_boundaries().sapling_final_tree_size, + prev.tree_boundaries().orchard_final_tree_size, + ) + } else { + let tree_boundaries = compact_blocks::calculate_block_tree_boundaries( + consensus_parameters, + fetch_request_sender, + first_block, + ) + .await; + + ( + tree_boundaries.sapling_initial_tree_size, + tree_boundaries.orchard_initial_tree_size, + ) + }; Ok(InitialScanData { - previous_block: previous_wallet_block, + start_seam_block, + end_seam_block, sapling_initial_tree_size, orchard_initial_tree_size, }) @@ -154,41 +108,56 @@ impl DecryptedNoteData { /// Scans a given range and returns all data relevant to the specified keys. /// -/// `previous_wallet_block` is the wallet block with height [scan_range.start - 1]. +/// `start_seam_block` and `end_seam_block` are the blocks adjacent to the `scan_range` for verification of continuity. /// `locators` are the block height and txid of transactions in the `scan_range` that are known to be relevant to the /// wallet and are appended to during scanning if trial decryption succeeds. If there are no known relevant transctions /// then `locators` will start empty. +#[allow(clippy::too_many_arguments)] pub(crate) async fn scan
<P>
( fetch_request_sender: mpsc::UnboundedSender, - parameters: &P, + consensus_parameters: &P, ufvks: &HashMap, - scan_range: ScanRange, - previous_wallet_block: Option, - mut locators: BTreeSet, - transparent_addresses: HashMap, + scan_task: ScanTask, ) -> Result where - P: Parameters + Sync + Send + 'static, + P: consensus::Parameters + Sync + Send + 'static, { - let compact_blocks = client::get_compact_block_range( - fetch_request_sender.clone(), - scan_range.block_range().clone(), - ) - .await - .unwrap(); + let ScanTask { + compact_blocks, + scan_range, + start_seam_block, + end_seam_block, + mut locators, + transparent_addresses, + } = scan_task; + + if compact_blocks + .first() + .expect("compacts blocks should not be empty") + .height + != scan_range.block_range().start.into() + || compact_blocks + .last() + .expect("compacts blocks should not be empty") + .height + != (scan_range.block_range().end - 1).into() + { + panic!("compact blocks do not match scan range!") + } let initial_scan_data = InitialScanData::new( fetch_request_sender.clone(), - parameters, + consensus_parameters, compact_blocks .first() .expect("compacts blocks should not be empty"), - previous_wallet_block, + start_seam_block, + end_seam_block, ) .await .unwrap(); - let consensus_parameters_clone = parameters.clone(); + let consensus_parameters_clone = consensus_parameters.clone(); let ufvks_clone = ufvks.clone(); let scan_data = tokio::task::spawn_blocking(move || { scan_compact_blocks( @@ -214,7 +183,7 @@ where let mut outpoints = OutPointMap::new(); let wallet_transactions = scan_transactions( fetch_request_sender, - parameters, + consensus_parameters, ufvks, locators, decrypted_note_data, @@ -232,15 +201,13 @@ where orchard_leaves_and_retentions, } = witness_data; - let sapling_located_trees = tokio::task::spawn_blocking(move || { - witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions) - .unwrap() - }) - .await - .unwrap(); - let orchard_located_trees = tokio::task::spawn_blocking(move || { - witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions) - .unwrap() + let (sapling_located_trees, orchard_located_trees) = tokio::task::spawn_blocking(move || { + ( + witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions) + .unwrap(), + witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions) + .unwrap(), + ) }) .await .unwrap(); diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index d0d593956..6a7beb3bf 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -1,8 +1,12 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::{ + cmp, + collections::{BTreeMap, BTreeSet, HashMap}, +}; use incrementalmerkletree::{Marking, Position, Retention}; use orchard::{note_encryption::CompactAction, tree::MerkleHashOrchard}; use sapling_crypto::{note_encryption::CompactOutputDescription, Node}; +use tokio::sync::mpsc; use zcash_client_backend::proto::compact_formats::{ CompactBlock, CompactOrchardAction, CompactSaplingOutput, CompactTx, }; @@ -10,11 +14,12 @@ use zcash_keys::keys::UnifiedFullViewingKey; use zcash_note_encryption::Domain; use zcash_primitives::{ block::BlockHash, - consensus::{BlockHeight, Parameters}, + consensus::{self, BlockHeight, Parameters}, zip32::AccountId, }; use crate::{ + client::{self, FetchRequest}, keys::{KeyId, ScanningKeyOps, ScanningKeys}, primitives::{NullifierMap, OutputId, TreeBoundaries, 
WalletBlock}, witness::WitnessData, @@ -30,7 +35,7 @@ use super::{ mod runners; // TODO: move parameters to config module -const TRIAL_DECRYPT_TASK_SIZE: usize = 1_000; +const TRIAL_DECRYPT_TASK_SIZE: usize = 1_024; // 2^10 pub(crate) fn scan_compact_blocks
<P>
( compact_blocks: Vec<CompactBlock>, @@ -41,7 +46,11 @@ pub(crate) fn scan_compact_blocks
<P>
( where P: Parameters + Sync + Send + 'static, { - check_continuity(&compact_blocks, initial_scan_data.previous_block.as_ref())?; + check_continuity( + &compact_blocks, + initial_scan_data.start_seam_block.as_ref(), + initial_scan_data.end_seam_block.as_ref(), + )?; let scanning_keys = ScanningKeys::from_account_ufvks(ufvks.clone()); let mut runners = trial_decrypt(parameters, &scanning_keys, &compact_blocks).unwrap(); @@ -62,8 +71,7 @@ where sapling_initial_tree_size = sapling_final_tree_size; orchard_initial_tree_size = orchard_final_tree_size; - let block_height = - BlockHeight::from_u32(block.height.try_into().expect("should never overflow")); + let block_height = block.height(); let mut transactions = block.vtx.iter().peekable(); while let Some(transaction) = transactions.next() { @@ -170,19 +178,21 @@ where Ok(runners) } -// checks height and hash continuity of a batch of compact blocks. -// takes the last wallet compact block of the adjacent lower scan range, if available. -// TODO: remove option and revisit scanner flow to use the last block of previously scanned batch to check continuity +/// Checks height and hash continuity of a batch of compact blocks. +/// +/// If available, also checks continuity with the blocks adjacent to the `compact_blocks` forming the start and end +/// seams of the scan ranges. fn check_continuity( compact_blocks: &[CompactBlock], - previous_compact_block: Option<&WalletBlock>, + start_seam_block: Option<&WalletBlock>, + end_seam_block: Option<&WalletBlock>, ) -> Result<(), ContinuityError> { let mut prev_height: Option = None; let mut prev_hash: Option = None; - if let Some(prev) = previous_compact_block { - prev_height = Some(prev.block_height()); - prev_hash = Some(prev.block_hash()); + if let Some(start_seam_block) = start_seam_block { + prev_height = Some(start_seam_block.block_height()); + prev_hash = Some(start_seam_block.block_hash()); } for block in compact_blocks { @@ -209,6 +219,25 @@ fn check_continuity( prev_hash = Some(block.hash()); } + if let Some(end_seam_block) = end_seam_block { + let prev_height = prev_height.expect("compact blocks should not be empty"); + if end_seam_block.block_height() != prev_height + 1 { + return Err(ContinuityError::HeightDiscontinuity { + height: end_seam_block.block_height(), + previous_block_height: prev_height, + }); + } + + let prev_hash = prev_hash.expect("compact blocks should not be empty"); + if end_seam_block.prev_hash() != prev_hash { + return Err(ContinuityError::HashDiscontinuity { + height: end_seam_block.block_height(), + prev_hash: end_seam_block.prev_hash(), + previous_block_hash: prev_hash, + }); + } + } + Ok(()) } @@ -384,3 +413,69 @@ fn collect_nullifiers( }); Ok(()) } + +pub(super) async fn calculate_block_tree_boundaries
<P>
( + consensus_parameters: &P, + fetch_request_sender: mpsc::UnboundedSender, + compact_block: &CompactBlock, +) -> TreeBoundaries +where + P: consensus::Parameters + Sync + Send + 'static, +{ + let (sapling_final_tree_size, orchard_final_tree_size) = + if let Some(chain_metadata) = compact_block.chain_metadata { + ( + chain_metadata.sapling_commitment_tree_size, + chain_metadata.orchard_commitment_tree_size, + ) + } else { + let sapling_activation_height = consensus_parameters + .activation_height(consensus::NetworkUpgrade::Sapling) + .expect("should have some sapling activation height"); + + match compact_block.height().cmp(&sapling_activation_height) { + cmp::Ordering::Greater => { + let frontiers = + client::get_frontiers(fetch_request_sender.clone(), compact_block.height()) + .await + .unwrap(); + ( + frontiers + .final_sapling_tree() + .tree_size() + .try_into() + .expect("should not be more than 2^32 note commitments in the tree!"), + frontiers + .final_orchard_tree() + .tree_size() + .try_into() + .expect("should not be more than 2^32 note commitments in the tree!"), + ) + } + cmp::Ordering::Equal => (0, 0), + cmp::Ordering::Less => panic!("pre-sapling not supported!"), + } + }; + + let sapling_output_count: u32 = compact_block + .vtx + .iter() + .map(|tx| tx.outputs.len()) + .sum::() + .try_into() + .expect("Sapling output count cannot exceed a u32"); + let orchard_output_count: u32 = compact_block + .vtx + .iter() + .map(|tx| tx.actions.len()) + .sum::() + .try_into() + .expect("Sapling output count cannot exceed a u32"); + + TreeBoundaries { + sapling_initial_tree_size: sapling_final_tree_size - sapling_output_count, + sapling_final_tree_size, + orchard_initial_tree_size: orchard_final_tree_size - orchard_output_count, + orchard_final_tree_size, + } +} diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 120c808ab..e0424d097 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -11,24 +11,29 @@ use tokio::{ task::{JoinError, JoinHandle}, }; -use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; +use zcash_client_backend::{ + data_api::scanning::{ScanPriority, ScanRange}, + proto::compact_formats::CompactBlock, +}; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ - consensus::{self}, + consensus::{self, BlockHeight}, + transaction::TxId, zip32::AccountId, }; use crate::{ - client::FetchRequest, + client::{self, FetchRequest}, keys::transparent::TransparentAddressId, primitives::{Locator, WalletBlock}, sync, traits::{SyncBlocks, SyncWallet}, }; -use super::{error::ScanError, scan, ScanResults}; +use super::{compact_blocks::calculate_block_tree_boundaries, error::ScanError, scan, ScanResults}; const MAX_WORKER_POOLSIZE: usize = 2; +const MAX_BATCH_OUTPUTS: usize = 8_192; // 2^13 pub(crate) enum ScannerState { Verification, @@ -48,6 +53,7 @@ impl ScannerState { pub(crate) struct Scanner
<P>
{ state: ScannerState, + batcher: Option>, workers: Vec>, unique_id: usize, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, @@ -71,19 +77,45 @@ where Self { state: ScannerState::Verification, + batcher: None, workers, unique_id: 0, - consensus_parameters, scan_results_sender, fetch_request_sender, + consensus_parameters, ufvks, } } + pub(crate) fn launch(&mut self) { + self.spawn_batcher(); + self.spawn_workers(); + } + pub(crate) fn worker_poolsize(&self) -> usize { self.workers.len() } + /// Spawns the batcher. + /// + /// When the batcher is running it will wait for a scan task. + pub(crate) fn spawn_batcher(&mut self) { + tracing::debug!("Spawning batcher"); + let mut batcher = Batcher::new( + self.consensus_parameters.clone(), + self.fetch_request_sender.clone(), + ); + batcher.run().unwrap(); + self.batcher = Some(batcher); + } + + async fn shutdown_batcher(&mut self) -> Result<(), JoinError> { + let mut batcher = self.batcher.take().expect("batcher should exist!"); + batcher.shutdown().await?; + + Ok(()) + } + /// Spawns a worker. /// /// When the worker is running it will wait for a scan task. @@ -92,7 +124,6 @@ where let mut worker = ScanWorker::new( self.unique_id, self.consensus_parameters.clone(), - None, self.scan_results_sender.clone(), self.fetch_request_sender.clone(), self.ufvks.clone(), @@ -139,15 +170,24 @@ where /// Updates the scanner. /// - /// If verification is still in progress, do not create scan tasks. - /// If there is an idle worker, create a new scan task and add to worker. - /// If there are no more range available to scan, shutdown the idle workers. + /// Creates a new scan task and sends to batcher if it's idle. + /// The batcher will stream compact blocks into the scan task, splitting the scan task when the maximum number of + /// outputs is reached. When a scan task is ready it is stored in the batcher ready to be taken by an idle scan + /// worker for scanning. + /// When verification is still in progress, only scan tasks with `Verify` scan priority are created. + /// When all ranges are scanned, the batcher, idle workers and mempool are shutdown. 
pub(crate) async fn update(&mut self, wallet: &mut W, shutdown_mempool: Arc) where W: SyncWallet + SyncBlocks, { match self.state { ScannerState::Verification => { + self.batcher + .as_mut() + .expect("batcher should be running") + .update_batch_store(); + self.update_workers(); + let sync_state = wallet.get_sync_state().unwrap(); if !sync_state .scan_ranges() @@ -169,35 +209,24 @@ where } // scan ranges with `Verify` priority - if let Some(worker) = self.idle_worker() { - let scan_task = sync::state::create_scan_task(wallet) - .unwrap() - .expect("scan range with `Verify` priority must exist!"); - - assert_eq!(scan_task.scan_range.priority(), ScanPriority::Verify); - worker.add_scan_task(scan_task).unwrap(); - } + self.update_batcher(wallet); } ScannerState::Scan => { - // create scan tasks until all ranges are scanned or currently scanning - if let Some(worker) = self.idle_worker() { - if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() { - worker.add_scan_task(scan_task).unwrap(); - } else if wallet.get_sync_state().unwrap().scan_complete() { - self.state.scan_completed(); - } - } + self.batcher + .as_mut() + .expect("batcher should be running") + .update_batch_store(); + self.update_workers(); + self.update_batcher(wallet); } ScannerState::Shutdown => { - // shutdown mempool shutdown_mempool.store(true, atomic::Ordering::Release); - - // shutdown idle workers while let Some(worker) = self.idle_worker() { - self.shutdown_worker(worker.id) - .await - .expect("worker should be in worker pool"); + self.shutdown_worker(worker.id).await.unwrap(); } + self.shutdown_batcher() + .await + .expect("batcher should not fail!"); } } @@ -205,6 +234,242 @@ where panic!("worker pool should not be empty with unscanned ranges!") } } + + fn update_workers(&mut self) { + let batcher = self.batcher.as_ref().expect("batcher should be running"); + if batcher.batch.is_some() { + if let Some(worker) = self.idle_worker() { + let batch = batcher + .batch + .clone() + .expect("batch should exist in this closure"); + worker.add_scan_task(batch); + self.batcher + .as_mut() + .expect("batcher should be running") + .batch = None; + } + } + } + + fn update_batcher(&mut self, wallet: &mut W) + where + W: SyncWallet + SyncBlocks, + { + let batcher = self.batcher.as_ref().expect("batcher should be running"); + if !batcher.is_batching() { + if let Some(scan_task) = + sync::state::create_scan_task(&self.consensus_parameters, wallet).unwrap() + { + batcher.add_scan_task(scan_task); + } else if wallet.get_sync_state().unwrap().scan_complete() { + self.state.scan_completed(); + } + } + } +} + +struct Batcher
<P>
{ + handle: Option<JoinHandle<()>>, + is_batching: Arc<AtomicBool>, + batch: Option<ScanTask>, + consensus_parameters: P, + scan_task_sender: Option<mpsc::Sender<ScanTask>>, + batch_receiver: Option<mpsc::Receiver<ScanTask>>, + fetch_request_sender: mpsc::UnboundedSender<FetchRequest>, +} + +impl
<P>
Batcher
<P>
+where + P: consensus::Parameters + Sync + Send + 'static, +{ + fn new( + consensus_parameters: P, + fetch_request_sender: mpsc::UnboundedSender, + ) -> Self { + Self { + handle: None, + is_batching: Arc::new(AtomicBool::new(false)), + batch: None, + consensus_parameters, + scan_task_sender: None, + batch_receiver: None, + fetch_request_sender, + } + } + + /// Runs the batcher in a new tokio task. + /// + /// Waits for a scan task and then fetches compact blocks to form fixed output batches. The scan task is split if + /// needed and the compact blocks are added to each scan task and sent to the scan workers for scanning. + fn run(&mut self) -> Result<(), ()> { + let (scan_task_sender, mut scan_task_receiver) = mpsc::channel::(1); + let (batch_sender, batch_receiver) = mpsc::channel::(1); + + let is_batching = self.is_batching.clone(); + let fetch_request_sender = self.fetch_request_sender.clone(); + let consensus_parameters = self.consensus_parameters.clone(); + + let handle = tokio::spawn(async move { + // save seam blocks between scan tasks for linear scanning continuuity checks + // during non-linear scanning the wallet blocks from the scanned ranges will already be saved in the wallet + let mut previous_task_first_block: Option = None; + let mut previous_task_last_block: Option = None; + + while let Some(mut scan_task) = scan_task_receiver.recv().await { + let mut sapling_output_count = 0; + let mut orchard_output_count = 0; + let mut first_batch = true; + + let mut block_stream = client::get_compact_block_range( + fetch_request_sender.clone(), + scan_task.scan_range.block_range().clone(), + ) + .await + .unwrap(); + while let Some(compact_block) = block_stream.message().await.unwrap() { + if let Some(block) = previous_task_last_block.as_ref() { + if scan_task.start_seam_block.is_none() + && scan_task.scan_range.block_range().start == block.block_height() + 1 + { + scan_task.start_seam_block = previous_task_last_block.clone(); + } + } + if let Some(block) = previous_task_first_block.as_ref() { + if scan_task.end_seam_block.is_none() + && scan_task.scan_range.block_range().end == block.block_height() + { + scan_task.end_seam_block = previous_task_first_block.clone(); + } + } + + if first_batch { + let tree_boundaries = calculate_block_tree_boundaries( + &consensus_parameters, + fetch_request_sender.clone(), + &compact_block, + ) + .await; + + previous_task_first_block = Some(WalletBlock::from_parts( + compact_block.height(), + compact_block.hash(), + compact_block.prev_hash(), + compact_block.time, + compact_block.vtx.iter().map(|tx| tx.txid()).collect(), + tree_boundaries, + )); + first_batch = false; + } + if compact_block.height() == scan_task.scan_range.block_range().end - 1 { + let tree_boundaries = calculate_block_tree_boundaries( + &consensus_parameters, + fetch_request_sender.clone(), + &compact_block, + ) + .await; + + previous_task_last_block = Some(WalletBlock::from_parts( + compact_block.height(), + compact_block.hash(), + compact_block.prev_hash(), + compact_block.time, + compact_block.vtx.iter().map(|tx| tx.txid()).collect(), + tree_boundaries, + )); + } + + sapling_output_count += compact_block + .vtx + .iter() + .fold(0, |acc, transaction| acc + transaction.outputs.len()); + orchard_output_count += compact_block + .vtx + .iter() + .fold(0, |acc, transaction| acc + transaction.actions.len()); + if sapling_output_count + orchard_output_count > MAX_BATCH_OUTPUTS { + let (full_batch, new_batch) = scan_task + .clone() + .split( + &consensus_parameters, + 
fetch_request_sender.clone(), + compact_block.height(), + ) + .await + .unwrap(); + + batch_sender + .send(full_batch) + .await + .expect("receiver should never be dropped before sender!"); + + scan_task = new_batch; + sapling_output_count = 0; + orchard_output_count = 0; + } + + scan_task.compact_blocks.push(compact_block); + } + + batch_sender + .send(scan_task) + .await + .expect("receiver should never be dropped before sender!"); + + is_batching.store(false, atomic::Ordering::Release); + } + }); + + self.handle = Some(handle); + self.scan_task_sender = Some(scan_task_sender); + self.batch_receiver = Some(batch_receiver); + + Ok(()) + } + + fn is_batching(&self) -> bool { + self.is_batching.load(atomic::Ordering::Acquire) + } + + fn add_scan_task(&self, scan_task: ScanTask) { + tracing::debug!("Adding scan task to batcher:\n{:#?}", &scan_task); + self.scan_task_sender + .clone() + .expect("batcher should be running") + .try_send(scan_task) + .expect("batcher should never be sent multiple tasks at one time"); + self.is_batching.store(true, atomic::Ordering::Release); + } + + fn update_batch_store(&mut self) { + let batch_receiver = self + .batch_receiver + .as_mut() + .expect("batcher should be running"); + if self.batch.is_none() && !batch_receiver.is_empty() { + self.batch = Some( + batch_receiver + .try_recv() + .expect("channel should be non-empty!"), + ); + } + } + + /// Shuts down batcher by dropping the sender to the batcher task and awaiting the handle. + /// + /// This should always be called in the context of the scanner as it must be also be taken from the Scanner struct. + async fn shutdown(&mut self) -> Result<(), JoinError> { + tracing::debug!("Shutting down batcher"); + if let Some(sender) = self.scan_task_sender.take() { + drop(sender); + } + let handle = self + .handle + .take() + .expect("batcher should always have a handle to take!"); + + handle.await + } } struct ScanWorker
<P>
{ @@ -225,7 +490,6 @@ where fn new( id: usize, consensus_parameters: P, - scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, ufvks: HashMap, @@ -235,7 +499,7 @@ where handle: None, is_scanning: Arc::new(AtomicBool::new(false)), consensus_parameters, - scan_task_sender, + scan_task_sender: None, scan_results_sender, fetch_request_sender, ufvks, @@ -256,19 +520,17 @@ where let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { + let scan_range = scan_task.scan_range.clone(); let scan_results = scan( fetch_request_sender.clone(), &consensus_parameters, &ufvks, - scan_task.scan_range.clone(), - scan_task.previous_wallet_block, - scan_task.locators, - scan_task.transparent_addresses, + scan_task, ) .await; scan_results_sender - .send((scan_task.scan_range, scan_results)) + .send((scan_range, scan_results)) .expect("receiver should never be dropped before sender!"); is_scanning.store(false, atomic::Ordering::Release); @@ -285,16 +547,14 @@ where self.is_scanning.load(atomic::Ordering::Acquire) } - fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { + fn add_scan_task(&self, scan_task: ScanTask) { tracing::debug!("Adding scan task to worker {}:\n{:#?}", self.id, &scan_task); self.scan_task_sender .clone() - .unwrap() + .expect("worker should be running") .try_send(scan_task) .expect("worker should never be sent multiple tasks at one time"); self.is_scanning.store(true, atomic::Ordering::Release); - - Ok(()) } /// Shuts down worker by dropping the sender to the worker task and awaiting the handle. @@ -314,26 +574,119 @@ where } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct ScanTask { - scan_range: ScanRange, - previous_wallet_block: Option, - locators: BTreeSet, - transparent_addresses: HashMap, + pub(crate) compact_blocks: Vec, + pub(crate) scan_range: ScanRange, + pub(crate) start_seam_block: Option, + pub(crate) end_seam_block: Option, + pub(crate) locators: BTreeSet, + pub(crate) transparent_addresses: HashMap, } impl ScanTask { pub(crate) fn from_parts( scan_range: ScanRange, - previous_wallet_block: Option, + start_seam_block: Option, + end_seam_block: Option, locators: BTreeSet, transparent_addresses: HashMap, ) -> Self { Self { + compact_blocks: Vec::new(), scan_range, - previous_wallet_block, + start_seam_block, + end_seam_block, locators, transparent_addresses, } } + + /// Splits a scan task into two at `block_height`. + /// + /// Panics if `block_height` is not contained in the scan task's block range. + async fn split
<P>
( + self, + consensus_parameters: &P, + fetch_request_sender: mpsc::UnboundedSender, + block_height: BlockHeight, + ) -> Result<(Self, Self), ()> + where + P: consensus::Parameters + Sync + Send + 'static, + { + if block_height < self.scan_range.block_range().start + && block_height > self.scan_range.block_range().end - 1 + { + panic!("block height should be within scan tasks block range!"); + } + + let mut lower_compact_blocks = self.compact_blocks; + let upper_compact_blocks = if let Some(index) = lower_compact_blocks + .iter() + .position(|block| block.height() == block_height) + { + lower_compact_blocks.split_off(index) + } else { + Vec::new() + }; + + let mut lower_task_locators = self.locators; + let upper_task_locators = + lower_task_locators.split_off(&(block_height, TxId::from_bytes([0; 32]))); + + let lower_task_last_block = if let Some(block) = lower_compact_blocks.last() { + let tree_boundaries = calculate_block_tree_boundaries( + consensus_parameters, + fetch_request_sender.clone(), + block, + ) + .await; + + Some(WalletBlock::from_parts( + block.height(), + block.hash(), + block.prev_hash(), + block.time, + block.vtx.iter().map(|tx| tx.txid()).collect(), + tree_boundaries, + )) + } else { + None + }; + let upper_task_first_block = if let Some(block) = upper_compact_blocks.first() { + let tree_boundaries = + calculate_block_tree_boundaries(consensus_parameters, fetch_request_sender, block) + .await; + + Some(WalletBlock::from_parts( + block.height(), + block.hash(), + block.prev_hash(), + block.time, + block.vtx.iter().map(|tx| tx.txid()).collect(), + tree_boundaries, + )) + } else { + None + }; + + Ok(( + ScanTask { + compact_blocks: lower_compact_blocks, + scan_range: self.scan_range.truncate_end(block_height).unwrap(), + start_seam_block: self.start_seam_block, + end_seam_block: upper_task_first_block, + locators: lower_task_locators, + transparent_addresses: self.transparent_addresses.clone(), + }, + ScanTask { + compact_blocks: upper_compact_blocks, + scan_range: self.scan_range.truncate_start(block_height).unwrap(), + start_seam_block: lower_task_last_block, + end_seam_block: self.end_seam_block, + locators: upper_task_locators, + transparent_addresses: self.transparent_addresses, + }, + )) + } } diff --git a/zingo-sync/src/scan/transactions.rs b/zingo-sync/src/scan/transactions.rs index 1f4c2dcf2..7175bd366 100644 --- a/zingo-sync/src/scan/transactions.rs +++ b/zingo-sync/src/scan/transactions.rs @@ -518,8 +518,15 @@ fn collect_outpoints( +/// For each locator, fetch the spending transaction and then scan and append to the wallet transactions. +/// +/// This is only intended to be used for transactions that do not contain any incoming notes and therefore evaded +/// trial decryption. +/// For targetted rescan of transactions by locator, locators should be added to the wallet using the `TODO` API and +/// the `FoundNote` priorities will be automatically set for scan prioritisation. Transactions with incoming notes +/// are required to be scanned in the context of a scan task to correctly derive the nullifiers and positions for +/// spending. 
+pub(crate) async fn scan_spending_transactions( fetch_request_sender: mpsc::UnboundedSender, consensus_parameters: &P, wallet: &mut W, @@ -545,8 +552,6 @@ where } spending_locators.insert(locator); - // TODO: fetch block from server if not in wallet so wallet blocks added to wallet while scanning out of order - // don't need to be held in memory wallet_blocks.insert( block_height, wallet.get_wallet_block(block_height).expect( diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 193f31490..3cdfe5078 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -37,15 +37,12 @@ pub(crate) mod spend; pub(crate) mod state; pub(crate) mod transparent; -// TODO: move parameters to config module -// TODO; replace fixed batches with variable batches with fixed memory size -const BATCH_SIZE: u32 = 10_000; const VERIFY_BLOCK_RANGE_SIZE: u32 = 10; -const MAX_VERIFICATION_WINDOW: u32 = 100; // TODO: fail if re-org goes beyond this window +const MAX_VERIFICATION_WINDOW: u32 = 100; /// Syncs a wallet to the latest state of the blockchain pub async fn sync( - client: CompactTxStreamerClient, // TODO: change underlying service for generic + client: CompactTxStreamerClient, consensus_parameters: &P, wallet: Arc>, ) -> Result<(), SyncError> @@ -53,7 +50,7 @@ where P: consensus::Parameters + Sync + Send + 'static, W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees, { - tracing::info!("Syncing wallet..."); + tracing::info!("Starting sync..."); // create channel for sending fetch requests and launch fetcher task let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel(); @@ -140,12 +137,17 @@ where fetch_request_sender.clone(), ufvks.clone(), ); - scanner.spawn_workers(); + scanner.launch(); // TODO: invalidate any pending transactions after eviction height (40 below best chain height?) // TODO: implement an option for continuous scanning where it doesnt exit when complete let mut wallet_guard = wallet.lock().await; + let initial_verification_height = wallet_guard + .get_sync_state() + .unwrap() + .highest_scanned_height() + + 1; let mut interval = tokio::time::interval(Duration::from_millis(30)); loop { tokio::select! { @@ -157,6 +159,7 @@ where &ufvks, scan_range, scan_results, + initial_verification_height, ) .await .unwrap(); @@ -187,8 +190,6 @@ where } } - // TODO: clear locators - drop(wallet_guard); drop(scanner); drop(fetch_request_sender); @@ -285,6 +286,7 @@ async fn process_scan_results( ufvks: &HashMap, scan_range: ScanRange, scan_results: Result, + initial_verification_height: BlockHeight, ) -> Result<(), SyncError> where P: consensus::Parameters, @@ -302,13 +304,12 @@ where ) .await .unwrap(); - remove_irrelevant_data(wallet, &scan_range).unwrap(); - state::set_scan_priority( + state::set_scanned_scan_range( wallet.get_sync_state_mut().unwrap(), - scan_range.block_range(), - ScanPriority::Scanned, + scan_range.block_range().clone(), ) .unwrap(); + remove_irrelevant_data(wallet).unwrap(); tracing::debug!("Scan results processed."); } Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => { @@ -328,6 +329,15 @@ where state::VerifyEnd::VerifyHighest, ); truncate_wallet_data(wallet, scan_range_to_verify.block_range().start - 1).unwrap(); + + if initial_verification_height - scan_range_to_verify.block_range().start + > MAX_VERIFICATION_WINDOW + { + panic!( + "sync failed. 
re-org of larger than {} blocks detected", + MAX_VERIFICATION_WINDOW + ); + } } else { scan_results?; } @@ -495,28 +505,33 @@ where wallet .update_shard_trees(sapling_located_trees, orchard_located_trees) .unwrap(); - // TODO: add trait to save wallet data to persistence for in-memory wallets Ok(()) } -// TODO: replace this function with a filter on the data added to wallet -fn remove_irrelevant_data(wallet: &mut W, scan_range: &ScanRange) -> Result<(), ()> +fn remove_irrelevant_data(wallet: &mut W) -> Result<(), ()> where W: SyncWallet + SyncBlocks + SyncNullifiers + SyncTransactions, { - if scan_range.priority() != ScanPriority::Historic { - return Ok(()); - } + let sync_state = wallet.get_sync_state().unwrap(); + let fully_scanned_height = sync_state.fully_scanned_height(); + let highest_scanned_height = sync_state.highest_scanned_height(); + let sync_start_height = sync_state.initial_sync_state().sync_start_height(); - let wallet_height = wallet - .get_sync_state() - .unwrap() + let scanned_block_range_boundaries = sync_state .scan_ranges() - .last() - .expect("wallet should always have scan ranges after sync has started") - .block_range() - .end; + .iter() + .filter(|scan_range| { + scan_range.priority() == ScanPriority::Scanned + && scan_range.block_range().start >= sync_start_height + }) + .flat_map(|scan_range| { + vec![ + scan_range.block_range().start, + scan_range.block_range().end - 1, + ] + }) + .collect::>(); let wallet_transaction_heights = wallet .get_wallet_transactions() @@ -524,21 +539,28 @@ where .values() .filter_map(|tx| tx.confirmation_status().get_confirmed_height()) .collect::>(); + wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| { - *height >= scan_range.block_range().end - 1 - || *height >= wallet_height - 100 + *height >= sync_start_height - 1 + || *height >= highest_scanned_height - MAX_VERIFICATION_WINDOW + || scanned_block_range_boundaries.contains(height) || wallet_transaction_heights.contains(height) }); wallet .get_nullifiers_mut() .unwrap() .sapling_mut() - .retain(|_, (height, _)| *height >= scan_range.block_range().end); + .retain(|_, (height, _)| *height > fully_scanned_height); wallet .get_nullifiers_mut() .unwrap() .orchard_mut() - .retain(|_, (height, _)| *height >= scan_range.block_range().end); + .retain(|_, (height, _)| *height > fully_scanned_height); + wallet + .get_sync_state_mut() + .unwrap() + .locators_mut() + .retain(|(height, _)| *height > fully_scanned_height); Ok(()) } diff --git a/zingo-sync/src/sync/spend.rs b/zingo-sync/src/sync/spend.rs index acdb1d638..9538d245d 100644 --- a/zingo-sync/src/sync/spend.rs +++ b/zingo-sync/src/sync/spend.rs @@ -14,7 +14,7 @@ use zip32::AccountId; use crate::{ client::FetchRequest, primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletTransaction}, - scan::transactions::scan_located_transactions, + scan::transactions::scan_spending_transactions, traits::{SyncBlocks, SyncNullifiers, SyncOutPoints, SyncTransactions}, }; @@ -63,7 +63,7 @@ where .unwrap(); // in the edge case where a spending transaction received no change, scan the transactions that evaded trial decryption - scan_located_transactions( + scan_spending_transactions( fetch_request_sender, consensus_parameters, wallet, diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs index 87852f63e..f4c5d688e 100644 --- a/zingo-sync/src/sync/state.rs +++ b/zingo-sync/src/sync/state.rs @@ -26,7 +26,7 @@ use crate::{ traits::{SyncBlocks, SyncWallet}, }; -use super::{BATCH_SIZE, VERIFY_BLOCK_RANGE_SIZE}; +use 
super::VERIFY_BLOCK_RANGE_SIZE; /// Used to determine which end of the scan range is verified. pub(super) enum VerifyEnd { @@ -64,7 +64,6 @@ where } /// Returns the locators for a given `block_range` from the wallet's [`crate::primitives::SyncState`] -// TODO: unit test high priority fn find_locators(sync_state: &SyncState, block_range: &Range) -> BTreeSet { sync_state .locators() @@ -76,8 +75,6 @@ fn find_locators(sync_state: &SyncState, block_range: &Range) -> BT .collect() } -// TODO: remove locators after range is scanned - /// Update scan ranges for scanning pub(super) async fn update_scan_ranges( consensus_parameters: &impl consensus::Parameters, @@ -96,6 +93,11 @@ pub(super) async fn update_scan_ranges( )?; set_chain_tip_scan_range(consensus_parameters, sync_state, chain_height)?; + let verification_height = sync_state.highest_scanned_height() + 1; + if verification_height <= chain_height { + set_verify_scan_range(sync_state, verification_height, VerifyEnd::VerifyLowest); + } + // TODO: add logic to merge scan ranges Ok(()) @@ -125,8 +127,6 @@ async fn create_scan_range( panic!("scan ranges should never be empty after updating"); } - set_verify_scan_range(sync_state, wallet_height + 1, VerifyEnd::VerifyLowest); - Ok(()) } @@ -145,8 +145,6 @@ fn reset_scan_ranges(sync_state: &mut SyncState) -> Result<(), ()> { set_scan_priority(sync_state, scan_range.block_range(), ScanPriority::Verify).unwrap(); } - // TODO: determine OpenAdjacent priority ranges from the end block of previous ChainTip ranges - Ok(()) } @@ -200,6 +198,39 @@ pub(super) fn set_verify_scan_range( scan_range_to_verify } +/// Punches in the chain tip block range with `ScanPriority::ChainTip`. +/// +/// Determines the chain tip block range by finding the lowest start height of the latest incomplete shard for each +/// shielded protocol. +fn set_chain_tip_scan_range( + consensus_parameters: &impl consensus::Parameters, + sync_state: &mut SyncState, + chain_height: BlockHeight, +) -> Result<(), ()> { + let sapling_incomplete_shard = determine_block_range( + consensus_parameters, + sync_state, + chain_height, + ShieldedProtocol::Sapling, + ); + let orchard_incomplete_shard = determine_block_range( + consensus_parameters, + sync_state, + chain_height, + ShieldedProtocol::Orchard, + ); + + let chain_tip = if sapling_incomplete_shard.start < orchard_incomplete_shard.start { + sapling_incomplete_shard + } else { + orchard_incomplete_shard + }; + + punch_scan_priority(sync_state, chain_tip, ScanPriority::ChainTip).unwrap(); + + Ok(()) +} + /// Punches in the `shielded_protocol` shard block ranges surrounding each locator with `ScanPriority::FoundNote`. pub(super) fn set_found_note_scan_ranges>( consensus_parameters: &impl consensus::Parameters, @@ -233,35 +264,27 @@ pub(super) fn set_found_note_scan_range( Ok(()) } -/// Punches in the chain tip block range with `ScanPriority::ChainTip`. -/// -/// Determines the chain tip block range by finding the lowest start height of the latest incomplete shard for each -/// shielded protocol. 
-fn set_chain_tip_scan_range( - consensus_parameters: &impl consensus::Parameters, +pub(super) fn set_scanned_scan_range( sync_state: &mut SyncState, - chain_height: BlockHeight, + scanned_range: Range, ) -> Result<(), ()> { - let sapling_incomplete_shard = determine_block_range( - consensus_parameters, - sync_state, - chain_height, - ShieldedProtocol::Sapling, - ); - let orchard_incomplete_shard = determine_block_range( - consensus_parameters, - sync_state, - chain_height, - ShieldedProtocol::Orchard, - ); + let scan_ranges = sync_state.scan_ranges_mut(); - let chain_tip = if sapling_incomplete_shard.start < orchard_incomplete_shard.start { - sapling_incomplete_shard - } else { - orchard_incomplete_shard + let Some((index, scan_range)) = scan_ranges.iter().enumerate().find(|(_, scan_range)| { + scan_range.block_range().contains(&scanned_range.start) + && scan_range.block_range().contains(&(scanned_range.end - 1)) + }) else { + panic!("scan range containing scanned range should exist!"); }; - punch_scan_priority(sync_state, chain_tip, ScanPriority::ChainTip).unwrap(); + let split_ranges = split_out_scan_range( + scan_range.clone(), + scanned_range.clone(), + ScanPriority::Scanned, + ); + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); Ok(()) } @@ -316,7 +339,7 @@ fn punch_scan_priority( match ( block_range.contains(&scan_range.block_range().start), - block_range.contains(&scan_range.block_range().end), + block_range.contains(&(scan_range.block_range().end - 1)), scan_range.block_range().contains(&block_range.start), ) { (true, true, _) => scan_ranges_contained_by_block_range.push(scan_range.clone()), @@ -444,28 +467,28 @@ fn split_out_scan_range( if let Some((lower_range, higher_range)) = scan_range.split_at(block_range.start) { split_ranges.push(lower_range); if let Some((middle_range, higher_range)) = higher_range.split_at(block_range.end) { - // [scan_range] is split at the upper and lower bound of [block_range] + // `scan_range` is split at the upper and lower bound of `block_range` split_ranges.push(ScanRange::from_parts( middle_range.block_range().clone(), scan_priority, )); split_ranges.push(higher_range); } else { - // [scan_range] is split only at the lower bound of [block_range] + // `scan_range` is split only at the lower bound of `block_range` split_ranges.push(ScanRange::from_parts( higher_range.block_range().clone(), scan_priority, )); } } else if let Some((lower_range, higher_range)) = scan_range.split_at(block_range.end) { - // [scan_range] is split only at the upper bound of [block_range] + // `scan_range` is split only at the upper bound of `block_range` split_ranges.push(ScanRange::from_parts( lower_range.block_range().clone(), scan_priority, )); split_ranges.push(higher_range); } else { - // [scan_range] is not split as it is fully contained within [block_range] + // `scan_range` is not split as it is fully contained within `block_range` // only scan priority is updated assert!(scan_range.block_range().start >= block_range.start); assert!(scan_range.block_range().end <= block_range.end); @@ -483,7 +506,10 @@ fn split_out_scan_range( /// /// Sets the range for scanning to `Ignored` priority in the wallet `sync_state` but returns the scan range with its initial priority. /// Returns `None` if there are no more ranges to scan. 
-fn select_scan_range(sync_state: &mut SyncState) -> Option { +fn select_scan_range( + consensus_parameters: &impl consensus::Parameters, + sync_state: &mut SyncState, +) -> Option { let scan_ranges = sync_state.scan_ranges_mut(); // scan ranges are sorted from lowest to highest priority @@ -504,25 +530,44 @@ fn select_scan_range(sync_state: &mut SyncState) -> Option { } let selected_priority = highest_priority_scan_range.priority(); - // TODO: fixed memory batching - let batch_block_range = Range { - start: highest_priority_scan_range.block_range().start, - end: highest_priority_scan_range.block_range().start + BATCH_SIZE, - }; - let split_ranges = split_out_scan_range( - highest_priority_scan_range, - batch_block_range, - ScanPriority::Ignored, - ); - let selected_block_range = split_ranges - .first() - .expect("split ranges should always be non-empty") - .block_range() - .clone(); - sync_state - .scan_ranges_mut() - .splice(index..=index, split_ranges); + // historic scan ranges can be larger than a shard block range so must be split out. + // otherwise, just set the scan priority of selected range to `Ignored` (scanning) in sync state. + let selected_block_range = if selected_priority == ScanPriority::Historic { + let shard_block_range = determine_block_range( + consensus_parameters, + sync_state, + highest_priority_scan_range.block_range().start, + ShieldedProtocol::Orchard, + ); + let split_ranges = split_out_scan_range( + highest_priority_scan_range, + shard_block_range, + ScanPriority::Ignored, + ); + let selected_block_range = split_ranges + .first() + .expect("split ranges should always be non-empty") + .block_range() + .clone(); + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); + + selected_block_range + } else { + let selected_scan_range = sync_state + .scan_ranges_mut() + .get_mut(index) + .expect("scan range should exist due to previous logic"); + + *selected_scan_range = ScanRange::from_parts( + highest_priority_scan_range.block_range().clone(), + ScanPriority::Ignored, + ); + + selected_scan_range.block_range().clone() + }; // TODO: when this library has its own version of ScanRange this can be simplified and more readable Some(ScanRange::from_parts( @@ -532,15 +577,20 @@ fn select_scan_range(sync_state: &mut SyncState) -> Option { } /// Creates a scan task to be sent to a [`crate::scan::task::ScanWorker`] for scanning. -pub(crate) fn create_scan_task(wallet: &mut W) -> Result, ()> +pub(crate) fn create_scan_task( + consensus_parameters: &impl consensus::Parameters, + wallet: &mut W, +) -> Result, ()> where W: SyncWallet + SyncBlocks, { - if let Some(scan_range) = select_scan_range(wallet.get_sync_state_mut().unwrap()) { - // TODO: disallow scanning without previous wallet block - let previous_wallet_block = wallet + if let Some(scan_range) = + select_scan_range(consensus_parameters, wallet.get_sync_state_mut().unwrap()) + { + let start_seam_block = wallet .get_wallet_block(scan_range.block_range().start - 1) .ok(); + let end_seam_block = wallet.get_wallet_block(scan_range.block_range().end).ok(); let locators = find_locators(wallet.get_sync_state().unwrap(), scan_range.block_range()); let transparent_addresses: HashMap = wallet @@ -552,7 +602,8 @@ where Ok(Some(ScanTask::from_parts( scan_range, - previous_wallet_block, + start_seam_block, + end_seam_block, locators, transparent_addresses, )))
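// --- Illustrative sketch (reviewer note, not part of the patch above) ---
// A minimal, hypothetical model of the splitting rule the new batcher in
// scan/task.rs applies: output counts are accumulated per streamed block and
// the scan task is split once the running total exceeds MAX_BATCH_OUTPUTS.
// `outputs_per_block` is a stand-in for the summed sapling outputs plus
// orchard actions counted from each CompactBlock in the scan range.
fn batch_split_points(outputs_per_block: &[usize], max_batch_outputs: usize) -> Vec<usize> {
    let mut split_points = Vec::new();
    let mut running_outputs = 0;
    for (block_index, outputs) in outputs_per_block.iter().copied().enumerate() {
        running_outputs += outputs;
        if running_outputs > max_batch_outputs {
            // The block that tips the total over the limit becomes the first
            // block of the next batch, and the counters reset to zero,
            // mirroring the batcher task's loop in the patch.
            split_points.push(block_index);
            running_outputs = 0;
        }
    }
    split_points
}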