From 1ad18421f5ddb572048f11bcbddebe77a262fac5 Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Tue, 14 Jan 2025 16:52:49 +0200
Subject: [PATCH] Retry approval on availability failure if the check is still needed (#6807)

Recovering the PoV can fail in situations where the node has just restarted
and the DHT topology hasn't been fully discovered yet, so the current node
can't connect to most of its peers. This is bad because gossiping the
assignment requires being connected to only a few peers, so other nodes will
see our assignment but, since we can't approve the candidate, they will count
it as a no-show.

This gets worse when a lot of nodes restart at the same time: the network
ends up with many no-shows that are never covered. In that case it makes
sense for nodes to retry approving the candidate at a later point in time,
several times if necessary, as long as the block containing the candidate
hasn't been approved yet.

## TODO

- [x] Add a subsystem test.

---------

Signed-off-by: Alexandru Gheorghe
---
 polkadot/node/core/approval-voting/src/lib.rs | 137 +++++++++++++++-
 .../node/core/approval-voting/src/tests.rs    | 146 ++++++++++++++++++
 .../subsystem-bench/src/lib/approval/mod.rs   |   2 +
 prdoc/pr_6807.prdoc                           |  19 +++
 4 files changed, 297 insertions(+), 7 deletions(-)
 create mode 100644 prdoc/pr_6807.prdoc

diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 7cea22d1a6a7..27361df37310 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -21,6 +21,7 @@
 //! of others. It uses this information to determine when candidates and blocks have
 //! been sufficiently approved to finalize.
 
+use futures_timer::Delay;
 use polkadot_node_primitives::{
 	approval::{
 		v1::{BlockApprovalMeta, DelayTranche},
@@ -122,6 +123,9 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
 const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
 const APPROVAL_CACHE_SIZE: u32 = 1024;
 
+/// The maximum number of times we retry to approve a block if it is still needed.
+const MAX_APPROVAL_RETRIES: u32 = 16;
+
 const APPROVAL_DELAY: Tick = 2;
 pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
 
@@ -165,6 +169,10 @@ pub struct ApprovalVotingSubsystem {
 	metrics: Metrics,
 	clock: Arc<dyn Clock + Send + Sync>,
 	spawner: Arc<dyn overseer::gen::Spawner>,
+	/// The maximum number of times we retry to approve a block if it is still needed and the PoV fetch failed.
+	max_approval_retries: u32,
+	/// The backoff before we retry the approval.
+	retry_backoff: Duration,
 }
 
 #[derive(Clone)]
@@ -493,6 +501,8 @@ impl ApprovalVotingSubsystem {
 			metrics,
 			Arc::new(SystemClock {}),
 			spawner,
+			MAX_APPROVAL_RETRIES,
+			APPROVAL_CHECKING_TIMEOUT / 2,
 		)
 	}
 
@@ -505,6 +515,8 @@
 		metrics: Metrics,
 		clock: Arc<dyn Clock + Send + Sync>,
 		spawner: Arc<dyn overseer::gen::Spawner>,
+		max_approval_retries: u32,
+		retry_backoff: Duration,
 	) -> Self {
 		ApprovalVotingSubsystem {
 			keystore,
@@ -515,6 +527,8 @@
 			metrics,
 			clock,
 			spawner,
+			max_approval_retries,
+			retry_backoff,
 		}
 	}
 
@@ -706,18 +720,53 @@ enum ApprovalOutcome {
 	TimedOut,
 }
 
+#[derive(Clone)]
+struct RetryApprovalInfo {
+	candidate: CandidateReceipt,
+	backing_group: GroupIndex,
+	executor_params: ExecutorParams,
+	core_index: Option<CoreIndex>,
+	session_index: SessionIndex,
+	attempts_remaining: u32,
+	backoff: Duration,
+}
+
 struct ApprovalState {
 	validator_index: ValidatorIndex,
 	candidate_hash: CandidateHash,
 	approval_outcome: ApprovalOutcome,
+	retry_info: Option<RetryApprovalInfo>,
 }
 
 impl ApprovalState {
 	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Approved }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Approved,
+			retry_info: None,
+		}
 	}
 	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Failed }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info: None,
+		}
+	}
+
+	fn failed_with_retry(
+		validator_index: ValidatorIndex,
+		candidate_hash: CandidateHash,
+		retry_info: Option<RetryApprovalInfo>,
+	) -> Self {
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info,
+		}
 	}
 }
 
@@ -757,6 +806,7 @@ impl CurrentlyCheckingSet {
 						candidate_hash,
 						validator_index,
 						approval_outcome: ApprovalOutcome::TimedOut,
+						retry_info: None,
 					},
 					Some(approval_state) => approval_state,
 				}
@@ -1271,18 +1321,19 @@ where
 							validator_index,
 							candidate_hash,
 							approval_outcome,
+							retry_info,
 						}
 					) = approval_state;
 
 					if matches!(approval_outcome, ApprovalOutcome::Approved) {
 						let mut approvals: Vec<Action> = relay_block_hashes
-							.into_iter()
+							.iter()
 							.map(|block_hash| Action::IssueApproval(
 								candidate_hash,
 								ApprovalVoteRequest {
 									validator_index,
-									block_hash,
+									block_hash: *block_hash,
 								},
 							)
 						)
@@ -1290,6 +1341,43 @@
 						actions.append(&mut approvals);
 					}
 
+					if let Some(retry_info) = retry_info {
+						for block_hash in relay_block_hashes {
+							if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
+								let sender = to_other_subsystems.clone();
+								let spawn_handle = subsystem.spawner.clone();
+								let metrics = subsystem.metrics.clone();
+								let retry_info = retry_info.clone();
+								let executor_params = retry_info.executor_params.clone();
+								let candidate = retry_info.candidate.clone();
+
+								currently_checking_set
+									.insert_relay_block_hash(
+										candidate_hash,
+										validator_index,
+										block_hash,
+										async move {
+											launch_approval(
+												sender,
+												spawn_handle,
+												metrics,
+												retry_info.session_index,
+												candidate,
+												validator_index,
+												block_hash,
+												retry_info.backing_group,
+												executor_params,
+												retry_info.core_index,
+												retry_info,
+											)
+											.await
+										},
+									)
+									.await?;
+							}
+						}
+					}
+
 					actions
 				},
 				(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
@@ -1340,6 +1428,8 @@ where
 				&mut approvals_cache,
 				&mut subsystem.mode,
 				actions,
+				subsystem.max_approval_retries,
+				subsystem.retry_backoff,
 			)
 			.await?
 			{
@@ -1389,6 +1479,8 @@ pub async fn start_approval_worker<
 		metrics,
 		clock,
 		spawner,
+		MAX_APPROVAL_RETRIES,
+		APPROVAL_CHECKING_TIMEOUT / 2,
 	);
 	let backend = DbBackend::new(db.clone(), approval_voting.db_config);
 	let spawner = approval_voting.spawner.clone();
@@ -1456,6 +1548,8 @@ async fn handle_actions<
 	approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
 	mode: &mut Mode,
 	actions: Vec<Action>,
+	max_approval_retries: u32,
+	retry_backoff: Duration,
 ) -> SubsystemResult<bool> {
 	let mut conclude = false;
 	let mut actions_iter = actions.into_iter();
@@ -1542,6 +1636,16 @@
 					let sender = sender.clone();
 					let spawn_handle = spawn_handle.clone();
 
+					let retry = RetryApprovalInfo {
+						candidate: candidate.clone(),
+						backing_group,
+						executor_params: executor_params.clone(),
+						core_index,
+						session_index: session,
+						attempts_remaining: max_approval_retries,
+						backoff: retry_backoff,
+					};
+
 					currently_checking_set
 						.insert_relay_block_hash(
 							candidate_hash,
@@ -1559,6 +1663,7 @@
 								backing_group,
 								executor_params,
 								core_index,
+								retry,
 							)
 							.await
 						},
@@ -3329,6 +3434,7 @@ async fn launch_approval<
 	backing_group: GroupIndex,
 	executor_params: ExecutorParams,
 	core_index: Option<CoreIndex>,
+	retry: RetryApprovalInfo,
 ) -> SubsystemResult<RemoteHandle<ApprovalState>> {
 	let (a_tx, a_rx) = oneshot::channel();
 	let (code_tx, code_rx) = oneshot::channel();
@@ -3360,6 +3466,7 @@
 	let candidate_hash = candidate.hash();
 	let para_id = candidate.descriptor.para_id();
+	let mut next_retry = None;
 	gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
 
 	let timer = metrics.time_recover_and_approve();
@@ -3388,7 +3495,6 @@
 	let background = async move {
 		// Force the move of the timer into the background task.
 		let _timer = timer;
-
 		let available_data = match a_rx.await {
 			Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
 			Ok(Ok(a)) => a,
@@ -3399,10 +3505,27 @@
 						target: LOG_TARGET,
 						?para_id,
 						?candidate_hash,
+						attempts_remaining = retry.attempts_remaining,
 						"Data unavailable for candidate {:?}",
 						(candidate_hash, candidate.descriptor.para_id()),
 					);
-					// do nothing. we'll just be a no-show and that'll cause others to rise up.
+					// Availability could fail if we did not discover much of the network, so
+					// let's back off and order the subsystem to retry at a later point if the
+					// approval is still needed, because the no-show wasn't covered yet.
+					if retry.attempts_remaining > 0 {
+						Delay::new(retry.backoff).await;
+						next_retry = Some(RetryApprovalInfo {
+							candidate,
+							backing_group,
+							executor_params,
+							core_index,
+							session_index,
+							attempts_remaining: retry.attempts_remaining - 1,
+							backoff: retry.backoff,
+						});
+					} else {
+						next_retry = None;
+					}
 					metrics_guard.take().on_approval_unavailable();
 				},
 				&RecoveryError::ChannelClosed => {
@@ -3433,7 +3556,7 @@
 					metrics_guard.take().on_approval_invalid();
 				},
 			}
-			return ApprovalState::failed(validator_index, candidate_hash)
+			return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
 		},
 	};
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index be569a1de3ec..b72993fe1a94 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -78,6 +78,9 @@ const SLOT_DURATION_MILLIS: u64 = 5000;
 
 const TIMEOUT: Duration = Duration::from_millis(2000);
 
+const NUM_APPROVAL_RETRIES: u32 = 3;
+const RETRY_BACKOFF: Duration = Duration::from_millis(300);
+
 #[derive(Clone)]
 struct TestSyncOracle {
 	flag: Arc<AtomicBool>,
@@ -573,6 +576,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
 			Metrics::default(),
 			clock.clone(),
 			Arc::new(SpawnGlue(pool)),
+			NUM_APPROVAL_RETRIES,
+			RETRY_BACKOFF,
 		),
 		assignment_criteria,
 		backend,
@@ -3202,6 +3207,20 @@ async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
 	);
 }
 
+async fn recover_available_data_failure(virtual_overseer: &mut VirtualOverseer) {
+	let available_data = RecoveryError::Unavailable;
+
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::AvailabilityRecovery(
+			AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, tx)
+		) => {
+			tx.send(Err(available_data)).unwrap();
+		},
+		"overseer did not receive recover available data message",
+	);
+}
+
 struct TriggersAssignmentConfig {
 	our_assigned_tranche: DelayTranche,
 	assign_validator_tranche: F1,
@@ -4791,6 +4810,133 @@ fn subsystem_relaunches_approval_work_on_restart() {
 	});
 }
 
+/// Test that we retry the approval of a candidate on availability failure, up to max retries.
+#[test]
+fn subsystem_relaunches_approval_work_on_availability_failure() {
+	let assignment_criteria = Box::new(MockAssignmentCriteria(
+		|| {
+			let mut assignments = HashMap::new();
+
+			let _ = assignments.insert(
+				CoreIndex(0),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
+						core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+
+			let _ = assignments.insert(
+				CoreIndex(1),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
+						core_index: CoreIndex(1),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+			assignments
+		},
+		|_| Ok(0),
+	));
+	let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
+	let store = config.backend();
+
+	test_harness(config, |test_harness| async move {
+		let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
+
+		setup_overseer_with_blocks_with_two_assignments_triggered(
+			&mut virtual_overseer,
+			store,
+			&clock,
+			sync_oracle_handle,
+		)
+		.await;
+
+		// We have two candidates: for one we are going to fail the availability recovery up to
+		// max_retries, and for the other we are going to succeed on the last retry, so we should
+		// see the approval being distributed.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_,
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
+				exec_kind,
+				response_sender,
+				..
+			}) if exec_kind == PvfExecKind::Approval => {
+				response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
+					.unwrap();
+			}
+		);
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
+				let _ = sender.send(Ok(ApprovalVotingParams {
+					max_approval_coalesce_count: 1,
+				}));
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
+		);
+
+		// Assert that there are no more messages being sent by the subsystem
+		assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+
+		virtual_overseer
+	});
+}
+
 // Test that cached approvals, which are candidates that we approved but we didn't issue
 // the signature yet because we want to coalesce it with more candidate are sent after restart.
 #[test]
diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
index 1b20960a3f8a..5f1689cb226b 100644
--- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
@@ -891,6 +891,8 @@ fn build_overseer(
 		state.approval_voting_parallel_metrics.approval_voting_metrics(),
 		Arc::new(system_clock.clone()),
 		Arc::new(SpawnGlue(spawn_task_handle.clone())),
+		1,
+		Duration::from_secs(1),
 	);
 
 	let approval_distribution = ApprovalDistribution::new_with_clock(
diff --git a/prdoc/pr_6807.prdoc b/prdoc/pr_6807.prdoc
new file mode 100644
index 000000000000..b9564dfb2fe2
--- /dev/null
+++ b/prdoc/pr_6807.prdoc
@@ -0,0 +1,19 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Retry approval on availability failure if the check is still needed
+
+doc:
+  - audience: Node Dev
+    description: |
+      Recovering the PoV can fail in situations where the node has just restarted and the
+      DHT topology hasn't been fully discovered yet, so the current node can't connect to
+      most of its peers. This is bad because gossiping the assignment requires being
+      connected to only a few peers, so since we can't approve the candidate, other nodes
+      will see this as a no-show. Fix it by retrying to approve a candidate for a fixed
+      number of attempts if the block is still needed.
+
+
+crates:
+  - name: polkadot-node-core-approval-voting
+    bump: minor
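
The change above boils down to a bounded retry-with-backoff loop around availability recovery. The standalone sketch below distills that control flow; the names `RetryInfo`, `recover_pov`, and `still_needed` are illustrative stand-ins rather than the subsystem's API (the real code re-queues the work through `CurrentlyCheckingSet` and `launch_approval` instead of looping in place), and it assumes the `futures` and `futures-timer` crates:

```rust
use std::time::Duration;

/// Mirrors the spirit of `RetryApprovalInfo`: how many attempts remain and
/// how long to back off between them.
#[derive(Clone)]
struct RetryInfo {
    attempts_remaining: u32,
    backoff: Duration,
}

/// Keep trying to recover the PoV until it succeeds, attempts run out, or the
/// approval is no longer needed (e.g. the containing block was approved).
async fn approve_with_retries(
    mut retry: RetryInfo,
    mut recover_pov: impl FnMut() -> bool,
    mut still_needed: impl FnMut() -> bool,
) -> bool {
    loop {
        if recover_pov() {
            // Recovery succeeded; validation would proceed from here.
            return true;
        }
        if retry.attempts_remaining == 0 || !still_needed() {
            // Give up: out of attempts, or the check has become moot.
            return false;
        }
        // Back off before retrying, giving peer discovery time to catch up.
        futures_timer::Delay::new(retry.backoff).await;
        retry.attempts_remaining -= 1;
    }
}

fn main() {
    let mut calls = 0;
    let approved = futures::executor::block_on(approve_with_retries(
        RetryInfo { attempts_remaining: 3, backoff: Duration::from_millis(10) },
        // Fail twice, then succeed, mimicking a node whose topology fills in.
        move || { calls += 1; calls > 2 },
        || true,
    ));
    assert!(approved);
}
```

As in the patch, giving up once `attempts_remaining` hits zero or the block no longer needs the check is what keeps a mass restart from turning into an unbounded retry storm.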