Missing retry turnaround #2307

Merged: 5 commits, Feb 7, 2025
Changes from all commits
52 changes: 28 additions & 24 deletions zilliqa/src/sync.rs
@@ -146,21 +146,21 @@ impl Sync {
///
/// We get a plain ACK in certain cases - treated as an empty response.
pub fn handle_acknowledgement(&mut self, from: PeerId) -> Result<()> {
self.empty_count = self.empty_count.saturating_add(1);
if let Some((peer, _)) = self.in_flight.as_ref() {
// downgrade peer due to empty response
if peer.peer_id == from {
tracing::warn!(to = %peer.peer_id,
"sync::Acknowledgement : empty response"
);
self.empty_count = self.empty_count.saturating_add(1);

self.peers
.done_with_peer(self.in_flight.take(), DownGrade::Empty);
// Retry if failed in Phase 2 for whatever reason
match self.state {
SyncState::Phase1(_) if Self::DO_SPECULATIVE => {
self.request_missing_metadata(None)?
}
// Retry if failed in Phase 2 for whatever reason
SyncState::Phase2(_) => self.state = SyncState::Retry1,
_ => {}
}
@@ -177,22 +177,22 @@ impl Sync {
///
/// This gets called for any libp2p request failure - treated as a network failure
pub fn handle_request_failure(&mut self, failure: OutgoingMessageFailure) -> Result<()> {
self.timeout_count = self.timeout_count.saturating_add(1);
// check if the request is a sync message
if let Some((peer, req_id)) = self.in_flight.as_ref() {
// downgrade peer due to network failure
if peer.peer_id == failure.peer && *req_id == failure.request_id {
tracing::warn!(to = %peer.peer_id, err = %failure.error,
"sync::RequestFailure : network error"
);
self.timeout_count = self.timeout_count.saturating_add(1);

self.peers
.done_with_peer(self.in_flight.take(), DownGrade::Timeout);
// Retry if failed in Phase 2 for whatever reason
match self.state {
SyncState::Phase1(_) if Self::DO_SPECULATIVE => {
self.request_missing_metadata(None)?
}
// Retry if failed in Phase 2 for whatever reason
SyncState::Phase2(_) => self.state = SyncState::Retry1,
_ => {}
}
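Both failure handlers above now bump their counter before checking whether the event matches the in-flight request, then downgrade the peer and choose a retry path from the current phase. Below is a minimal, self-contained sketch of that shared pattern; `SyncSketch`, `on_failure` and the simplified `SyncState` payloads are illustrative stand-ins, not the actual zq2 types.

```rust
// Simplified stand-ins for the real Sync / SyncState in zilliqa/src/sync.rs.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum SyncState {
    Phase0,
    Phase1(u64), // payload reduced to a block number for this sketch
    Phase2(u64), // payload reduced to a placeholder value
    Retry1,
}

struct SyncSketch {
    state: SyncState,
    empty_count: u64,
    timeout_count: u64,
    do_speculative: bool,
}

impl SyncSketch {
    /// Shared shape of handle_acknowledgement / handle_request_failure:
    /// count the event first, then decide how to retry based on the phase.
    fn on_failure(&mut self, is_timeout: bool) {
        if is_timeout {
            self.timeout_count = self.timeout_count.saturating_add(1);
        } else {
            self.empty_count = self.empty_count.saturating_add(1);
        }
        match self.state {
            // Phase 1 with speculative fetches enabled: ask another peer right away
            // (the real code calls self.request_missing_metadata(None)).
            SyncState::Phase1(_) if self.do_speculative => {}
            // Phase 2: flag a Retry1 turnaround for the main loop to pick up.
            SyncState::Phase2(_) => self.state = SyncState::Retry1,
            _ => {}
        }
    }
}

fn main() {
    let mut sync = SyncSketch {
        state: SyncState::Phase2(0),
        empty_count: 0,
        timeout_count: 0,
        do_speculative: true,
    };
    sync.on_failure(true);
    assert_eq!(sync.state, SyncState::Retry1);
    println!("empty = {}, timeouts = {}", sync.empty_count, sync.timeout_count);
}
```

Because the increment now sits before the in-flight match, empty responses and network failures are counted even when they no longer correspond to the current in-flight request.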
@@ -243,20 +243,10 @@ impl Sync {
if !self.db.contains_block(&parent_hash)? {
// No parent block, trigger sync
tracing::info!("sync::SyncProposal : syncing from {parent_hash}",);
let meta = self.recent_proposals.back().unwrap().header;

let highest_block = self
.db
.get_canonical_block_by_number(
self.db
.get_highest_canonical_block_number()?
.expect("no highest block"),
)?
.expect("missing highest block");
self.started_at_block_number = highest_block.number();

self.update_started_at()?;
// Ensure started_at_block_number is set before running this.
// https://github.com/Zilliqa/zq2/issues/2252#issuecomment-2636036676
let meta = self.recent_proposals.back().unwrap().header;
self.request_missing_metadata(Some(meta))?;
}
}
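With this change, `update_started_at()` runs before `request_missing_metadata(Some(meta))`, which is what the comment and the linked issue ask for. The sketch below illustrates that ordering invariant only; the `PhaseStartSketch` type and its `debug_assert` are assumptions made for illustration, not zq2 code.

```rust
// Illustrative sketch of the ordering invariant; not the real Sync type.
struct PhaseStartSketch {
    started_at_block_number: Option<u64>, // None models "not yet recorded"
    highest_canonical_block: u64,
}

impl PhaseStartSketch {
    /// Mirrors update_started_at(): snapshot the highest canonical block number.
    fn update_started_at(&mut self) {
        self.started_at_block_number = Some(self.highest_canonical_block);
    }

    /// Mirrors request_missing_metadata(Some(meta)): must only run after the
    /// snapshot above has been taken.
    fn request_missing_metadata(&self) {
        debug_assert!(
            self.started_at_block_number.is_some(),
            "update_started_at() must be called before starting Phase 1"
        );
        // ... issue the Phase 1 metadata request to a peer here ...
    }
}

fn main() {
    let mut sync = PhaseStartSketch {
        started_at_block_number: None,
        highest_canonical_block: 42,
    };
    sync.update_started_at();
    sync.request_missing_metadata();
    println!("startingBlock = {:?}", sync.started_at_block_number);
}
```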
@@ -287,6 +277,8 @@ impl Sync {
}
// Retry to fix sync issues e.g. peers that are now offline
SyncState::Retry1 if self.in_pipeline == 0 => {
self.update_started_at()?;
// Ensure started is updated - https://github.com/Zilliqa/zq2/issues/2306
self.retry_phase1()?;
}
_ => {
@@ -297,6 +289,22 @@ impl Sync {
Ok(())
}

/// Update the startingBlock value.
///
/// Must be called before starting/re-starting Phase 1.
fn update_started_at(&mut self) -> Result<()> {
let highest_block = self
.db
.get_canonical_block_by_number(
self.db
.get_highest_canonical_block_number()?
.expect("no highest block"),
)?
.expect("missing highest block");
self.started_at_block_number = highest_block.number();
Ok(())
}

/// Convenience function to convert a block to a proposal (add full txs)
/// Should only be used for syncing history, not for consensus messages regarding new blocks.
fn block_to_proposal(&self, block: Block) -> Proposal {
@@ -323,6 +331,7 @@ impl Sync {
/// This will rebuild history from the previous marker, with another peer.
/// If this function is called many times, it will eventually restart from Phase 0.
fn retry_phase1(&mut self) -> Result<()> {
self.retry_count = self.retry_count.saturating_add(1);
if self.db.count_sync_segments()? == 0 {
tracing::error!("sync::RetryPhase1 : cannot retry phase 1 without chain segments!");
self.state = SyncState::Phase0;
@@ -333,7 +342,6 @@ impl Sync {
"sync::RetryPhase1 : retrying segment #{}",
self.db.count_sync_segments()?,
);
self.retry_count = self.retry_count.saturating_add(1);

// remove the last segment from the chain metadata
let (meta, _) = self.db.last_sync_segment()?.unwrap();
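`retry_phase1` now counts the retry before its early exit, so a retry attempted with no stored segments is still recorded. The following is a simplified sketch of the behaviour the doc comment describes, namely that repeated retries eventually restart from Phase 0; `RetrySketch` and its `Vec<u64>` segment store are illustrative assumptions, not the real DB-backed code.

```rust
// Simplified stand-in for retry_phase1(); not the real DB-backed implementation.
#[derive(Debug, PartialEq)]
enum SyncState {
    Phase0,
    Phase1,
}

struct RetrySketch {
    segments: Vec<u64>, // stand-in for the persisted chain segments
    retry_count: u64,
    state: SyncState,
}

impl RetrySketch {
    fn retry_phase1(&mut self) {
        // Count the retry up front, even on the early-exit path below.
        self.retry_count = self.retry_count.saturating_add(1);
        if self.segments.is_empty() {
            // Nothing left to retry from: restart the whole sync at Phase 0.
            self.state = SyncState::Phase0;
            return;
        }
        // Drop the last segment and redo it with another peer.
        self.segments.pop();
        self.state = SyncState::Phase1;
    }
}

fn main() {
    let mut sync = RetrySketch {
        segments: vec![1, 2],
        retry_count: 0,
        state: SyncState::Phase1,
    };
    sync.retry_phase1(); // retries the newest stored segment
    sync.retry_phase1(); // retries the remaining segment
    sync.retry_phase1(); // no segments left: falls back to Phase 0
    assert_eq!(sync.state, SyncState::Phase0);
    assert_eq!(sync.retry_count, 3);
    println!("retries = {}", sync.retry_count);
}
```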
@@ -720,19 +728,15 @@ impl Sync {

// TODO: Implement dynamic sub-segments - https://github.com/Zilliqa/zq2/issues/2158

// Record the oldest block in the chain's parent
// Record the oldest block in the segment
self.state = SyncState::Phase1(segment.last().cloned().unwrap());

// If the check-point/starting-point is in this segment
let checkpointed = segment.iter().any(|b| b.hash == self.checkpoint_hash);
let range = std::ops::RangeInclusive::new(
segment.last().as_ref().unwrap().number,
segment.first().as_ref().unwrap().number,
);
let started = range.contains(&self.started_at_block_number);
let block_hash = segment.last().as_ref().unwrap().hash;

// If the segment hits our history, turnaround to Phase 2.
if started || checkpointed {
if checkpointed || self.db.contains_block(&block_hash)? {
self.state = SyncState::Phase2(Hash::ZERO);
} else if Self::DO_SPECULATIVE {
self.request_missing_metadata(None)?;
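The turnaround test no longer compares block numbers against `started_at_block_number`; it flips to Phase 2 when the segment contains the checkpoint or when the oldest block of the segment is already in the local database. Here is a hedged sketch of that decision, with a `HashSet` standing in for the block store and all type names invented for illustration.

```rust
use std::collections::HashSet;

// Stand-ins for the real Hash / block-metadata types.
type Hash = [u8; 32];

struct BlockMeta {
    number: u64,
    hash: Hash,
}

/// Sketch of the revised turnaround test: checkpoint hit, or the oldest block
/// of the segment is already present in the local block store.
fn should_turn_around(
    segment: &[BlockMeta],
    checkpoint_hash: &Hash,
    local_blocks: &HashSet<Hash>,
) -> bool {
    let checkpointed = segment.iter().any(|b| &b.hash == checkpoint_hash);
    let oldest_hash = segment.last().expect("segment is never empty").hash;
    checkpointed || local_blocks.contains(&oldest_hash)
}

fn main() {
    // Mirroring the diff above, segment.last() is the oldest block of the segment.
    let mk = |n: u64| BlockMeta { number: n, hash: [n as u8; 32] };
    let segment = vec![mk(10), mk(9), mk(8)];

    let mut local: HashSet<Hash> = HashSet::new();
    local.insert([8u8; 32]); // we already hold block #8

    assert!(should_turn_around(&segment, &[0u8; 32], &local));
    println!(
        "oldest block #{} is already local: turn around to Phase 2",
        segment.last().unwrap().number
    );
}
```

The database lookup makes the turnaround depend on what the node actually has stored rather than on a block-number range.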