diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 7cea22d1a6a7..27361df37310 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -21,6 +21,7 @@
 //! of others. It uses this information to determine when candidates and blocks have
 //! been sufficiently approved to finalize.
 
+use futures_timer::Delay;
 use polkadot_node_primitives::{
 	approval::{
 		v1::{BlockApprovalMeta, DelayTranche},
@@ -122,6 +123,9 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
 const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
 const APPROVAL_CACHE_SIZE: u32 = 1024;
 
+/// The maximum number of times we retry to approve a block if it is still needed.
+const MAX_APPROVAL_RETRIES: u32 = 16;
+
 const APPROVAL_DELAY: Tick = 2;
 
 pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
@@ -165,6 +169,10 @@ pub struct ApprovalVotingSubsystem {
 	metrics: Metrics,
 	clock: Arc<dyn Clock + Send + Sync>,
 	spawner: Arc<dyn overseer::gen::Spawner>,
+	/// The maximum number of times we retry to approve a block if it is still needed and the PoV fetch failed.
+	max_approval_retries: u32,
+	/// The backoff before we retry the approval.
+	retry_backoff: Duration,
 }
 
 #[derive(Clone)]
@@ -493,6 +501,8 @@ impl ApprovalVotingSubsystem {
 			metrics,
 			Arc::new(SystemClock {}),
 			spawner,
+			MAX_APPROVAL_RETRIES,
+			APPROVAL_CHECKING_TIMEOUT / 2,
 		)
 	}
 
@@ -505,6 +515,8 @@
 		metrics: Metrics,
 		clock: Arc<dyn Clock + Send + Sync>,
 		spawner: Arc<dyn overseer::gen::Spawner>,
+		max_approval_retries: u32,
+		retry_backoff: Duration,
 	) -> Self {
 		ApprovalVotingSubsystem {
 			keystore,
@@ -515,6 +527,8 @@
 			metrics,
 			clock,
 			spawner,
+			max_approval_retries,
+			retry_backoff,
 		}
 	}
 
@@ -706,18 +720,53 @@ enum ApprovalOutcome {
 	TimedOut,
 }
 
+#[derive(Clone)]
+struct RetryApprovalInfo {
+	candidate: CandidateReceipt,
+	backing_group: GroupIndex,
+	executor_params: ExecutorParams,
+	core_index: Option<CoreIndex>,
+	session_index: SessionIndex,
+	attempts_remaining: u32,
+	backoff: Duration,
+}
+
 struct ApprovalState {
 	validator_index: ValidatorIndex,
 	candidate_hash: CandidateHash,
 	approval_outcome: ApprovalOutcome,
+	retry_info: Option<RetryApprovalInfo>,
 }
 
 impl ApprovalState {
 	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Approved }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Approved,
+			retry_info: None,
+		}
 	}
 	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
-		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Failed }
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info: None,
+		}
+	}
+
+	fn failed_with_retry(
+		validator_index: ValidatorIndex,
+		candidate_hash: CandidateHash,
+		retry_info: Option<RetryApprovalInfo>,
+	) -> Self {
+		Self {
+			validator_index,
+			candidate_hash,
+			approval_outcome: ApprovalOutcome::Failed,
+			retry_info,
+		}
 	}
 }
 
@@ -757,6 +806,7 @@ impl CurrentlyCheckingSet {
 					candidate_hash,
 					validator_index,
 					approval_outcome: ApprovalOutcome::TimedOut,
+					retry_info: None,
 				},
 				Some(approval_state) => approval_state,
 			}
@@ -1271,18 +1321,19 @@ where
 						validator_index,
 						candidate_hash,
 						approval_outcome,
+						retry_info,
 					}
 				) = approval_state;
 
 				if matches!(approval_outcome, ApprovalOutcome::Approved) {
 					let mut approvals: Vec<Action> = relay_block_hashes
-						.into_iter()
+						.iter()
 						.map(|block_hash| Action::IssueApproval(
 							candidate_hash,
 							ApprovalVoteRequest {
 								validator_index,
-								block_hash,
block_hash, + block_hash: *block_hash, }, ) ) @@ -1290,6 +1341,43 @@ where actions.append(&mut approvals); } + if let Some(retry_info) = retry_info { + for block_hash in relay_block_hashes { + if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) { + let sender = to_other_subsystems.clone(); + let spawn_handle = subsystem.spawner.clone(); + let metrics = subsystem.metrics.clone(); + let retry_info = retry_info.clone(); + let executor_params = retry_info.executor_params.clone(); + let candidate = retry_info.candidate.clone(); + + currently_checking_set + .insert_relay_block_hash( + candidate_hash, + validator_index, + block_hash, + async move { + launch_approval( + sender, + spawn_handle, + metrics, + retry_info.session_index, + candidate, + validator_index, + block_hash, + retry_info.backing_group, + executor_params, + retry_info.core_index, + retry_info, + ) + .await + }, + ) + .await?; + } + } + } + actions }, (block_hash, validator_index) = delayed_approvals_timers.select_next_some() => { @@ -1340,6 +1428,8 @@ where &mut approvals_cache, &mut subsystem.mode, actions, + subsystem.max_approval_retries, + subsystem.retry_backoff, ) .await? { @@ -1389,6 +1479,8 @@ pub async fn start_approval_worker< metrics, clock, spawner, + MAX_APPROVAL_RETRIES, + APPROVAL_CHECKING_TIMEOUT / 2, ); let backend = DbBackend::new(db.clone(), approval_voting.db_config); let spawner = approval_voting.spawner.clone(); @@ -1456,6 +1548,8 @@ async fn handle_actions< approvals_cache: &mut LruMap, mode: &mut Mode, actions: Vec, + max_approval_retries: u32, + retry_backoff: Duration, ) -> SubsystemResult { let mut conclude = false; let mut actions_iter = actions.into_iter(); @@ -1542,6 +1636,16 @@ async fn handle_actions< let sender = sender.clone(); let spawn_handle = spawn_handle.clone(); + let retry = RetryApprovalInfo { + candidate: candidate.clone(), + backing_group, + executor_params: executor_params.clone(), + core_index, + session_index: session, + attempts_remaining: max_approval_retries, + backoff: retry_backoff, + }; + currently_checking_set .insert_relay_block_hash( candidate_hash, @@ -1559,6 +1663,7 @@ async fn handle_actions< backing_group, executor_params, core_index, + retry, ) .await }, @@ -3329,6 +3434,7 @@ async fn launch_approval< backing_group: GroupIndex, executor_params: ExecutorParams, core_index: Option, + retry: RetryApprovalInfo, ) -> SubsystemResult> { let (a_tx, a_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel(); @@ -3360,6 +3466,7 @@ async fn launch_approval< let candidate_hash = candidate.hash(); let para_id = candidate.descriptor.para_id(); + let mut next_retry = None; gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data."); let timer = metrics.time_recover_and_approve(); @@ -3388,7 +3495,6 @@ async fn launch_approval< let background = async move { // Force the move of the timer into the background task. let _timer = timer; - let available_data = match a_rx.await { Err(_) => return ApprovalState::failed(validator_index, candidate_hash), Ok(Ok(a)) => a, @@ -3399,10 +3505,27 @@ async fn launch_approval< target: LOG_TARGET, ?para_id, ?candidate_hash, + attempts_remaining = retry.attempts_remaining, "Data unavailable for candidate {:?}", (candidate_hash, candidate.descriptor.para_id()), ); - // do nothing. we'll just be a no-show and that'll cause others to rise up. 
+						// Availability recovery can fail if we have not yet discovered much of the
+						// network, so back off and ask the subsystem to retry at a later point if
+						// the approval is still needed, i.e. the no-show has not been covered yet.
+						if retry.attempts_remaining > 0 {
+							Delay::new(retry.backoff).await;
+							next_retry = Some(RetryApprovalInfo {
+								candidate,
+								backing_group,
+								executor_params,
+								core_index,
+								session_index,
+								attempts_remaining: retry.attempts_remaining - 1,
+								backoff: retry.backoff,
+							});
+						} else {
+							next_retry = None;
+						}
 						metrics_guard.take().on_approval_unavailable();
 					},
 					&RecoveryError::ChannelClosed => {
@@ -3433,7 +3556,7 @@
 						metrics_guard.take().on_approval_invalid();
 					},
 				}
-				return ApprovalState::failed(validator_index, candidate_hash)
+				return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
 			},
 		};
 
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index be569a1de3ec..b72993fe1a94 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -78,6 +78,9 @@ const SLOT_DURATION_MILLIS: u64 = 5000;
 
 const TIMEOUT: Duration = Duration::from_millis(2000);
 
+const NUM_APPROVAL_RETRIES: u32 = 3;
+const RETRY_BACKOFF: Duration = Duration::from_millis(300);
+
 #[derive(Clone)]
 struct TestSyncOracle {
 	flag: Arc<AtomicBool>,
@@ -573,6 +576,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
 			Metrics::default(),
 			clock.clone(),
 			Arc::new(SpawnGlue(pool)),
+			NUM_APPROVAL_RETRIES,
+			RETRY_BACKOFF,
 		),
 		assignment_criteria,
 		backend,
@@ -3202,6 +3207,20 @@ async fn recover_available_data(virtual_overseer: &mut VirtualOverseer) {
 	);
 }
 
+async fn recover_available_data_failure(virtual_overseer: &mut VirtualOverseer) {
+	let available_data = RecoveryError::Unavailable;
+
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::AvailabilityRecovery(
+			AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, _, tx)
+		) => {
+			tx.send(Err(available_data)).unwrap();
+		},
+		"overseer did not receive recover available data message",
+	);
+}
+
 struct TriggersAssignmentConfig {
 	our_assigned_tranche: DelayTranche,
 	assign_validator_tranche: F1,
@@ -4791,6 +4810,133 @@ fn subsystem_relaunches_approval_work_on_restart() {
 	});
 }
 
+/// Test that we retry the approval of a candidate on availability failure, up to max retries.
+#[test]
+fn subsystem_relaunches_approval_work_on_availability_failure() {
+	let assignment_criteria = Box::new(MockAssignmentCriteria(
+		|| {
+			let mut assignments = HashMap::new();
+
+			let _ = assignments.insert(
+				CoreIndex(0),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
+						core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+
+			let _ = assignments.insert(
+				CoreIndex(1),
+				approval_db::v2::OurAssignment {
+					cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
+						core_index: CoreIndex(1),
+					}),
+					tranche: 0,
+					validator_index: ValidatorIndex(0),
+					triggered: false,
+				}
+				.into(),
+			);
+			assignments
+		},
+		|_| Ok(0),
+	));
+	let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
+	let store = config.backend();
+
+	test_harness(config, |test_harness| async move {
+		let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
+
+		setup_overseer_with_blocks_with_two_assignments_triggered(
+			&mut virtual_overseer,
+			store,
+			&clock,
+			sync_oracle_handle,
+		)
+		.await;
+
+		// We have two candidates: for one we are going to fail the availability recovery up to
+		// max_retries, and for the other we are going to succeed on the last retry, so we should
+		// see the approval being distributed.
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_,
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
+				_,
+				_
+			)) => {
+			}
+		);
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data_failure(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		recover_available_data(&mut virtual_overseer).await;
+		fetch_validation_code(&mut virtual_overseer).await;
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
+				exec_kind,
+				response_sender,
+				..
+			}) if exec_kind == PvfExecKind::Approval => {
+				response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
+					.unwrap();
+			}
+		);
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
+				let _ = sender.send(Ok(ApprovalVotingParams {
+					max_approval_coalesce_count: 1,
+				}));
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
+		);
+
+		// Assert that there are no more messages being sent by the subsystem.
+		assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+
+		virtual_overseer
+	});
+}
+
 // Test that cached approvals, which are candidates that we approved but we didn't issue
 // the signature yet because we want to coalesce it with more candidate are sent after restart.
 #[test]
diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
index 1b20960a3f8a..5f1689cb226b 100644
--- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
+++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs
@@ -891,6 +891,8 @@ fn build_overseer(
 		state.approval_voting_parallel_metrics.approval_voting_metrics(),
 		Arc::new(system_clock.clone()),
 		Arc::new(SpawnGlue(spawn_task_handle.clone())),
+		1,
+		Duration::from_secs(1),
 	);
 
 	let approval_distribution = ApprovalDistribution::new_with_clock(
diff --git a/prdoc/pr_6807.prdoc b/prdoc/pr_6807.prdoc
new file mode 100644
index 000000000000..b9564dfb2fe2
--- /dev/null
+++ b/prdoc/pr_6807.prdoc
@@ -0,0 +1,19 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Retry approval on availability failure if the check is still needed
+
+doc:
+  - audience: Node Dev
+    description: |
+      Recovering the PoV can fail in situations where the node has just restarted and the DHT
+      topology has not been fully discovered yet, so the node cannot connect to most of its peers.
+      This is a problem because gossiping the assignment only requires a connection to a few
+      peers, so the assignment goes out, but since we cannot approve the candidate, other nodes
+      will see it as a no-show. Fix this by retrying the approval of the candidate for a fixed
+      number of attempts if the block is still needed.
+
+
+crates:
+  - name: polkadot-node-core-approval-voting
+    bump: minor
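For orientation, the shape of the change condensed into a standalone Rust sketch. In the subsystem the retry is re-scheduled through the main loop (via RetryApprovalInfo) rather than looped in place, but the bounded retry-with-backoff idea is the same. recover_pov and still_needed below are hypothetical stand-ins for the availability-recovery request and the "block entry still known" check; futures_timer is the same timer crate the patch imports.

use std::time::Duration;

enum Recovery {
	Available,
	Unavailable,
}

// Hypothetical recovery call; always unavailable here so the example stays self-contained.
async fn recover_pov() -> Recovery {
	Recovery::Unavailable
}

// Retry recovery up to `max_retries` times, sleeping `backoff` between attempts,
// as long as the approval is still needed.
async fn recover_with_retry(
	max_retries: u32,
	backoff: Duration,
	still_needed: impl Fn() -> bool,
) -> Option<Recovery> {
	let mut attempts_remaining = max_retries;
	loop {
		match recover_pov().await {
			Recovery::Available => return Some(Recovery::Available),
			// Back off and retry only while attempts remain and the approval is still needed.
			Recovery::Unavailable if attempts_remaining > 0 && still_needed() => {
				attempts_remaining -= 1;
				futures_timer::Delay::new(backoff).await;
			},
			// Out of attempts (or no longer needed): give up and let the no-show logic apply.
			Recovery::Unavailable => return None,
		}
	}
}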