Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sync hang #1665

Merged
merged 9 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 50 additions & 70 deletions libtonode-tests/tests/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,21 @@ mod fast {
zip321::{Payment, TransactionRequest},
PoolType, ShieldedProtocol,
};
use zcash_primitives::{memo::Memo, transaction::components::amount::NonNegativeAmount};
use zcash_primitives::{
consensus::BlockHeight, memo::Memo, transaction::components::amount::NonNegativeAmount,
};
use zingo_status::confirmation_status::ConfirmationStatus;
use zingolib::{
config::ZENNIES_FOR_ZINGO_REGTEST_ADDRESS,
testutils::{
chain_generics::{conduct_chain::ConductChain, libtonode::LibtonodeEnvironment},
lightclient::{from_inputs, get_base_address},
},
wallet::{
data::summaries::{SelfSendValueTransfer, SentValueTransfer, ValueTransferKind},
data::summaries::{
SelfSendValueTransfer, SentValueTransfer, TransactionSummaryInterface as _,
ValueTransferKind,
},
keys::unified::ReceiverSelection,
},
UAReceivers,
Expand Down Expand Up @@ -792,80 +798,54 @@ mod fast {
}
}

// FIXME:
// #[tokio::test]
// async fn targeted_rescan() {
// let (regtest_manager, _cph, _faucet, recipient, txid) =
// scenarios::faucet_funded_recipient_default(100_000).await;

// *recipient
// .wallet
// .transaction_context
// .transaction_metadata_set
// .write()
// .await
// .transaction_records_by_id
// .get_mut(&txid_from_hex_encoded_str(&txid).unwrap())
// .unwrap()
// .orchard_notes[0]
// .output_index_mut() = None;

// let tx_summaries = recipient.transaction_summaries().await.0;
// assert!(tx_summaries[0].orchard_notes()[0].output_index().is_none());

// increase_height_and_wait_for_client(&regtest_manager, &recipient, 1)
// .await
// .unwrap();

// let tx_summaries = recipient.transaction_summaries().await.0;
// assert!(tx_summaries[0].orchard_notes()[0].output_index().is_some());
// }

// #[tokio::test]
// async fn received_tx_status_pending_to_confirmed_with_mempool_monitor() {
// let (regtest_manager, _cph, faucet, recipient, _txid) =
// scenarios::faucet_funded_recipient_default(100_000).await;
#[tokio::test]
async fn received_tx_status_pending_to_confirmed_with_mempool_monitor() {
tracing_subscriber::fmt().init();

// let recipient = std::sync::Arc::new(recipient);
let (regtest_manager, _cph, faucet, recipient, _txid) =
scenarios::faucet_funded_recipient_default(100_000).await;

// from_inputs::quick_send(
// &faucet,
// vec![(
// &get_base_address_macro!(&recipient, "sapling"),
// 20_000,
// None,
// )],
// )
// .await
// .unwrap();
from_inputs::quick_send(
&faucet,
vec![(
&get_base_address_macro!(&recipient, "unified"),
// &get_base_address_macro!(&recipient, "sapling"),
20_000,
None,
)],
)
.await
.unwrap();

// LightClient::start_mempool_monitor(recipient.clone()).unwrap();
// tokio::time::sleep(Duration::from_secs(5)).await;
recipient.do_sync(false).await.unwrap();

// let transactions = &recipient.transaction_summaries().await.0;
// assert_eq!(
// transactions
// .iter()
// .find(|tx| tx.value() == 20_000)
// .unwrap()
// .status(),
// ConfirmationStatus::Mempool(BlockHeight::from_u32(6))
// );
let transactions = &recipient.transaction_summaries().await.0;
transactions.iter().for_each(|tx| {
dbg!(tx);
});
assert_eq!(
transactions
.iter()
.find(|tx| tx.value() == 20_000)
.unwrap()
.status(),
ConfirmationStatus::Mempool(BlockHeight::from_u32(6))
);

// increase_height_and_wait_for_client(&regtest_manager, &recipient, 1)
// .await
// .unwrap();
increase_height_and_wait_for_client(&regtest_manager, &recipient, 1)
.await
.unwrap();

// let transactions = &recipient.transaction_summaries().await.0;
// assert_eq!(
// transactions
// .iter()
// .find(|tx| tx.value() == 20_000)
// .unwrap()
// .status(),
// ConfirmationStatus::Confirmed(BlockHeight::from_u32(6))
// );
// }
let transactions = &recipient.transaction_summaries().await.0;
assert_eq!(
transactions
.iter()
.find(|tx| tx.value() == 20_000)
.unwrap()
.status(),
ConfirmationStatus::Confirmed(BlockHeight::from_u32(6))
);
}

