Fixed output batching #1615

Merged
merged 28 commits on Jan 31, 2025
Commits
fec01ed
fix reorg bug and add end seam block for improved continuity checks
Oscar-Pepper Dec 31, 2024
627769a
fix clippy and combine located tree builds into one spawn blocking
Oscar-Pepper Dec 31, 2024
feb5d2a
revisit wallet data cleanup
Oscar-Pepper Dec 31, 2024
6d4daee
clear locators
Oscar-Pepper Dec 31, 2024
0a21d09
fix bug in
Oscar-Pepper Dec 31, 2024
6175ba0
add max re-org window
Oscar-Pepper Dec 31, 2024
03bbbc8
remove todo
Oscar-Pepper Dec 31, 2024
0d37159
get block range returns a stream
Oscar-Pepper Jan 2, 2025
0b6f68b
implemented batcher
Oscar-Pepper Jan 3, 2025
182eb2f
added fixed output batching
Oscar-Pepper Jan 7, 2025
b561545
Merge branch 'shard_ranges' into fix_reorg_and_improve_continuity_checks
Oscar-Pepper Jan 7, 2025
1a1233c
Merge branch 'fix_reorg_and_improve_continuity_checks' into fixed_out…
Oscar-Pepper Jan 7, 2025
69fe10f
start work on setting scanned ranges correctly in post-scan processing
Oscar-Pepper Jan 7, 2025
24aa381
improve scan task split
Oscar-Pepper Jan 7, 2025
59dfbef
complete batcher with linear scanning continuity checks
Oscar-Pepper Jan 7, 2025
7b9ddc4
fix clippy warnings
Oscar-Pepper Jan 7, 2025
312f5c8
solve merge conflicts with shard_ranges branch changes
Oscar-Pepper Jan 16, 2025
ce51a0d
solve merge conflicts and update to changes in sync_status branch
Oscar-Pepper Jan 16, 2025
23f4442
retain all scanned ranges boundary blocks in the wallet
Oscar-Pepper Jan 16, 2025
c9b898b
Merge branch 'fix_reorg_and_improve_continuity_checks' into fixed_out…
Oscar-Pepper Jan 16, 2025
5c34d84
Merge branch 'shard_ranges' into fix_reorg_and_improve_continuity_checks
Oscar-Pepper Jan 16, 2025
4f42071
Merge branch 'fix_reorg_and_improve_continuity_checks' into fixed_out…
Oscar-Pepper Jan 16, 2025
acfd0af
Merge branch 'shard_ranges' into fix_reorg_and_improve_continuity_checks
Oscar-Pepper Jan 17, 2025
1372b69
solve merge conflicts with changes to shard_ranges branch
Oscar-Pepper Jan 17, 2025
9339488
small cleanup
Oscar-Pepper Jan 17, 2025
69c5066
Update zingo-sync/src/primitives.rs
dorianvp Jan 31, 2025
e92bf85
Update zingo-sync/src/scan/task.rs
dorianvp Jan 31, 2025
2fda65b
Merge branch 'dev_2-0' into fixed_output_batching
dorianvp Jan 31, 2025
11 changes: 7 additions & 4 deletions zingo-sync/src/client.rs
@@ -29,7 +29,10 @@ pub enum FetchRequest {
/// Gets the height of the blockchain from the server.
ChainTip(oneshot::Sender<BlockId>),
/// Gets the specified range of compact blocks from the server (end exclusive).
CompactBlockRange(oneshot::Sender<Vec<CompactBlock>>, Range<BlockHeight>),
CompactBlockRange(
oneshot::Sender<tonic::Streaming<CompactBlock>>,
Range<BlockHeight>,
),
/// Gets the tree states for a specified block height.
TreeState(oneshot::Sender<TreeState>, BlockHeight),
/// Get a full transaction by txid.
@@ -74,14 +77,14 @@ pub async fn get_chain_height(
pub async fn get_compact_block_range(
fetch_request_sender: UnboundedSender<FetchRequest>,
block_range: Range<BlockHeight>,
) -> Result<Vec<CompactBlock>, ()> {
) -> Result<tonic::Streaming<CompactBlock>, ()> {
let (reply_sender, reply_receiver) = oneshot::channel();
fetch_request_sender
.send(FetchRequest::CompactBlockRange(reply_sender, block_range))
.unwrap();
let compact_blocks = reply_receiver.await.unwrap();
let block_stream = reply_receiver.await.unwrap();

Ok(compact_blocks)
Ok(block_stream)
}

/// Gets the stream of shards (subtree roots)
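For illustration only (not part of this PR's diff): with `CompactBlockRange` now carrying a `tonic::Streaming<CompactBlock>` instead of a collected `Vec`, callers drain the blocks incrementally. A minimal sketch of the consuming side, assuming `fetch_request_sender` and a `block_range: Range<BlockHeight>` are already in scope and unwrapping errors as the surrounding code does; `handle_block` is a hypothetical callback:

// Sketch only: consume the stream block by block instead of collecting a Vec up front,
// so memory use no longer grows with the size of the requested range.
let mut block_stream =
    client::get_compact_block_range(fetch_request_sender.clone(), block_range.clone())
        .await
        .unwrap();
while let Some(compact_block) = block_stream.message().await.unwrap() {
    // each CompactBlock is processed as it arrives, e.g. handed to the batcher
    handle_block(compact_block);
}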
19 changes: 7 additions & 12 deletions zingo-sync/src/client/fetch.rs
@@ -109,8 +109,8 @@ async fn fetch_from_server(
}
FetchRequest::CompactBlockRange(sender, block_range) => {
tracing::debug!("Fetching compact blocks. {:?}", &block_range);
let compact_blocks = get_block_range(client, block_range).await.unwrap();
sender.send(compact_blocks).unwrap();
let block_stream = get_block_range(client, block_range).await.unwrap();
sender.send(block_stream).unwrap();
}
FetchRequest::GetSubtreeRoots(sender, start_index, shielded_protocol, max_entries) => {
tracing::debug!(
@@ -169,13 +169,11 @@ async fn get_latest_block(

Ok(client.get_latest_block(request).await.unwrap().into_inner())
}

async fn get_block_range(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
block_range: Range<BlockHeight>,
) -> Result<Vec<CompactBlock>, ()> {
let mut compact_blocks: Vec<CompactBlock> =
Vec::with_capacity(u64::from(block_range.end - block_range.start) as usize);

) -> Result<tonic::Streaming<CompactBlock>, ()> {
let request = tonic::Request::new(BlockRange {
start: Some(BlockId {
height: u64::from(block_range.start),
@@ -186,13 +184,8 @@ async fn get_block_range(
hash: vec![],
}),
});
let mut block_stream = client.get_block_range(request).await.unwrap().into_inner();

while let Some(compact_block) = block_stream.message().await.unwrap() {
compact_blocks.push(compact_block);
}

Ok(compact_blocks)
Ok(client.get_block_range(request).await.unwrap().into_inner())
}

async fn get_subtree_roots(
Expand All @@ -206,12 +199,14 @@ async fn get_subtree_roots(
shielded_protocol,
max_entries,
};

Ok(client
.get_subtree_roots(request)
.await
.unwrap()
.into_inner())
}

async fn get_tree_state(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
block_height: BlockHeight,
21 changes: 21 additions & 0 deletions zingo-sync/src/primitives.rs
@@ -131,7 +131,28 @@ impl SyncState {
}
}

/// Returns the highest block height that has been scanned.
///
/// If no scan ranges have been scanned, returns the block below the wallet birthday.
/// Will panic if called before scan ranges are updated for the first time.
pub fn highest_scanned_height(&self) -> BlockHeight {
if let Some(last_scanned_range) = self
.scan_ranges()
.iter()
.filter(|scan_range| scan_range.priority() == ScanPriority::Scanned)
.last()
{
last_scanned_range.block_range().end - 1
} else {
self.wallet_birthday()
.expect("scan ranges always non-empty")
- 1
}
}

/// Returns the wallet birthday or `None` if `self.scan_ranges` is empty.
///
/// If the wallet birthday is below the sapling activation height, returns the sapling activation height instead.
pub fn wallet_birthday(&self) -> Option<BlockHeight> {
self.scan_ranges()
.first()
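For illustration only (not part of this PR's diff): `highest_scanned_height` returns the last block of the final `Scanned` range, or the block below the wallet birthday when nothing has been scanned yet. A hypothetical caller in the sync loop might use it to pick where linear scanning resumes; `sync_state` and `batch_size` are assumed names:

// Sketch only: resume linear scanning just above the highest block already scanned.
let resume_height = sync_state.highest_scanned_height() + 1;
let next_block_range = resume_height..(resume_height + batch_size);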
173 changes: 70 additions & 103 deletions zingo-sync/src/scan.rs
@@ -1,23 +1,20 @@
use std::{
cmp,
collections::{BTreeMap, BTreeSet, HashMap},
};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use orchard::tree::MerkleHashOrchard;
use task::ScanTask;
use tokio::sync::mpsc;

use incrementalmerkletree::Position;
use zcash_client_backend::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock};
use zcash_client_backend::proto::compact_formats::CompactBlock;
use zcash_keys::keys::UnifiedFullViewingKey;
use zcash_primitives::{
consensus::{BlockHeight, NetworkUpgrade, Parameters},
consensus::{self, BlockHeight},
transaction::TxId,
zip32::AccountId,
};

use crate::{
client::{self, FetchRequest},
keys::transparent::TransparentAddressId,
client::FetchRequest,
primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction},
witness::{self, LocatedTreeData, WitnessData},
};
@@ -32,7 +29,8 @@ pub(crate) mod task;
pub(crate) mod transactions;

struct InitialScanData {
previous_block: Option<WalletBlock>,
start_seam_block: Option<WalletBlock>,
end_seam_block: Option<WalletBlock>,
sapling_initial_tree_size: u32,
orchard_initial_tree_size: u32,
}
@@ -42,79 +40,35 @@ impl InitialScanData {
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
consensus_parameters: &P,
first_block: &CompactBlock,
previous_wallet_block: Option<WalletBlock>,
start_seam_block: Option<WalletBlock>,
end_seam_block: Option<WalletBlock>,
) -> Result<Self, ()>
where
P: Parameters + Sync + Send + 'static,
P: consensus::Parameters + Sync + Send + 'static,
{
// gets initial tree size from previous block if available
// otherwise, from first block if available
// otherwise, fetches frontiers from server
let (sapling_initial_tree_size, orchard_initial_tree_size) = if let Some(prev) =
&previous_wallet_block
{
(
prev.tree_boundaries().sapling_final_tree_size,
prev.tree_boundaries().orchard_final_tree_size,
)
} else if let Some(chain_metadata) = &first_block.chain_metadata {
// calculate initial tree size by subtracting number of outputs in block from the blocks final tree size
let sapling_output_count: u32 = first_block
.vtx
.iter()
.map(|tx| tx.outputs.len())
.sum::<usize>()
.try_into()
.expect("Sapling output count cannot exceed a u32");
let orchard_output_count: u32 = first_block
.vtx
.iter()
.map(|tx| tx.actions.len())
.sum::<usize>()
.try_into()
.expect("Sapling output count cannot exceed a u32");

(
chain_metadata
.sapling_commitment_tree_size
.checked_sub(sapling_output_count)
.unwrap(),
chain_metadata
.orchard_commitment_tree_size
.checked_sub(orchard_output_count)
.unwrap(),
)
} else {
let sapling_activation_height = consensus_parameters
.activation_height(NetworkUpgrade::Sapling)
.expect("should have some sapling activation height");

match first_block.height().cmp(&sapling_activation_height) {
cmp::Ordering::Greater => {
let frontiers =
client::get_frontiers(fetch_request_sender, first_block.height() - 1)
.await
.unwrap();
(
frontiers
.final_sapling_tree()
.tree_size()
.try_into()
.expect("should not be more than 2^32 note commitments in the tree!"),
frontiers
.final_orchard_tree()
.tree_size()
.try_into()
.expect("should not be more than 2^32 note commitments in the tree!"),
)
}
cmp::Ordering::Equal => (0, 0),
cmp::Ordering::Less => panic!("pre-sapling not supported!"),
}
};
let (sapling_initial_tree_size, orchard_initial_tree_size) =
if let Some(prev) = &start_seam_block {
(
prev.tree_boundaries().sapling_final_tree_size,
prev.tree_boundaries().orchard_final_tree_size,
)
} else {
let tree_boundaries = compact_blocks::calculate_block_tree_boundaries(
consensus_parameters,
fetch_request_sender,
first_block,
)
.await;

(
tree_boundaries.sapling_initial_tree_size,
tree_boundaries.orchard_initial_tree_size,
)
};

Ok(InitialScanData {
previous_block: previous_wallet_block,
start_seam_block,
end_seam_block,
sapling_initial_tree_size,
orchard_initial_tree_size,
})
@@ -154,41 +108,56 @@ impl DecryptedNoteData {

/// Scans a given range and returns all data relevant to the specified keys.
///
/// `previous_wallet_block` is the wallet block with height [scan_range.start - 1].
/// `start_seam_block` and `end_seam_block` are the blocks adjacent to the `scan_range` for verification of continuity.
/// `locators` are the block height and txid of transactions in the `scan_range` that are known to be relevant to the
wallet and are appended to during scanning if trial decryption succeeds. If there are no known relevant transactions
/// then `locators` will start empty.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn scan<P>(
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
parameters: &P,
consensus_parameters: &P,
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
scan_range: ScanRange,
previous_wallet_block: Option<WalletBlock>,
mut locators: BTreeSet<Locator>,
transparent_addresses: HashMap<String, TransparentAddressId>,
scan_task: ScanTask,
) -> Result<ScanResults, ScanError>
where
P: Parameters + Sync + Send + 'static,
P: consensus::Parameters + Sync + Send + 'static,
{
let compact_blocks = client::get_compact_block_range(
fetch_request_sender.clone(),
scan_range.block_range().clone(),
)
.await
.unwrap();
let ScanTask {
compact_blocks,
scan_range,
start_seam_block,
end_seam_block,
mut locators,
transparent_addresses,
} = scan_task;

if compact_blocks
.first()
.expect("compacts blocks should not be empty")
.height
!= scan_range.block_range().start.into()
|| compact_blocks
.last()
.expect("compacts blocks should not be empty")
.height
!= (scan_range.block_range().end - 1).into()
{
panic!("compact blocks do not match scan range!")
}

let initial_scan_data = InitialScanData::new(
fetch_request_sender.clone(),
parameters,
consensus_parameters,
compact_blocks
.first()
.expect("compacts blocks should not be empty"),
previous_wallet_block,
start_seam_block,
end_seam_block,
)
.await
.unwrap();

let consensus_parameters_clone = parameters.clone();
let consensus_parameters_clone = consensus_parameters.clone();
let ufvks_clone = ufvks.clone();
let scan_data = tokio::task::spawn_blocking(move || {
scan_compact_blocks(
@@ -214,7 +183,7 @@ where
let mut outpoints = OutPointMap::new();
let wallet_transactions = scan_transactions(
fetch_request_sender,
parameters,
consensus_parameters,
ufvks,
locators,
decrypted_note_data,
@@ -232,15 +201,13 @@ where
orchard_leaves_and_retentions,
} = witness_data;

let sapling_located_trees = tokio::task::spawn_blocking(move || {
witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions)
.unwrap()
})
.await
.unwrap();
let orchard_located_trees = tokio::task::spawn_blocking(move || {
witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions)
.unwrap()
let (sapling_located_trees, orchard_located_trees) = tokio::task::spawn_blocking(move || {
(
witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions)
.unwrap(),
witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions)
.unwrap(),
)
})
.await
.unwrap();
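For reference (not part of this PR's diff): `scan` now receives a `ScanTask` instead of separate arguments. A sketch of its shape, reconstructed from the destructuring above and the way each field is used; the actual definition lives in `zingo-sync/src/scan/task.rs` and its field types may differ:

// Reconstructed for illustration only; field names follow the destructuring in `scan`,
// field types are inferred from how each field is used in this file.
pub(crate) struct ScanTask {
    compact_blocks: Vec<CompactBlock>,
    scan_range: ScanRange,
    start_seam_block: Option<WalletBlock>,
    end_seam_block: Option<WalletBlock>,
    locators: BTreeSet<Locator>,
    transparent_addresses: HashMap<String, TransparentAddressId>,
}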