From fec01edf3584630fc6dca19c1d038dc88f42880a Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 06:47:32 +0000 Subject: [PATCH 01/18] fix reorg bug and add end seam block for improved continuity checks --- zingo-sync/src/primitives.rs | 21 ++++++++++++++ zingo-sync/src/scan.rs | 19 ++++++++----- zingo-sync/src/scan/compact_blocks.rs | 41 +++++++++++++++++++++------ zingo-sync/src/scan/task.rs | 14 +++++---- zingo-sync/src/sync/state.rs | 14 +++++---- 5 files changed, 84 insertions(+), 25 deletions(-) diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs index 4fb48fc59..207d611c2 100644 --- a/zingo-sync/src/primitives.rs +++ b/zingo-sync/src/primitives.rs @@ -98,7 +98,28 @@ impl SyncState { } } + /// Returns the highest block height that has been scanned. + /// + /// If no scan ranges have been scanned, returns the block below the wallet birthday. + /// Will panic if called before scan ranges are updated for the first time. + pub fn highest_scanned_height(&self) -> BlockHeight { + if let Some(last_scanned_range) = self + .scan_ranges() + .iter() + .filter(|scan_range| scan_range.priority() == ScanPriority::Scanned) + .last() + { + last_scanned_range.block_range().end - 1 + } else { + self.wallet_birthday() + .expect("scan ranges always non-empty") + - 1 + } + } + /// Returns the wallet birthday or `None` if `self.scan_ranges` is empty. + /// + /// If the wallet birthday is below the sapling activation height, returns the sapling activation height instead. pub fn wallet_birthday(&self) -> Option { self.scan_ranges() .first() diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index 8dd09a855..12c8313c8 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -32,7 +32,8 @@ pub(crate) mod task; pub(crate) mod transactions; struct InitialScanData { - previous_block: Option, + start_seam_block: Option, + end_seam_block: Option, sapling_initial_tree_size: u32, orchard_initial_tree_size: u32, } @@ -42,7 +43,8 @@ impl InitialScanData { fetch_request_sender: mpsc::UnboundedSender, consensus_parameters: &P, first_block: &CompactBlock, - previous_wallet_block: Option, + start_seam_block: Option, + end_seam_block: Option, ) -> Result where P: Parameters + Sync + Send + 'static, @@ -51,7 +53,7 @@ impl InitialScanData { // otherwise, from first block if available // otherwise, fetches frontiers from server let (sapling_initial_tree_size, orchard_initial_tree_size) = - if let Some(prev) = &previous_wallet_block { + if let Some(prev) = &start_seam_block { ( prev.sapling_commitment_tree_size(), prev.orchard_commitment_tree_size(), @@ -105,7 +107,8 @@ impl InitialScanData { }; Ok(InitialScanData { - previous_block: previous_wallet_block, + start_seam_block, + end_seam_block, sapling_initial_tree_size, orchard_initial_tree_size, }) @@ -145,7 +148,7 @@ impl DecryptedNoteData { /// Scans a given range and returns all data relevant to the specified keys. /// -/// `previous_wallet_block` is the wallet block with height [scan_range.start - 1]. +/// `start_seam_block` and `end_seam_block` are the blocks adjacent to the `scan_range` for verification of continuity. /// `locators` are the block height and txid of transactions in the `scan_range` that are known to be relevant to the /// wallet and are appended to during scanning if trial decryption succeeds. If there are no known relevant transctions /// then `locators` will start empty. @@ -154,7 +157,8 @@ pub(crate) async fn scan
<P>
( parameters: &P, ufvks: &HashMap, scan_range: ScanRange, - previous_wallet_block: Option, + start_seam_block: Option, + end_seam_block: Option, mut locators: BTreeSet, transparent_addresses: HashMap, ) -> Result @@ -174,7 +178,8 @@ where compact_blocks .first() .expect("compacts blocks should not be empty"), - previous_wallet_block, + start_seam_block, + end_seam_block, ) .await .unwrap(); diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index b1dd11893..568bfa220 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -41,7 +41,11 @@ pub(crate) fn scan_compact_blocks
<P>
( where P: Parameters + Sync + Send + 'static, { - check_continuity(&compact_blocks, initial_scan_data.previous_block.as_ref())?; + check_continuity( + &compact_blocks, + initial_scan_data.start_seam_block.as_ref(), + initial_scan_data.end_seam_block.as_ref(), + )?; let scanning_keys = ScanningKeys::from_account_ufvks(ufvks.clone()); let mut runners = trial_decrypt(parameters, &scanning_keys, &compact_blocks).unwrap(); @@ -160,19 +164,21 @@ where Ok(runners) } -// checks height and hash continuity of a batch of compact blocks. -// takes the last wallet compact block of the adjacent lower scan range, if available. -// TODO: remove option and revisit scanner flow to use the last block of previously scanned batch to check continuity +/// Checks height and hash continuity of a batch of compact blocks. +/// +/// If available, also checks continuity with the blocks adjacent to the `compact_blocks` forming the start and end +/// seams of the scan ranges. fn check_continuity( compact_blocks: &[CompactBlock], - previous_compact_block: Option<&WalletBlock>, + start_seam_block: Option<&WalletBlock>, + end_seam_block: Option<&WalletBlock>, ) -> Result<(), ContinuityError> { let mut prev_height: Option = None; let mut prev_hash: Option = None; - if let Some(prev) = previous_compact_block { - prev_height = Some(prev.block_height()); - prev_hash = Some(prev.block_hash()); + if let Some(start_seam_block) = start_seam_block { + prev_height = Some(start_seam_block.block_height()); + prev_hash = Some(start_seam_block.block_hash()); } for block in compact_blocks { @@ -199,6 +205,25 @@ fn check_continuity( prev_hash = Some(block.hash()); } + if let Some(end_seam_block) = end_seam_block { + let prev_height = prev_height.expect("compact blocks should not be empty"); + if end_seam_block.block_height() != prev_height + 1 { + return Err(ContinuityError::HeightDiscontinuity { + height: end_seam_block.block_height(), + previous_block_height: prev_height, + }); + } + + let prev_hash = prev_hash.expect("compact blocks should not be empty"); + if end_seam_block.prev_hash() != prev_hash { + return Err(ContinuityError::HashDiscontinuity { + height: end_seam_block.block_height(), + prev_hash: end_seam_block.prev_hash(), + previous_block_hash: prev_hash, + }); + } + } + Ok(()) } diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 120c808ab..26e3362f0 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -139,7 +139,7 @@ where /// Updates the scanner. /// - /// If verification is still in progress, do not create scan tasks. + /// If verification is still in progress, only create scan tasks with `Verify` scan priority. /// If there is an idle worker, create a new scan task and add to worker. /// If there are no more range available to scan, shutdown the idle workers. 
pub(crate) async fn update(&mut self, wallet: &mut W, shutdown_mempool: Arc) @@ -261,7 +261,8 @@ where &consensus_parameters, &ufvks, scan_task.scan_range.clone(), - scan_task.previous_wallet_block, + scan_task.start_seam_block, + scan_task.end_seam_block, scan_task.locators, scan_task.transparent_addresses, ) @@ -317,7 +318,8 @@ where #[derive(Debug)] pub(crate) struct ScanTask { scan_range: ScanRange, - previous_wallet_block: Option, + start_seam_block: Option, + end_seam_block: Option, locators: BTreeSet, transparent_addresses: HashMap, } @@ -325,13 +327,15 @@ pub(crate) struct ScanTask { impl ScanTask { pub(crate) fn from_parts( scan_range: ScanRange, - previous_wallet_block: Option, + start_seam_block: Option, + end_seam_block: Option, locators: BTreeSet, transparent_addresses: HashMap, ) -> Self { Self { scan_range, - previous_wallet_block, + start_seam_block, + end_seam_block, locators, transparent_addresses, } diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs index 1486d3a34..64cff5e1a 100644 --- a/zingo-sync/src/sync/state.rs +++ b/zingo-sync/src/sync/state.rs @@ -87,6 +87,11 @@ pub(super) async fn update_scan_ranges( set_found_note_scan_ranges(sync_state, ShieldedProtocol::Orchard, locators.into_iter())?; set_chain_tip_scan_range(sync_state, chain_height)?; + let verification_height = sync_state.highest_scanned_height() + 1; + if verification_height <= chain_height { + set_verify_scan_range(sync_state, verification_height, VerifyEnd::VerifyLowest); + } + // TODO: add logic to merge scan ranges Ok(()) @@ -116,8 +121,6 @@ async fn create_scan_range( panic!("scan ranges should never be empty after updating"); } - set_verify_scan_range(sync_state, wallet_height + 1, VerifyEnd::VerifyLowest); - Ok(()) } @@ -484,10 +487,10 @@ where W: SyncWallet + SyncBlocks, { if let Some(scan_range) = select_scan_range(wallet.get_sync_state_mut().unwrap()) { - // TODO: disallow scanning without previous wallet block - let previous_wallet_block = wallet + let start_seam_block = wallet .get_wallet_block(scan_range.block_range().start - 1) .ok(); + let end_seam_block = wallet.get_wallet_block(scan_range.block_range().end).ok(); let locators = find_locators(wallet.get_sync_state().unwrap(), scan_range.block_range()); let transparent_addresses: HashMap = wallet @@ -499,7 +502,8 @@ where Ok(Some(ScanTask::from_parts( scan_range, - previous_wallet_block, + start_seam_block, + end_seam_block, locators, transparent_addresses, ))) From 627769a60817e327d027712fc6fee20f6c00866d Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 10:44:15 +0000 Subject: [PATCH 02/18] fix clippy and combine located tree builds into one spawn blocking --- zingo-sync/src/scan.rs | 17 ++++++++--------- zingo-sync/src/sync.rs | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index 12c8313c8..c9d827c15 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -152,6 +152,7 @@ impl DecryptedNoteData { /// `locators` are the block height and txid of transactions in the `scan_range` that are known to be relevant to the /// wallet and are appended to during scanning if trial decryption succeeds. If there are no known relevant transctions /// then `locators` will start empty. +#[allow(clippy::too_many_arguments)] pub(crate) async fn scan
<P>
( fetch_request_sender: mpsc::UnboundedSender, parameters: &P, @@ -228,15 +229,13 @@ where orchard_leaves_and_retentions, } = witness_data; - let sapling_located_trees = tokio::task::spawn_blocking(move || { - witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions) - .unwrap() - }) - .await - .unwrap(); - let orchard_located_trees = tokio::task::spawn_blocking(move || { - witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions) - .unwrap() + let (sapling_located_trees, orchard_located_trees) = tokio::task::spawn_blocking(move || { + ( + witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions) + .unwrap(), + witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions) + .unwrap(), + ) }) .await .unwrap(); diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 60ed542c6..bc6fc496d 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -52,7 +52,7 @@ where P: consensus::Parameters + Sync + Send + 'static, W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees, { - tracing::info!("Syncing wallet..."); + tracing::info!("Starting sync..."); // create channel for sending fetch requests and launch fetcher task let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel(); From feb5d2abe9bb83dc161f061e7712d10af1c4689d Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 11:20:28 +0000 Subject: [PATCH 03/18] revisit wallet data cleanup --- zingo-sync/src/scan/transactions.rs | 13 +++++++++---- zingo-sync/src/sync.rs | 14 ++------------ zingo-sync/src/sync/spend.rs | 4 ++-- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/zingo-sync/src/scan/transactions.rs b/zingo-sync/src/scan/transactions.rs index 1f4c2dcf2..7175bd366 100644 --- a/zingo-sync/src/scan/transactions.rs +++ b/zingo-sync/src/scan/transactions.rs @@ -518,8 +518,15 @@ fn collect_outpoints( +/// For each locator, fetch the spending transaction and then scan and append to the wallet transactions. +/// +/// This is only intended to be used for transactions that do not contain any incoming notes and therefore evaded +/// trial decryption. +/// For targetted rescan of transactions by locator, locators should be added to the wallet using the `TODO` API and +/// the `FoundNote` priorities will be automatically set for scan prioritisation. Transactions with incoming notes +/// are required to be scanned in the context of a scan task to correctly derive the nullifiers and positions for +/// spending. 
+pub(crate) async fn scan_spending_transactions( fetch_request_sender: mpsc::UnboundedSender, consensus_parameters: &P, wallet: &mut W, @@ -545,8 +552,6 @@ where } spending_locators.insert(locator); - // TODO: fetch block from server if not in wallet so wallet blocks added to wallet while scanning out of order - // don't need to be held in memory wallet_blocks.insert( block_height, wallet.get_wallet_block(block_height).expect( diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index bc6fc496d..7702ca008 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -444,12 +444,10 @@ where wallet .update_shard_trees(sapling_located_trees, orchard_located_trees) .unwrap(); - // TODO: add trait to save wallet data to persistence for in-memory wallets Ok(()) } -// TODO: replace this function with a filter on the data added to wallet fn remove_irrelevant_data(wallet: &mut W, scan_range: &ScanRange) -> Result<(), ()> where W: SyncWallet + SyncBlocks + SyncNullifiers + SyncTransactions, @@ -458,15 +456,7 @@ where return Ok(()); } - let wallet_height = wallet - .get_sync_state() - .unwrap() - .scan_ranges() - .last() - .expect("wallet should always have scan ranges after sync has started") - .block_range() - .end; - + let highest_scanned_height = wallet.get_sync_state().unwrap().highest_scanned_height(); let wallet_transaction_heights = wallet .get_wallet_transactions() .unwrap() @@ -475,7 +465,7 @@ where .collect::>(); wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| { *height >= scan_range.block_range().end - 1 - || *height >= wallet_height - 100 + || *height >= highest_scanned_height - MAX_VERIFICATION_WINDOW || wallet_transaction_heights.contains(height) }); wallet diff --git a/zingo-sync/src/sync/spend.rs b/zingo-sync/src/sync/spend.rs index 44925b559..4391044c8 100644 --- a/zingo-sync/src/sync/spend.rs +++ b/zingo-sync/src/sync/spend.rs @@ -14,7 +14,7 @@ use zip32::AccountId; use crate::{ client::FetchRequest, primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletTransaction}, - scan::transactions::scan_located_transactions, + scan::transactions::scan_spending_transactions, traits::{SyncBlocks, SyncNullifiers, SyncOutPoints, SyncTransactions}, }; @@ -61,7 +61,7 @@ where .unwrap(); // in the edge case where a spending transaction received no change, scan the transactions that evaded trial decryption - scan_located_transactions( + scan_spending_transactions( fetch_request_sender, consensus_parameters, wallet, From 6d4daee330b4ccf1e90c67cbdb729d530818dfd8 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 11:28:58 +0000 Subject: [PATCH 04/18] clear locators --- zingo-sync/src/sync.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 7702ca008..29af90378 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -179,8 +179,6 @@ where } } - // TODO: clear locators - drop(wallet_guard); drop(scanner); drop(fetch_request_sender); @@ -478,6 +476,11 @@ where .unwrap() .orchard_mut() .retain(|_, (height, _)| *height >= scan_range.block_range().end); + wallet + .get_sync_state_mut() + .unwrap() + .locators_mut() + .retain(|(height, _)| *height >= scan_range.block_range().end); Ok(()) } From 0a21d09ff0f012a3097bf84312d8887263caf7e9 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 11:47:07 +0000 Subject: [PATCH 05/18] fix bug in --- libtonode-tests/tests/sync.rs | 2 ++ zingo-sync/src/sync.rs | 20 +++++++++----------- 2 files changed, 11 
insertions(+), 11 deletions(-) diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index 7fba16f88..f1f706721 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -99,6 +99,8 @@ async fn sync_status() { }); sync_handle.await.unwrap(); + + dbg!(&lightclient.wallet.lock().await.wallet_blocks); } // temporary test for sync development diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 29af90378..8ab3d2734 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -263,13 +263,13 @@ where ) .await .unwrap(); - remove_irrelevant_data(wallet, &scan_range).unwrap(); state::set_scan_priority( wallet.get_sync_state_mut().unwrap(), scan_range.block_range(), ScanPriority::Scanned, ) .unwrap(); + remove_irrelevant_data(wallet).unwrap(); tracing::debug!("Scan results processed."); } Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => { @@ -446,15 +446,13 @@ where Ok(()) } -fn remove_irrelevant_data(wallet: &mut W, scan_range: &ScanRange) -> Result<(), ()> +fn remove_irrelevant_data(wallet: &mut W) -> Result<(), ()> where W: SyncWallet + SyncBlocks + SyncNullifiers + SyncTransactions, { - if scan_range.priority() != ScanPriority::Historic { - return Ok(()); - } - - let highest_scanned_height = wallet.get_sync_state().unwrap().highest_scanned_height(); + let sync_state = wallet.get_sync_state().unwrap(); + let fully_scanned_height = sync_state.fully_scanned_height(); + let highest_scanned_height = sync_state.highest_scanned_height(); let wallet_transaction_heights = wallet .get_wallet_transactions() .unwrap() @@ -462,7 +460,7 @@ where .filter_map(|tx| tx.confirmation_status().get_confirmed_height()) .collect::>(); wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| { - *height >= scan_range.block_range().end - 1 + *height >= fully_scanned_height - 1 || *height >= highest_scanned_height - MAX_VERIFICATION_WINDOW || wallet_transaction_heights.contains(height) }); @@ -470,17 +468,17 @@ where .get_nullifiers_mut() .unwrap() .sapling_mut() - .retain(|_, (height, _)| *height >= scan_range.block_range().end); + .retain(|_, (height, _)| *height > fully_scanned_height); wallet .get_nullifiers_mut() .unwrap() .orchard_mut() - .retain(|_, (height, _)| *height >= scan_range.block_range().end); + .retain(|_, (height, _)| *height > fully_scanned_height); wallet .get_sync_state_mut() .unwrap() .locators_mut() - .retain(|(height, _)| *height >= scan_range.block_range().end); + .retain(|(height, _)| *height > fully_scanned_height); Ok(()) } From 6175ba09d847ba31acbbacd9fc69f2561288cb07 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 11:59:10 +0000 Subject: [PATCH 06/18] add max re-org window --- zingo-sync/src/sync.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 8ab3d2734..257587124 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -138,6 +138,11 @@ where // TODO: implement an option for continuous scanning where it doesnt exit when complete let mut wallet_guard = wallet.lock().await; + let initial_verification_height = wallet_guard + .get_sync_state() + .unwrap() + .highest_scanned_height() + + 1; let mut interval = tokio::time::interval(Duration::from_millis(30)); loop { tokio::select! 
{ @@ -149,6 +154,7 @@ where &ufvks, scan_range, scan_results, + initial_verification_height, ) .await .unwrap(); @@ -246,6 +252,7 @@ async fn process_scan_results( ufvks: &HashMap, scan_range: ScanRange, scan_results: Result, + initial_verification_height: BlockHeight, ) -> Result<(), SyncError> where P: consensus::Parameters, @@ -289,6 +296,15 @@ where state::VerifyEnd::VerifyHighest, ); truncate_wallet_data(wallet, scan_range_to_verify.block_range().start - 1).unwrap(); + + if initial_verification_height - scan_range_to_verify.block_range().start + > MAX_VERIFICATION_WINDOW + { + panic!( + "sync failed. re-org of larger than {} blocks detected", + MAX_VERIFICATION_WINDOW + ); + } } else { scan_results?; } From 03bbbc8b6c7e6689f82a1559d4e6429467330c08 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 31 Dec 2024 12:02:08 +0000 Subject: [PATCH 07/18] remove todo --- zingo-sync/src/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 257587124..2dc67161a 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -40,7 +40,7 @@ pub(crate) mod transparent; // TODO; replace fixed batches with variable batches with fixed memory size const BATCH_SIZE: u32 = 10_000; const VERIFY_BLOCK_RANGE_SIZE: u32 = 10; -const MAX_VERIFICATION_WINDOW: u32 = 100; // TODO: fail if re-org goes beyond this window +const MAX_VERIFICATION_WINDOW: u32 = 100; /// Syncs a wallet to the latest state of the blockchain pub async fn sync( From 0d371596e56bd195acfbc76d295adfd42b5ddd24 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Thu, 2 Jan 2025 02:16:33 +0000 Subject: [PATCH 08/18] get block range returns a stream --- libtonode-tests/tests/sync.rs | 2 -- zingo-sync/src/client.rs | 11 +++++++---- zingo-sync/src/client/fetch.rs | 17 +++++------------ zingo-sync/src/scan/task.rs | 19 +++++++++++++++++-- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index f1f706721..7fba16f88 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -99,8 +99,6 @@ async fn sync_status() { }); sync_handle.await.unwrap(); - - dbg!(&lightclient.wallet.lock().await.wallet_blocks); } // temporary test for sync development diff --git a/zingo-sync/src/client.rs b/zingo-sync/src/client.rs index ce94ac0af..5acc2282e 100644 --- a/zingo-sync/src/client.rs +++ b/zingo-sync/src/client.rs @@ -29,7 +29,10 @@ pub enum FetchRequest { /// Gets the height of the blockchain from the server. ChainTip(oneshot::Sender), /// Gets the specified range of compact blocks from the server (end exclusive). - CompactBlockRange(oneshot::Sender>, Range), + CompactBlockRange( + oneshot::Sender>, + Range, + ), /// Gets the tree states for a specified block height. TreeState(oneshot::Sender, BlockHeight), /// Get a full transaction by txid. 
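A usage sketch of the new streaming variant (assuming the elided generics above are `oneshot::Sender<tonic::Streaming<CompactBlock>>` and `Range<BlockHeight>`; `collect_blocks` is a hypothetical helper, mirroring the consumer this patch adds to `scan/task.rs`):

    use std::ops::Range;

    use tokio::sync::mpsc;
    use zcash_client_backend::proto::compact_formats::CompactBlock;
    use zcash_primitives::consensus::BlockHeight;

    use crate::client::{self, FetchRequest};

    /// Hypothetical helper: drain the compact block stream into a `Vec`.
    async fn collect_blocks(
        fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
        block_range: Range<BlockHeight>,
    ) -> Vec<CompactBlock> {
        let mut block_stream =
            client::get_compact_block_range(fetch_request_sender, block_range)
                .await
                .unwrap();
        let mut compact_blocks = Vec::new();
        // `tonic::Streaming::message()` yields `Ok(Some(block))` for each message
        // and `Ok(None)` once the server closes the stream.
        while let Some(compact_block) = block_stream.message().await.unwrap() {
            compact_blocks.push(compact_block);
        }
        compact_blocks
    }
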
@@ -74,14 +77,14 @@ pub async fn get_chain_height( pub async fn get_compact_block_range( fetch_request_sender: UnboundedSender, block_range: Range, -) -> Result, ()> { +) -> Result, ()> { let (reply_sender, reply_receiver) = oneshot::channel(); fetch_request_sender .send(FetchRequest::CompactBlockRange(reply_sender, block_range)) .unwrap(); - let compact_blocks = reply_receiver.await.unwrap(); + let block_stream = reply_receiver.await.unwrap(); - Ok(compact_blocks) + Ok(block_stream) } /// Gets the stream of shards (subtree roots) diff --git a/zingo-sync/src/client/fetch.rs b/zingo-sync/src/client/fetch.rs index 0b8339227..4cb364e2d 100644 --- a/zingo-sync/src/client/fetch.rs +++ b/zingo-sync/src/client/fetch.rs @@ -109,8 +109,8 @@ async fn fetch_from_server( } FetchRequest::CompactBlockRange(sender, block_range) => { tracing::debug!("Fetching compact blocks. {:?}", &block_range); - let compact_blocks = get_block_range(client, block_range).await.unwrap(); - sender.send(compact_blocks).unwrap(); + let block_stream = get_block_range(client, block_range).await.unwrap(); + sender.send(block_stream).unwrap(); } FetchRequest::GetSubtreeRoots(sender, start_index, shielded_protocol, max_entries) => { tracing::debug!( @@ -172,10 +172,7 @@ async fn get_latest_block( async fn get_block_range( client: &mut CompactTxStreamerClient, block_range: Range, -) -> Result, ()> { - let mut compact_blocks: Vec = - Vec::with_capacity(u64::from(block_range.end - block_range.start) as usize); - +) -> Result, ()> { let request = tonic::Request::new(BlockRange { start: Some(BlockId { height: u64::from(block_range.start), @@ -186,13 +183,9 @@ async fn get_block_range( hash: vec![], }), }); - let mut block_stream = client.get_block_range(request).await.unwrap().into_inner(); - - while let Some(compact_block) = block_stream.message().await.unwrap() { - compact_blocks.push(compact_block); - } + let block_stream = client.get_block_range(request).await.unwrap().into_inner(); - Ok(compact_blocks) + Ok(block_stream) } async fn get_subtree_roots( diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 26e3362f0..c3c45df1e 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -11,7 +11,10 @@ use tokio::{ task::{JoinError, JoinHandle}, }; -use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange}; +use zcash_client_backend::{ + data_api::scanning::{ScanPriority, ScanRange}, + proto::compact_formats::CompactBlock, +}; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ consensus::{self}, @@ -19,7 +22,7 @@ use zcash_primitives::{ }; use crate::{ - client::FetchRequest, + client::{self, FetchRequest}, keys::transparent::TransparentAddressId, primitives::{Locator, WalletBlock}, sync, @@ -29,6 +32,7 @@ use crate::{ use super::{error::ScanError, scan, ScanResults}; const MAX_WORKER_POOLSIZE: usize = 2; +const MAX_BATCH_OUTPUTS: usize = 16384; // 2^14 pub(crate) enum ScannerState { Verification, @@ -256,6 +260,17 @@ where let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { + let mut block_stream = client::get_compact_block_range( + fetch_request_sender.clone(), + scan_task.scan_range.block_range().clone(), + ) + .await + .unwrap(); + let mut compact_blocks: Vec = Vec::new(); + while let Some(compact_block) = block_stream.message().await.unwrap() { + compact_blocks.push(compact_block); + } + let scan_results = scan( fetch_request_sender.clone(), &consensus_parameters, From 0b6f68bd5eeb69c672c767b5ad981219ae5ed846 
Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Fri, 3 Jan 2025 07:14:14 +0000 Subject: [PATCH 09/18] implemented batcher --- zingo-sync/src/scan.rs | 38 ++++-- zingo-sync/src/scan/task.rs | 251 ++++++++++++++++++++++++++++-------- zingo-sync/src/sync.rs | 2 +- 3 files changed, 224 insertions(+), 67 deletions(-) diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index c9d827c15..0bd419b25 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -4,10 +4,11 @@ use std::{ }; use orchard::tree::MerkleHashOrchard; +use task::ScanTask; use tokio::sync::mpsc; use incrementalmerkletree::Position; -use zcash_client_backend::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock}; +use zcash_client_backend::proto::compact_formats::CompactBlock; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ consensus::{BlockHeight, NetworkUpgrade, Parameters}, @@ -17,7 +18,6 @@ use zcash_primitives::{ use crate::{ client::{self, FetchRequest}, - keys::transparent::TransparentAddressId, primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction}, witness::{self, LocatedTreeData, WitnessData}, }; @@ -157,21 +157,33 @@ pub(crate) async fn scan
<P>
( fetch_request_sender: mpsc::UnboundedSender, parameters: &P, ufvks: &HashMap, - scan_range: ScanRange, - start_seam_block: Option, - end_seam_block: Option, - mut locators: BTreeSet, - transparent_addresses: HashMap, + scan_task: ScanTask, ) -> Result where P: Parameters + Sync + Send + 'static, { - let compact_blocks = client::get_compact_block_range( - fetch_request_sender.clone(), - scan_range.block_range().clone(), - ) - .await - .unwrap(); + let ScanTask { + compact_blocks, + scan_range, + start_seam_block, + end_seam_block, + mut locators, + transparent_addresses, + } = scan_task; + + if compact_blocks + .first() + .expect("compacts blocks should not be empty") + .height + != scan_range.block_range().start.into() + || compact_blocks + .last() + .expect("compacts blocks should not be empty") + .height + != (scan_range.block_range().end - 1).into() + { + panic!("compact blocks do not match scan range!") + } let initial_scan_data = InitialScanData::new( fetch_request_sender.clone(), diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index c3c45df1e..5d91c3655 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -7,7 +7,7 @@ use std::{ }; use tokio::{ - sync::mpsc, + sync::mpsc::{self, error::TryRecvError}, task::{JoinError, JoinHandle}, }; @@ -52,6 +52,7 @@ impl ScannerState { pub(crate) struct Scanner
<P>
{ state: ScannerState, + batcher: Option, workers: Vec>, unique_id: usize, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, @@ -75,19 +76,42 @@ where Self { state: ScannerState::Verification, + batcher: None, workers, unique_id: 0, - consensus_parameters, scan_results_sender, fetch_request_sender, + consensus_parameters, ufvks, } } + pub(crate) fn launch(&mut self) { + self.spawn_batcher(); + self.spawn_workers(); + } + pub(crate) fn worker_poolsize(&self) -> usize { self.workers.len() } + /// Spawns the batcher. + /// + /// When the batcher is running it will wait for a scan task. + pub(crate) fn spawn_batcher(&mut self) { + tracing::debug!("Spawning batcher"); + let mut batcher = Batcher::new(self.fetch_request_sender.clone()); + batcher.run().unwrap(); + self.batcher = Some(batcher); + } + + async fn shutdown_batcher(&mut self) -> Result<(), JoinError> { + let mut batcher = self.batcher.take().expect("batcher should exist!"); + batcher.shutdown().await?; + + Ok(()) + } + /// Spawns a worker. /// /// When the worker is running it will wait for a scan task. @@ -96,7 +120,6 @@ where let mut worker = ScanWorker::new( self.unique_id, self.consensus_parameters.clone(), - None, self.scan_results_sender.clone(), self.fetch_request_sender.clone(), self.ufvks.clone(), @@ -144,14 +167,20 @@ where /// Updates the scanner. /// /// If verification is still in progress, only create scan tasks with `Verify` scan priority. - /// If there is an idle worker, create a new scan task and add to worker. - /// If there are no more range available to scan, shutdown the idle workers. + /// If there are no batches ready and the batcher is idle, + /// If there are no more range available to scan, shutdown the batcher, idle workers and mempool. pub(crate) async fn update(&mut self, wallet: &mut W, shutdown_mempool: Arc) where W: SyncWallet + SyncBlocks, { match self.state { ScannerState::Verification => { + self.batcher + .as_mut() + .expect("batcher should be running") + .update_batch_store(); + self.update_workers(); + let sync_state = wallet.get_sync_state().unwrap(); if !sync_state .scan_ranges() @@ -173,24 +202,16 @@ where } // scan ranges with `Verify` priority - if let Some(worker) = self.idle_worker() { - let scan_task = sync::state::create_scan_task(wallet) - .unwrap() - .expect("scan range with `Verify` priority must exist!"); - - assert_eq!(scan_task.scan_range.priority(), ScanPriority::Verify); - worker.add_scan_task(scan_task).unwrap(); - } + self.update_batcher(wallet); } ScannerState::Scan => { - // create scan tasks until all ranges are scanned or currently scanning - if let Some(worker) = self.idle_worker() { - if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() { - worker.add_scan_task(scan_task).unwrap(); - } else if wallet.get_sync_state().unwrap().scan_complete() { - self.state.scan_completed(); - } - } + // create scan tasks for batching and scanning until all ranges are scanned + self.batcher + .as_mut() + .expect("batcher should be running") + .update_batch_store(); + self.update_workers(); + self.update_batcher(wallet); } ScannerState::Shutdown => { // shutdown mempool @@ -198,10 +219,13 @@ where // shutdown idle workers while let Some(worker) = self.idle_worker() { - self.shutdown_worker(worker.id) - .await - .expect("worker should be in worker pool"); + self.shutdown_worker(worker.id).await.unwrap(); } + + // shutdown batcher + self.shutdown_batcher() + .await + .expect("batcher should not fail!"); } } @@ -209,6 +233,142 @@ where panic!("worker pool 
should not be empty with unscanned ranges!") } } + + fn update_workers(&mut self) { + let batcher = self.batcher.as_ref().expect("batcher should be running"); + if batcher.batch.is_some() { + if let Some(worker) = self.idle_worker() { + let batch = batcher + .batch + .clone() + .expect("batch should exist in this closure"); + worker.add_scan_task(batch); + self.batcher + .as_mut() + .expect("batcher should be running") + .batch = None; + } + } + } + + fn update_batcher(&mut self, wallet: &mut W) + where + W: SyncWallet + SyncBlocks, + { + let batcher = self.batcher.as_ref().expect("batcher should be running"); + if !batcher.is_batching() { + if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() { + batcher.add_scan_task(scan_task); + } else if wallet.get_sync_state().unwrap().scan_complete() { + self.state.scan_completed(); + } + } + } +} + +struct Batcher { + handle: Option>, + is_batching: Arc, + batch: Option, + scan_task_sender: Option>, + batch_receiver: Option>, + fetch_request_sender: mpsc::UnboundedSender, +} + +impl Batcher { + fn new(fetch_request_sender: mpsc::UnboundedSender) -> Self { + Self { + handle: None, + is_batching: Arc::new(AtomicBool::new(false)), + batch: None, + scan_task_sender: None, + batch_receiver: None, + fetch_request_sender, + } + } + + /// Runs the batcher in a new tokio task. + /// + /// Waits for a scan task and then fetches compact blocks to form fixed output batches. The scan task is split if + /// needed and the compact blocks are added to each scan task and sent to the scan workers for scanning. + fn run(&mut self) -> Result<(), ()> { + let (scan_task_sender, mut scan_task_receiver) = mpsc::channel::(1); + let (batch_sender, batch_receiver) = mpsc::channel::(1); + + let is_batching = self.is_batching.clone(); + let fetch_request_sender = self.fetch_request_sender.clone(); + + let handle = tokio::spawn(async move { + while let Some(mut scan_task) = scan_task_receiver.recv().await { + let mut block_stream = client::get_compact_block_range( + fetch_request_sender.clone(), + scan_task.scan_range.block_range().clone(), + ) + .await + .unwrap(); + while let Some(compact_block) = block_stream.message().await.unwrap() { + scan_task.compact_blocks.push(compact_block); + } + + batch_sender + .send(scan_task) + .await + .expect("receiver should never be dropped before sender!"); + + is_batching.store(false, atomic::Ordering::Release); + } + }); + + self.handle = Some(handle); + self.scan_task_sender = Some(scan_task_sender); + self.batch_receiver = Some(batch_receiver); + + Ok(()) + } + + fn is_batching(&self) -> bool { + self.is_batching.load(atomic::Ordering::Acquire) + } + + fn add_scan_task(&self, scan_task: ScanTask) { + tracing::debug!("Adding scan task to batcher:\n{:#?}", &scan_task); + self.scan_task_sender + .clone() + .expect("batcher should be running") + .try_send(scan_task) + .expect("batcher should never be sent multiple tasks at one time"); + self.is_batching.store(true, atomic::Ordering::Release); + } + + fn update_batch_store(&mut self) { + let batch_receiver = self + .batch_receiver + .as_mut() + .expect("batcher should be running"); + if self.batch.is_none() && !batch_receiver.is_empty() { + self.batch = Some( + batch_receiver + .try_recv() + .expect("channel should be non-empty!"), + ); + } + } + + /// Shuts down batcher by dropping the sender to the batcher task and awaiting the handle. + /// + /// This should always be called in the context of the scanner as it must be also be taken from the Scanner struct. 
+ async fn shutdown(&mut self) -> Result<(), JoinError> { + tracing::debug!("Shutting down batcher"); + if let Some(sender) = self.scan_task_sender.take() { + drop(sender); + } + let handle = self + .handle + .take() + .expect("batcher should always have a handle to take!"); + + handle.await + } } struct ScanWorker
<P>
{ @@ -229,7 +389,6 @@ where fn new( id: usize, consensus_parameters: P, - scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, ufvks: HashMap, @@ -239,7 +398,7 @@ where handle: None, is_scanning: Arc::new(AtomicBool::new(false)), consensus_parameters, - scan_task_sender, + scan_task_sender: None, scan_results_sender, fetch_request_sender, ufvks, @@ -260,31 +419,17 @@ where let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { - let mut block_stream = client::get_compact_block_range( - fetch_request_sender.clone(), - scan_task.scan_range.block_range().clone(), - ) - .await - .unwrap(); - let mut compact_blocks: Vec = Vec::new(); - while let Some(compact_block) = block_stream.message().await.unwrap() { - compact_blocks.push(compact_block); - } - + let scan_range = scan_task.scan_range.clone(); let scan_results = scan( fetch_request_sender.clone(), &consensus_parameters, &ufvks, - scan_task.scan_range.clone(), - scan_task.start_seam_block, - scan_task.end_seam_block, - scan_task.locators, - scan_task.transparent_addresses, + scan_task, ) .await; scan_results_sender - .send((scan_task.scan_range, scan_results)) + .send((scan_range, scan_results)) .expect("receiver should never be dropped before sender!"); is_scanning.store(false, atomic::Ordering::Release); @@ -301,16 +446,14 @@ where self.is_scanning.load(atomic::Ordering::Acquire) } - fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { + fn add_scan_task(&self, scan_task: ScanTask) { tracing::debug!("Adding scan task to worker {}:\n{:#?}", self.id, &scan_task); self.scan_task_sender .clone() - .unwrap() + .expect("worker should be running") .try_send(scan_task) .expect("worker should never be sent multiple tasks at one time"); self.is_scanning.store(true, atomic::Ordering::Release); - - Ok(()) } /// Shuts down worker by dropping the sender to the worker task and awaiting the handle. @@ -330,13 +473,14 @@ where } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct ScanTask { - scan_range: ScanRange, - start_seam_block: Option, - end_seam_block: Option, - locators: BTreeSet, - transparent_addresses: HashMap, + pub(crate) compact_blocks: Vec, + pub(crate) scan_range: ScanRange, + pub(crate) start_seam_block: Option, + pub(crate) end_seam_block: Option, + pub(crate) locators: BTreeSet, + pub(crate) transparent_addresses: HashMap, } impl ScanTask { @@ -348,6 +492,7 @@ impl ScanTask { transparent_addresses: HashMap, ) -> Self { Self { + compact_blocks: Vec::new(), scan_range, start_seam_block, end_seam_block, diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 2dc67161a..1947aa539 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -132,7 +132,7 @@ where fetch_request_sender.clone(), ufvks.clone(), ); - scanner.spawn_workers(); + scanner.launch(); // TODO: invalidate any pending transactions after eviction height (40 below best chain height?) 
// TODO: implement an option for continuous scanning where it doesnt exit when complete From 182eb2f3de05770b18d80bfadb20a999fc786bb3 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 7 Jan 2025 02:28:59 +0000 Subject: [PATCH 10/18] added fixed output batching --- zingo-sync/src/scan/task.rs | 96 +++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 4 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 5d91c3655..8898538d7 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -17,7 +17,8 @@ use zcash_client_backend::{ }; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ - consensus::{self}, + consensus::{self, BlockHeight}, + transaction::TxId, zip32::AccountId, }; @@ -166,9 +167,12 @@ where /// Updates the scanner. /// - /// If verification is still in progress, only create scan tasks with `Verify` scan priority. - /// If there are no batches ready and the batcher is idle, - /// If there are no more range available to scan, shutdown the batcher, idle workers and mempool. + /// Creates a new scan task and sends to batcher if it's idle. + /// The batcher will stream compact blocks into the scan task, splitting the scan task when the maximum number of + /// outputs is reached. When a scan task is ready is it stored in the batcher ready to be taken by an idle scan + /// worker for scanning. + /// If verification is still in progress, only scan tasks with `Verify` scan priority are created. + /// If there are no more ranges available to scan, the batcher, idle workers and mempool are shutdown. pub(crate) async fn update(&mut self, wallet: &mut W, shutdown_mempool: Arc) where W: SyncWallet + SyncBlocks, @@ -299,7 +303,16 @@ impl Batcher { let fetch_request_sender = self.fetch_request_sender.clone(); let handle = tokio::spawn(async move { + // save seam blocks between scan tasks for linear scanning continuuity checks + // during non-linear scanning the wallet blocks from the scanned ranges will already be saved in the wallet + let mut first_block: Option = None; + let mut last_block: Option = None; + // TODO: finish seam block logic + while let Some(mut scan_task) = scan_task_receiver.recv().await { + let mut sapling_output_count = 0; + let mut orchard_output_count = 0; + let mut block_stream = client::get_compact_block_range( fetch_request_sender.clone(), scan_task.scan_range.block_range().clone(), @@ -307,6 +320,45 @@ impl Batcher { .await .unwrap(); while let Some(compact_block) = block_stream.message().await.unwrap() { + sapling_output_count += compact_block + .vtx + .iter() + .fold(0, |acc, transaction| acc + transaction.outputs.len()); + orchard_output_count += compact_block + .vtx + .iter() + .fold(0, |acc, transaction| acc + transaction.actions.len()); + + if sapling_output_count + orchard_output_count > MAX_BATCH_OUTPUTS { + let new_batch_first_block = WalletBlock::from_parts( + compact_block.height(), + compact_block.hash(), + compact_block.prev_hash(), + 0, + Vec::new(), + 0, + 0, + ); + + let (full_batch, new_batch) = scan_task + .clone() + .split( + new_batch_first_block.block_height(), + Some(new_batch_first_block), + None, + ) + .unwrap(); + + batch_sender + .send(full_batch) + .await + .expect("receiver should never be dropped before sender!"); + + scan_task = new_batch; + sapling_output_count = 0; + orchard_output_count = 0; + } + scan_task.compact_blocks.push(compact_block); } @@ -500,4 +552,40 @@ impl ScanTask { transparent_addresses, } } + + fn split( + self, + 
block_height: BlockHeight, + lower_task_end_seam_block: Option, + upper_task_start_seam_block: Option, + ) -> Result<(Self, Self), ()> { + if block_height > self.scan_range.block_range().start + && block_height < self.scan_range.block_range().end + { + panic!("block height should be within scan tasks block range!"); + } + + let mut lower_task_locators = self.locators; + let upper_task_locators = + lower_task_locators.split_off(&(block_height, TxId::from_bytes([0; 32]))); + + Ok(( + ScanTask { + compact_blocks: self.compact_blocks, + scan_range: self.scan_range.truncate_end(block_height).unwrap(), + start_seam_block: self.start_seam_block, + end_seam_block: lower_task_end_seam_block, + locators: lower_task_locators, + transparent_addresses: self.transparent_addresses.clone(), + }, + ScanTask { + compact_blocks: Vec::new(), + scan_range: self.scan_range.truncate_start(block_height).unwrap(), + start_seam_block: upper_task_start_seam_block, + end_seam_block: self.end_seam_block, + locators: upper_task_locators, + transparent_addresses: self.transparent_addresses, + }, + )) + } } From 69fe10fd9f09e4f159ab0cf817e7063d670eda8f Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 7 Jan 2025 05:02:28 +0000 Subject: [PATCH 11/18] start work on setting scanned ranges correctly in post-scan processing --- zingo-sync/src/scan/task.rs | 6 +- zingo-sync/src/sync.rs | 6 +- zingo-sync/src/sync/state.rs | 106 +++++++++++++++++++++++------------ 3 files changed, 75 insertions(+), 43 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 8898538d7..a50e19fe7 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -7,7 +7,7 @@ use std::{ }; use tokio::{ - sync::mpsc::{self, error::TryRecvError}, + sync::mpsc, task::{JoinError, JoinHandle}, }; @@ -559,8 +559,8 @@ impl ScanTask { lower_task_end_seam_block: Option, upper_task_start_seam_block: Option, ) -> Result<(Self, Self), ()> { - if block_height > self.scan_range.block_range().start - && block_height < self.scan_range.block_range().end + if block_height < self.scan_range.block_range().start + && block_height > self.scan_range.block_range().end - 1 { panic!("block height should be within scan tasks block range!"); } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 1947aa539..c5f8a9fb9 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -37,8 +37,6 @@ pub(crate) mod state; pub(crate) mod transparent; // TODO: move parameters to config module -// TODO; replace fixed batches with variable batches with fixed memory size -const BATCH_SIZE: u32 = 10_000; const VERIFY_BLOCK_RANGE_SIZE: u32 = 10; const MAX_VERIFICATION_WINDOW: u32 = 100; @@ -270,9 +268,9 @@ where ) .await .unwrap(); - state::set_scan_priority( + state::punch_scan_priority( wallet.get_sync_state_mut().unwrap(), - scan_range.block_range(), + scan_range.block_range().clone(), ScanPriority::Scanned, ) .unwrap(); diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs index 64cff5e1a..b31f3f2aa 100644 --- a/zingo-sync/src/sync/state.rs +++ b/zingo-sync/src/sync/state.rs @@ -23,7 +23,7 @@ use crate::{ traits::{SyncBlocks, SyncWallet}, }; -use super::{BATCH_SIZE, VERIFY_BLOCK_RANGE_SIZE}; +use super::VERIFY_BLOCK_RANGE_SIZE; /// Used to determine which end of the scan range is verified. pub(super) enum VerifyEnd { @@ -194,6 +194,30 @@ pub(super) fn set_verify_scan_range( scan_range_to_verify } +/// Punches in the chain tip block range with `ScanPriority::ChainTip`. 
+/// +/// Determines the chain tip block range by finding the lowest start height of the latest incomplete shard for each +/// shielded protocol. +fn set_chain_tip_scan_range( + sync_state: &mut SyncState, + chain_height: BlockHeight, +) -> Result<(), ()> { + let sapling_incomplete_shard = + determine_block_range(sync_state, chain_height, ShieldedProtocol::Sapling); + let orchard_incomplete_shard = + determine_block_range(sync_state, chain_height, ShieldedProtocol::Orchard); + + let chain_tip = if sapling_incomplete_shard.start < orchard_incomplete_shard.start { + sapling_incomplete_shard + } else { + orchard_incomplete_shard + }; + + punch_scan_priority(sync_state, chain_tip, ScanPriority::ChainTip).unwrap(); + + Ok(()) +} + /// Punches in the `shielded_protocol` shard block ranges surrounding each locator with `ScanPriority::FoundNote`. pub(super) fn set_found_note_scan_ranges>( sync_state: &mut SyncState, @@ -220,27 +244,19 @@ pub(super) fn set_found_note_scan_range( Ok(()) } -/// Punches in the chain tip block range with `ScanPriority::ChainTip`. -/// -/// Determines the chain tip block range by finding the lowest start height of the latest incomplete shard for each -/// shielded protocol. -fn set_chain_tip_scan_range( +pub(super) fn set_scanned_scan_range( sync_state: &mut SyncState, - chain_height: BlockHeight, + scanned_range: Range, ) -> Result<(), ()> { - let sapling_incomplete_shard = - determine_block_range(sync_state, chain_height, ShieldedProtocol::Sapling); - let orchard_incomplete_shard = - determine_block_range(sync_state, chain_height, ShieldedProtocol::Orchard); + let scan_ranges = sync_state.scan_ranges_mut(); - let chain_tip = if sapling_incomplete_shard.start < orchard_incomplete_shard.start { - sapling_incomplete_shard - } else { - orchard_incomplete_shard + let Some((index, scan_range)) = scan_ranges.iter().enumerate().find(|(_, scan_range)| { + scan_range.block_range().contains(&scanned_range.start) + && scan_range.block_range().contains(&scanned_range.end) + }) else { + panic!("scan range containing scanned range should exist!"); }; - punch_scan_priority(sync_state, chain_tip, ScanPriority::ChainTip).unwrap(); - Ok(()) } @@ -276,7 +292,7 @@ pub(super) fn set_scan_priority( /// Any scan ranges that fully contain the `block_range` will be split out with the given `scan_priority`. /// Any scan ranges with `Ignored` (Scanning) or `Scanned` priority or with higher (or equal) priority than /// `scan_priority` will be ignored. -fn punch_scan_priority( +pub(crate) fn punch_scan_priority( sync_state: &mut SyncState, block_range: Range, scan_priority: ScanPriority, @@ -454,25 +470,43 @@ fn select_scan_range(sync_state: &mut SyncState) -> Option { } let selected_priority = highest_priority_scan_range.priority(); - // TODO: fixed memory batching - let batch_block_range = Range { - start: highest_priority_scan_range.block_range().start, - end: highest_priority_scan_range.block_range().start + BATCH_SIZE, - }; - let split_ranges = split_out_scan_range( - highest_priority_scan_range, - batch_block_range, - ScanPriority::Ignored, - ); - let selected_block_range = split_ranges - .first() - .expect("split ranges should always be non-empty") - .block_range() - .clone(); - sync_state - .scan_ranges_mut() - .splice(index..=index, split_ranges); + // historic scan ranges can be larger than a shard block range so must be split out. + // otherwise, just set the scan priority of selected range to `Ignored` (scanning) in sync state. 
+ let selected_block_range = if selected_priority == ScanPriority::Historic { + let shard_block_range = determine_block_range( + &sync_state, + highest_priority_scan_range.block_range().start, + ShieldedProtocol::Orchard, + ); + let split_ranges = split_out_scan_range( + highest_priority_scan_range, + shard_block_range, + ScanPriority::Ignored, + ); + let selected_block_range = split_ranges + .first() + .expect("split ranges should always be non-empty") + .block_range() + .clone(); + sync_state + .scan_ranges_mut() + .splice(index..=index, split_ranges); + + selected_block_range + } else { + let selected_scan_range = sync_state + .scan_ranges_mut() + .get_mut(index) + .expect("scan range should exist due to previous logic"); + + *selected_scan_range = ScanRange::from_parts( + highest_priority_scan_range.block_range().clone(), + ScanPriority::Ignored, + ); + + selected_scan_range.block_range().clone() + }; // TODO: when this library has its own version of ScanRange this can be simplified and more readable Some(ScanRange::from_parts( From 24aa381c555495a72284e6e199ffe612b2994de3 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Tue, 7 Jan 2025 13:49:03 +0000 Subject: [PATCH 12/18] improve scan task split --- zingo-sync/src/scan/task.rs | 81 ++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index a50e19fe7..18d3bc9f2 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -171,8 +171,8 @@ where /// The batcher will stream compact blocks into the scan task, splitting the scan task when the maximum number of /// outputs is reached. When a scan task is ready is it stored in the batcher ready to be taken by an idle scan /// worker for scanning. - /// If verification is still in progress, only scan tasks with `Verify` scan priority are created. - /// If there are no more ranges available to scan, the batcher, idle workers and mempool are shutdown. + /// When verification is still in progress, only scan tasks with `Verify` scan priority are created. + /// When all ranges are scanned, the batcher, idle workers and mempool are shutdown. 
pub(crate) async fn update(&mut self, wallet: &mut W, shutdown_mempool: Arc) where W: SyncWallet + SyncBlocks, @@ -209,7 +209,6 @@ where self.update_batcher(wallet); } ScannerState::Scan => { - // create scan tasks for batching and scanning until all ranges are scanned self.batcher .as_mut() .expect("batcher should be running") @@ -218,15 +217,10 @@ where self.update_batcher(wallet); } ScannerState::Shutdown => { - // shutdown mempool shutdown_mempool.store(true, atomic::Ordering::Release); - - // shutdown idle workers while let Some(worker) = self.idle_worker() { self.shutdown_worker(worker.id).await.unwrap(); } - - // shutdown batcher self.shutdown_batcher() .await .expect("batcher should not fail!"); @@ -330,23 +324,14 @@ impl Batcher { .fold(0, |acc, transaction| acc + transaction.actions.len()); if sapling_output_count + orchard_output_count > MAX_BATCH_OUTPUTS { - let new_batch_first_block = WalletBlock::from_parts( - compact_block.height(), - compact_block.hash(), - compact_block.prev_hash(), - 0, - Vec::new(), - 0, - 0, - ); - let (full_batch, new_batch) = scan_task .clone() - .split( - new_batch_first_block.block_height(), - Some(new_batch_first_block), - None, - ) + .split(BlockHeight::from_u32( + compact_block + .height + .try_into() + .expect("should never overflow"), + )) .unwrap(); batch_sender @@ -553,35 +538,65 @@ impl ScanTask { } } - fn split( - self, - block_height: BlockHeight, - lower_task_end_seam_block: Option, - upper_task_start_seam_block: Option, - ) -> Result<(Self, Self), ()> { + /// Splits a scan task into two at `block_height`. + /// + /// Panics if `block_height` is not contained in the scan task's block range. + fn split(self, block_height: BlockHeight) -> Result<(Self, Self), ()> { if block_height < self.scan_range.block_range().start && block_height > self.scan_range.block_range().end - 1 { panic!("block height should be within scan tasks block range!"); } + let mut lower_compact_blocks = self.compact_blocks; + let upper_compact_blocks = if let Some(index) = lower_compact_blocks + .iter() + .position(|block| block.height == block_height.into()) + { + lower_compact_blocks.split_off(index) + } else { + Vec::new() + }; + let mut lower_task_locators = self.locators; let upper_task_locators = lower_task_locators.split_off(&(block_height, TxId::from_bytes([0; 32]))); + let lower_task_last_block = lower_compact_blocks.last().map(|block| { + WalletBlock::from_parts( + block.height(), + block.hash(), + block.prev_hash(), + 0, + Vec::new(), + 0, + 0, + ) + }); + let upper_task_first_block = upper_compact_blocks.first().map(|block| { + WalletBlock::from_parts( + block.height(), + block.hash(), + block.prev_hash(), + 0, + Vec::new(), + 0, + 0, + ) + }); Ok(( ScanTask { - compact_blocks: self.compact_blocks, + compact_blocks: lower_compact_blocks, scan_range: self.scan_range.truncate_end(block_height).unwrap(), start_seam_block: self.start_seam_block, - end_seam_block: lower_task_end_seam_block, + end_seam_block: upper_task_first_block, locators: lower_task_locators, transparent_addresses: self.transparent_addresses.clone(), }, ScanTask { - compact_blocks: Vec::new(), + compact_blocks: upper_compact_blocks, scan_range: self.scan_range.truncate_start(block_height).unwrap(), - start_seam_block: upper_task_start_seam_block, + start_seam_block: lower_task_last_block, end_seam_block: self.end_seam_block, locators: upper_task_locators, transparent_addresses: self.transparent_addresses, From 59dfbefc3b4b24e62d54fc69248c9efc70f12b00 Mon Sep 17 00:00:00 2001 From: Oscar Pepper 
Date: Tue, 7 Jan 2025 15:22:19 +0000 Subject: [PATCH 13/18] complete batcher with linear scanning continuity checks --- zingo-sync/src/scan/compact_blocks.rs | 5 +- zingo-sync/src/scan/task.rs | 84 ++++++++++++++++++++++----- zingo-sync/src/sync.rs | 3 +- zingo-sync/src/sync/state.rs | 23 +++++--- 4 files changed, 89 insertions(+), 26 deletions(-) diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index 568bfa220..b322d6ecd 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -30,7 +30,7 @@ use super::{ mod runners; // TODO: move parameters to config module -const TRIAL_DECRYPT_TASK_SIZE: usize = 1_000; +const TRIAL_DECRYPT_TASK_SIZE: usize = 1_024; // 2^10 pub(crate) fn scan_compact_blocks
<P>
( compact_blocks: Vec, @@ -61,8 +61,7 @@ where let mut sapling_tree_size = initial_scan_data.sapling_initial_tree_size; let mut orchard_tree_size = initial_scan_data.orchard_initial_tree_size; for block in &compact_blocks { - let block_height = - BlockHeight::from_u32(block.height.try_into().expect("should never overflow")); + let block_height = block.height(); let mut transactions = block.vtx.iter().peekable(); while let Some(transaction) = transactions.next() { // collect trial decryption results by transaction diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 18d3bc9f2..016eb902f 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -33,7 +33,7 @@ use crate::{ use super::{error::ScanError, scan, ScanResults}; const MAX_WORKER_POOLSIZE: usize = 2; -const MAX_BATCH_OUTPUTS: usize = 16384; // 2^14 +const MAX_BATCH_OUTPUTS: usize = 8_192; // 2^13 pub(crate) enum ScannerState { Verification, @@ -299,13 +299,13 @@ impl Batcher { let handle = tokio::spawn(async move { // save seam blocks between scan tasks for linear scanning continuuity checks // during non-linear scanning the wallet blocks from the scanned ranges will already be saved in the wallet - let mut first_block: Option = None; - let mut last_block: Option = None; - // TODO: finish seam block logic + let mut previous_task_first_block: Option = None; + let mut previous_task_last_block: Option = None; while let Some(mut scan_task) = scan_task_receiver.recv().await { let mut sapling_output_count = 0; let mut orchard_output_count = 0; + let mut first_batch = true; let mut block_stream = client::get_compact_block_range( fetch_request_sender.clone(), @@ -314,6 +314,53 @@ impl Batcher { .await .unwrap(); while let Some(compact_block) = block_stream.message().await.unwrap() { + if let Some(block) = previous_task_last_block.as_ref() { + if scan_task.start_seam_block.is_none() + && scan_task.scan_range.block_range().start == block.block_height() + 1 + { + scan_task.start_seam_block = previous_task_last_block.clone(); + } + } + if let Some(block) = previous_task_first_block.as_ref() { + if scan_task.end_seam_block.is_none() + && scan_task.scan_range.block_range().end == block.block_height() + { + scan_task.end_seam_block = previous_task_first_block.clone(); + } + } + + if first_batch { + // TODO: check conditions where chain metadata is none + let chain_metadata = compact_block + .chain_metadata + .expect("chain metadata should always exist"); + previous_task_first_block = Some(WalletBlock::from_parts( + compact_block.height(), + compact_block.hash(), + compact_block.prev_hash(), + compact_block.time, + compact_block.vtx.iter().map(|tx| tx.txid()).collect(), + chain_metadata.sapling_commitment_tree_size, + chain_metadata.orchard_commitment_tree_size, + )); + first_batch = false; + } + if compact_block.height() == scan_task.scan_range.block_range().end - 1 { + // TODO: check conditions where chain metadata is none + let chain_metadata = compact_block + .chain_metadata + .expect("chain metadata should always exist"); + previous_task_last_block = Some(WalletBlock::from_parts( + compact_block.height(), + compact_block.hash(), + compact_block.prev_hash(), + compact_block.time, + compact_block.vtx.iter().map(|tx| tx.txid()).collect(), + chain_metadata.sapling_commitment_tree_size, + chain_metadata.orchard_commitment_tree_size, + )); + } + sapling_output_count += compact_block .vtx .iter() @@ -322,7 +369,6 @@ impl Batcher { .vtx .iter() .fold(0, |acc, transaction| acc + transaction.actions.len()); - 
                     if sapling_output_count + orchard_output_count > MAX_BATCH_OUTPUTS {
                         let (full_batch, new_batch) = scan_task
                             .clone()
@@ -551,7 +597,7 @@ impl ScanTask {
         let mut lower_compact_blocks = self.compact_blocks;
         let upper_compact_blocks = if let Some(index) = lower_compact_blocks
             .iter()
-            .position(|block| block.height == block_height.into())
+            .position(|block| block.height() == block_height)
         {
             lower_compact_blocks.split_off(index)
         } else {
@@ -563,25 +609,35 @@ impl ScanTask {
             lower_task_locators.split_off(&(block_height, TxId::from_bytes([0; 32])));
         let lower_task_last_block = lower_compact_blocks.last().map(|block| {
+            // TODO: check conditions where chain metadata is none
+            let chain_metadata = block
+                .chain_metadata
+                .expect("chain metadata should always exist");
+
             WalletBlock::from_parts(
                 block.height(),
                 block.hash(),
                 block.prev_hash(),
-                0,
-                Vec::new(),
-                0,
-                0,
+                block.time,
+                block.vtx.iter().map(|tx| tx.txid()).collect(),
+                chain_metadata.sapling_commitment_tree_size,
+                chain_metadata.orchard_commitment_tree_size,
             )
         });
         let upper_task_first_block = upper_compact_blocks.first().map(|block| {
+            // TODO: check conditions where chain metadata is none
+            let chain_metadata = block
+                .chain_metadata
+                .expect("chain metadata should always exist");
+
             WalletBlock::from_parts(
                 block.height(),
                 block.hash(),
                 block.prev_hash(),
-                0,
-                Vec::new(),
-                0,
-                0,
+                block.time,
+                block.vtx.iter().map(|tx| tx.txid()).collect(),
+                chain_metadata.sapling_commitment_tree_size,
+                chain_metadata.orchard_commitment_tree_size,
             )
         });
 
         Ok((
diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index c5f8a9fb9..ff6bfa387 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -268,10 +268,9 @@ where
     )
     .await
     .unwrap();
-    state::punch_scan_priority(
+    state::set_scanned_scan_range(
         wallet.get_sync_state_mut().unwrap(),
         scan_range.block_range().clone(),
-        ScanPriority::Scanned,
     )
     .unwrap();
     remove_irrelevant_data(wallet).unwrap();
diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs
index b31f3f2aa..c9833bf74 100644
--- a/zingo-sync/src/sync/state.rs
+++ b/zingo-sync/src/sync/state.rs
@@ -252,11 +252,20 @@ pub(super) fn set_scanned_scan_range(
     let Some((index, scan_range)) = scan_ranges.iter().enumerate().find(|(_, scan_range)| {
         scan_range.block_range().contains(&scanned_range.start)
-            && scan_range.block_range().contains(&scanned_range.end)
+            && scan_range.block_range().contains(&(scanned_range.end - 1))
     }) else {
         panic!("scan range containing scanned range should exist!");
     };
 
+    let split_ranges = split_out_scan_range(
+        scan_range.clone(),
+        scanned_range.clone(),
+        ScanPriority::Scanned,
+    );
+    sync_state
+        .scan_ranges_mut()
+        .splice(index..=index, split_ranges);
+
     Ok(())
 }
 
@@ -292,7 +301,7 @@ pub(super) fn set_scan_priority(
 /// Any scan ranges that fully contain the `block_range` will be split out with the given `scan_priority`.
 /// Any scan ranges with `Ignored` (Scanning) or `Scanned` priority or with higher (or equal) priority than
 /// `scan_priority` will be ignored.
-pub(crate) fn punch_scan_priority(
+fn punch_scan_priority(
     sync_state: &mut SyncState,
     block_range: Range<BlockHeight>,
     scan_priority: ScanPriority,
@@ -310,7 +319,7 @@ pub(crate) fn punch_scan_priority(
         match (
             block_range.contains(&scan_range.block_range().start),
-            block_range.contains(&scan_range.block_range().end),
+            block_range.contains(&(scan_range.block_range().end - 1)),
             scan_range.block_range().contains(&block_range.start),
         ) {
             (true, true, _) => scan_ranges_contained_by_block_range.push(scan_range.clone()),
@@ -410,28 +419,28 @@ fn split_out_scan_range(
     if let Some((lower_range, higher_range)) = scan_range.split_at(block_range.start) {
         split_ranges.push(lower_range);
         if let Some((middle_range, higher_range)) = higher_range.split_at(block_range.end) {
-            // [scan_range] is split at the upper and lower bound of [block_range]
+            // `scan_range` is split at the upper and lower bound of `block_range`
             split_ranges.push(ScanRange::from_parts(
                 middle_range.block_range().clone(),
                 scan_priority,
             ));
             split_ranges.push(higher_range);
         } else {
-            // [scan_range] is split only at the lower bound of [block_range]
+            // `scan_range` is split only at the lower bound of `block_range`
             split_ranges.push(ScanRange::from_parts(
                 higher_range.block_range().clone(),
                 scan_priority,
             ));
         }
     } else if let Some((lower_range, higher_range)) = scan_range.split_at(block_range.end) {
-        // [scan_range] is split only at the upper bound of [block_range]
+        // `scan_range` is split only at the upper bound of `block_range`
         split_ranges.push(ScanRange::from_parts(
             lower_range.block_range().clone(),
             scan_priority,
         ));
         split_ranges.push(higher_range);
     } else {
-        // [scan_range] is not split as it is fully contained within [block_range]
+        // `scan_range` is not split as it is fully contained within `block_range`
         // only scan priority is updated
         assert!(scan_range.block_range().start >= block_range.start);
         assert!(scan_range.block_range().end <= block_range.end);

From 7b9ddc44036e663d0b2522c758edd484b1c8ce12 Mon Sep 17 00:00:00 2001
From: Oscar Pepper
Date: Tue, 7 Jan 2025 15:24:05 +0000
Subject: [PATCH 14/18] fix clippy warnings

---
 zingo-sync/src/sync/state.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs
index c9833bf74..71f05fc1a 100644
--- a/zingo-sync/src/sync/state.rs
+++ b/zingo-sync/src/sync/state.rs
@@ -484,7 +484,7 @@ fn select_scan_range(sync_state: &mut SyncState) -> Option<ScanRange> {
 // otherwise, just set the scan priority of selected range to `Ignored` (scanning) in sync state.
     let selected_block_range = if selected_priority == ScanPriority::Historic {
         let shard_block_range = determine_block_range(
-            &sync_state,
+            sync_state,
             highest_priority_scan_range.block_range().start,
             ShieldedProtocol::Orchard,
         );

From 23f4442a1a3d24a9d3899736e01e0a1b4b42996a Mon Sep 17 00:00:00 2001
From: Oscar Pepper
Date: Thu, 16 Jan 2025 08:31:39 +0000
Subject: [PATCH 15/18] retain all scanned ranges boundary blocks in the wallet

---
 zingo-sync/src/sync.rs | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index b0ae46aa3..984e07c95 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -502,15 +502,34 @@ where
     let sync_state = wallet.get_sync_state().unwrap();
     let fully_scanned_height = sync_state.fully_scanned_height();
     let highest_scanned_height = sync_state.highest_scanned_height();
+    let sync_start_height = sync_state.initial_sync_state().sync_start_height();
+
+    let scanned_block_range_boundaries = sync_state
+        .scan_ranges()
+        .iter()
+        .filter(|scan_range| {
+            scan_range.priority() == ScanPriority::Scanned
+                && scan_range.block_range().start >= sync_start_height
+        })
+        .flat_map(|scan_range| {
+            vec![
+                scan_range.block_range().start,
+                scan_range.block_range().end - 1,
+            ]
+        })
+        .collect::<Vec<_>>();
+
     let wallet_transaction_heights = wallet
         .get_wallet_transactions()
         .unwrap()
         .values()
         .filter_map(|tx| tx.confirmation_status().get_confirmed_height())
         .collect::<Vec<_>>();
+
     wallet.get_wallet_blocks_mut().unwrap().retain(|height, _| {
-        *height >= fully_scanned_height - 1
+        *height >= sync_start_height - 1
             || *height >= highest_scanned_height - MAX_VERIFICATION_WINDOW
+            || scanned_block_range_boundaries.contains(height)
             || wallet_transaction_heights.contains(height)
     });
     wallet

From 93394884adf3dcc4394f7e51c9afff01b0df0b12 Mon Sep 17 00:00:00 2001
From: Oscar Pepper
Date: Fri, 17 Jan 2025 04:54:28 +0000
Subject: [PATCH 16/18] small cleanup

---
 zingo-sync/src/client/fetch.rs | 6 ++++--
 zingo-sync/src/sync.rs         | 3 +--
 zingo-sync/src/sync/state.rs   | 5 -----
 3 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/zingo-sync/src/client/fetch.rs b/zingo-sync/src/client/fetch.rs
index 4cb364e2d..61d9bdc08 100644
--- a/zingo-sync/src/client/fetch.rs
+++ b/zingo-sync/src/client/fetch.rs
@@ -169,6 +169,7 @@ async fn get_latest_block(
     Ok(client.get_latest_block(request).await.unwrap().into_inner())
 }
+
 async fn get_block_range(
     client: &mut CompactTxStreamerClient,
     block_range: Range<BlockHeight>,
@@ -183,9 +184,8 @@ async fn get_block_range(
             hash: vec![],
         }),
     });
-    let block_stream = client.get_block_range(request).await.unwrap().into_inner();
 
-    Ok(block_stream)
+    Ok(client.get_block_range(request).await.unwrap().into_inner())
 }
 
 async fn get_subtree_roots(
@@ -199,12 +199,14 @@ async fn get_subtree_roots(
         shielded_protocol,
         max_entries,
     };
+
     Ok(client
         .get_subtree_roots(request)
         .await
         .unwrap()
         .into_inner())
 }
+
 async fn get_tree_state(
     client: &mut CompactTxStreamerClient,
     block_height: BlockHeight,
diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index 50eab338b..3cdfe5078 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -37,13 +37,12 @@
 pub(crate) mod spend;
 pub(crate) mod state;
 pub(crate) mod transparent;
 
-// TODO: move parameters to config module
 const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
 const MAX_VERIFICATION_WINDOW: u32 = 100;
 
 /// Syncs a wallet to the latest state of the blockchain
 pub async fn sync<P, W>(
-    client: CompactTxStreamerClient, // TODO: change underlying service for generic
+    client: CompactTxStreamerClient,
     consensus_parameters: &P,
     wallet: Arc<RwLock<W>>,
 ) -> Result<(), SyncError>
diff --git a/zingo-sync/src/sync/state.rs b/zingo-sync/src/sync/state.rs
index 9a5ad09c8..5b31c4c4c 100644
--- a/zingo-sync/src/sync/state.rs
+++ b/zingo-sync/src/sync/state.rs
@@ -64,7 +64,6 @@ where
 }
 
 /// Returns the locators for a given `block_range` from the wallet's [`crate::primitives::SyncState`]
-// TODO: unit test high priority
 fn find_locators(sync_state: &SyncState, block_range: &Range<BlockHeight>) -> BTreeSet<Locator> {
     sync_state
         .locators()
@@ -76,8 +75,6 @@ fn find_locators(sync_state: &SyncState, block_range: &Range<BlockHeight>) -> BT
         .collect()
 }
 
-// TODO: remove locators after range is scanned
-
 /// Update scan ranges for scanning
 pub(super) async fn update_scan_ranges(
     consensus_parameters: &impl consensus::Parameters,
@@ -148,8 +145,6 @@ fn reset_scan_ranges(sync_state: &mut SyncState) -> Result<(), ()> {
         set_scan_priority(sync_state, scan_range.block_range(), ScanPriority::Verify).unwrap();
     }
 
-    // TODO: determine OpenAdjacent priority ranges from the end block of previous ChainTip ranges
-
     Ok(())
 }

From 69c5066c4e3a7e71dfca98ccfbadc1c78cc569b7 Mon Sep 17 00:00:00 2001
From: Dorian <58955380+dorianvp@users.noreply.github.com>
Date: Fri, 31 Jan 2025 18:10:44 -0300
Subject: [PATCH 17/18] Update zingo-sync/src/primitives.rs

Signed-off-by: Dorian <58955380+dorianvp@users.noreply.github.com>
---
 zingo-sync/src/primitives.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs
index 02f57fda8..463f3405b 100644
--- a/zingo-sync/src/primitives.rs
+++ b/zingo-sync/src/primitives.rs
@@ -24,7 +24,7 @@ use crate::{
     utils,
 };
 
-/// Block height and txid of relevant transactions that have yet to be scanned. These may be added due transparent
+/// Block height and txid of relevant transactions that have yet to be scanned. These may be added due to transparent
 /// output/spend discovery or for targeted rescan.
 pub type Locator = (BlockHeight, TxId);

From e92bf8559fea4d0ed377d31daeab36216639a8f8 Mon Sep 17 00:00:00 2001
From: Dorian <58955380+dorianvp@users.noreply.github.com>
Date: Fri, 31 Jan 2025 18:10:55 -0300
Subject: [PATCH 18/18] Update zingo-sync/src/scan/task.rs

Signed-off-by: Dorian <58955380+dorianvp@users.noreply.github.com>
---
 zingo-sync/src/scan/task.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs
index d0330001e..e0424d097 100644
--- a/zingo-sync/src/scan/task.rs
+++ b/zingo-sync/src/scan/task.rs
@@ -172,7 +172,7 @@ where
 ///
 /// Creates a new scan task and sends to batcher if it's idle.
 /// The batcher will stream compact blocks into the scan task, splitting the scan task when the maximum number of
-/// outputs is reached. When a scan task is ready is it stored in the batcher ready to be taken by an idle scan
+/// outputs is reached. When a scan task is ready it is stored in the batcher ready to be taken by an idle scan
 /// worker for scanning.
 /// When verification is still in progress, only scan tasks with `Verify` scan priority are created.
 /// When all ranges are scanned, the batcher, idle workers and mempool are shut down.
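
Note (appended, not part of the patch series): the seam blocks threaded through these patches exist so that ranges scanned independently can be checked for hash continuity against their neighbours. A minimal, self-contained sketch of that check follows; the simplified `BlockHash`/`WalletBlock` stand-ins and the `check_continuity` helper are assumptions for illustration only, not APIs taken from zingo-sync.

// Illustrative sketch only: simplified stand-ins for the crate's types.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockHash(pub [u8; 32]);

pub struct WalletBlock {
    pub height: u64,
    pub hash: BlockHash,
    pub prev_hash: BlockHash,
}

/// Check that a scanned range joins seamlessly onto the seam blocks saved
/// from the adjacent scan tasks, when those blocks are known.
pub fn check_continuity(
    range_first: &WalletBlock,
    range_last: &WalletBlock,
    start_seam_block: Option<&WalletBlock>,
    end_seam_block: Option<&WalletBlock>,
) -> Result<(), String> {
    if let Some(below) = start_seam_block {
        // the first block of the range must directly follow and reference
        // the seam block below it
        if range_first.height != below.height + 1 || range_first.prev_hash != below.hash {
            return Err(format!("discontinuity below height {}", range_first.height));
        }
    }
    if let Some(above) = end_seam_block {
        // the seam block above must directly follow and reference the last
        // block of the range
        if above.height != range_last.height + 1 || above.prev_hash != range_last.hash {
            return Err(format!("discontinuity above height {}", range_last.height));
        }
    }
    Ok(())
}

Under this reading, a reorg that lands across a task seam surfaces as an error at the boundary rather than going unnoticed, which is presumably why the end seam block was added alongside the existing start-of-range check.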