// #[tokio::test]
// async fn utxos_are_not_prematurely_confirmed() {
Expand Down
32 changes: 14 additions & 18 deletions libtonode-tests/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use zingolib::{
config::{construct_lightwalletd_uri, load_clientconfig, DEFAULT_LIGHTWALLETD_SERVER},
get_base_address_macro,
lightclient::LightClient,
testutils::{lightclient::from_inputs, scenarios},
testutils::{increase_server_height, lightclient::from_inputs, scenarios},
wallet::WalletBase,
};

Expand Down Expand Up @@ -99,12 +99,12 @@ async fn sync_status() {
}

// temporary test for sync development
#[ignore = "hangs"]
#[ignore = "sync development only"]
#[tokio::test]
async fn sync_test() {
tracing_subscriber::fmt().init();

let (_regtest_manager, _cph, faucet, recipient, _txid) =
let (regtest_manager, _cph, faucet, recipient, _txid) =
scenarios::faucet_funded_recipient_default(5_000_000).await;
from_inputs::quick_send(
&faucet,
Expand All @@ -127,20 +127,16 @@ async fn sync_test() {
// .await
// .unwrap();

// increase_server_height(&regtest_manager, 1).await;
// recipient.do_sync(false).await.unwrap();
// recipient.quick_shield().await.unwrap();
// increase_server_height(&regtest_manager, 1).await;
increase_server_height(&regtest_manager, 1).await;
recipient.do_sync(false).await.unwrap();
recipient.quick_shield().await.unwrap();
increase_server_height(&regtest_manager, 1).await;
recipient.do_sync(true).await.unwrap();

let uri = recipient.config().lightwalletd_uri.read().unwrap().clone();
let client = GrpcConnector::new(uri).get_client().await.unwrap();
pepper_sync::sync(client, &recipient.config().chain.clone(), recipient.wallet)
.await
.unwrap();

// dbg!(&recipient.wallet.wallet_transactions);
// dbg!(recipient.wallet.wallet_blocks());
// dbg!(recipient.wallet.nullifier_map());
// dbg!(recipient.wallet.outpoint_map());
// dbg!(recipient.wallet.sync_state());
// let wallet = recipient.wallet.lock().await;
// dbg!(&wallet.wallet_transactions);
// dbg!(&wallet.wallet_blocks);
// dbg!(&wallet.nullifier_map);
// dbg!(&wallet.outpoint_map);
// dbg!(&wallet.sync_state);
}
35 changes: 30 additions & 5 deletions pepper-sync/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
//! Module for handling all connections to the server

use std::ops::Range;
use std::{
ops::Range,
sync::{
atomic::{self, AtomicBool},
Arc,
},
time::Duration,
};

use tokio::sync::{mpsc::UnboundedSender, oneshot};

Expand All @@ -19,6 +26,8 @@ use zcash_primitives::{
transaction::{Transaction, TxId},
};

use crate::sync::error::MempoolError;

pub(crate) mod fetch;

/// Fetch requests are created and sent to the [`crate::client::fetch::fetch`] task when a connection to the server is required.
Expand Down Expand Up @@ -195,11 +204,27 @@ pub(crate) async fn get_transparent_address_transactions(
}

/// Gets stream of mempool transactions until the next block is mined.
///
/// Checks at intervals if `shutdown_mempool` is set to prevent hanging on awating mempool monitor handle.
pub(crate) async fn get_mempool_transaction_stream(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
) -> Result<tonic::Streaming<RawTransaction>, ()> {
shutdown_mempool: Arc<AtomicBool>,
) -> Result<tonic::Streaming<RawTransaction>, MempoolError> {
tracing::debug!("Fetching mempool stream");
let mempool_stream = fetch::get_mempool_stream(client).await.unwrap();

Ok(mempool_stream)
let mut interval = tokio::time::interval(Duration::from_secs(3));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await;
loop {
tokio::select! {
mempool_stream_response = fetch::get_mempool_stream(client) => {
return mempool_stream_response.map_err(MempoolError::RequestFailed);
}

_ = interval.tick() => {
if shutdown_mempool.load(atomic::Ordering::Acquire) {
return Err(MempoolError::ShutdownWithoutStream);
}
}
}
}
}
16 changes: 6 additions & 10 deletions pepper-sync/src/client/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::mpsc::UnboundedReceiver;
use zcash_client_backend::proto::{
compact_formats::CompactBlock,
service::{
compact_tx_streamer_client::CompactTxStreamerClient, BlockId, BlockRange, ChainSpec, Empty,
compact_tx_streamer_client::CompactTxStreamerClient, BlockId, BlockRange, ChainSpec,
GetAddressUtxosArg, GetAddressUtxosReply, GetSubtreeRootsArg, RawTransaction, SubtreeRoot,
TransparentAddressBlockFilter, TreeState, TxFilter,
},
Expand Down Expand Up @@ -313,17 +313,13 @@ async fn get_taddress_txs(
Ok(transactions)
}

/// Call `GetMempoolStream` client gPRC
/// Call `GetMempoolStream` client gPRC.
///
/// This is not called from the fetch request framework and is intended to be called independently.
pub(super) async fn get_mempool_stream(
pub(crate) async fn get_mempool_stream(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
) -> Result<tonic::Streaming<RawTransaction>, ()> {
let request = tonic::Request::new(Empty {});
) -> Result<tonic::Streaming<RawTransaction>, tonic::Status> {
let request = tonic::Request::new(zcash_client_backend::proto::service::Empty {});

Ok(client
.get_mempool_stream(request)
.await
.unwrap()
.into_inner())
Ok(client.get_mempool_stream(request).await?.into_inner())
}
7 changes: 5 additions & 2 deletions pepper-sync/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! Top level error module for the crate

use crate::scan::error::ScanError;
use crate::{scan::error::ScanError, sync::error::MempoolError};

/// Top level error enum encapsulating any error that may occur during sync
#[derive(Debug, thiserror::Error)]
pub enum SyncError {
/// Errors associated with scanning
/// Scan error.
#[error("Scan error. {0}")]
ScanError(#[from] ScanError),
/// Mempool error.
#[error("Mempool error. {0}")]
MempoolError(#[from] MempoolError),
}
5 changes: 5 additions & 0 deletions pepper-sync/src/scan/error.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use zcash_primitives::{block::BlockHash, consensus::BlockHeight};

/// Scan errors.
#[derive(Debug, thiserror::Error)]
pub enum ScanError {
/// Continuity error.
#[error("Continuity error. {0}")]
ContinuityError(#[from] ContinuityError),
}

/// Block continuity errors.
#[derive(Debug, thiserror::Error)]
pub enum ContinuityError {
/// Height discontinuity.
#[error("Height discontinuity. Block with height {height} is not continuous with previous block height {previous_block_height}")]
HeightDiscontinuity {
height: BlockHeight,
previous_block_height: BlockHeight,
},
/// Hash discontinuity.
#[error("Hash discontinuity. Block prev_hash {prev_hash} with height {height} does not match previous block hash {previous_block_hash}")]
HashDiscontinuity {
height: BlockHeight,
Expand Down
8 changes: 5 additions & 3 deletions pepper-sync/src/scan/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ScannerState {
}

pub(crate) struct Scanner<P> {
state: ScannerState,
pub(crate) state: ScannerState,
batcher: Option<Batcher<P>>,
workers: Vec<ScanWorker<P>>,
unique_id: usize,
Expand Down Expand Up @@ -110,8 +110,10 @@ where
}

async fn shutdown_batcher(&mut self) -> Result<(), JoinError> {
let mut batcher = self.batcher.take().expect("batcher should exist!");
batcher.shutdown().await?;
let batcher = self.batcher.take();
if let Some(mut batcher) = batcher {
batcher.shutdown().await?;
}

Ok(())
}
Expand Down
Loading
Loading