Skip to content

Commit

Permalink
fix(node): avoid obsoleted fetch request blame bad_node
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Jan 28, 2025
1 parent 9463264 commit 1224c87
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 14 deletions.
4 changes: 2 additions & 2 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use ant_protocol::{
#[cfg(feature = "open-metrics")]
use std::collections::HashSet;
use std::{
collections::BTreeSet,
collections::BTreeMap,
fmt::{Debug, Formatter},
};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -131,7 +131,7 @@ pub enum NetworkEvent {
/// Terminate Node on unrecoverable errors
TerminateNode { reason: TerminateNodeReason },
/// List of peer nodes that failed to fetch replication copy from.
FailedToFetchHolders(BTreeSet<PeerId>),
FailedToFetchHolders(BTreeMap<PeerId, RecordKey>),
/// Quotes to be verified
QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
/// Fresh replicate to fetch
Expand Down
12 changes: 6 additions & 6 deletions ant-networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use libp2p::{
kad::{KBucketDistance as Distance, RecordKey, K_VALUE},
PeerId,
};
use std::collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque};
use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
use tokio::{sync::mpsc, time::Duration};

// Max parallel fetches that can be undertaken at the same time.
Expand Down Expand Up @@ -528,19 +528,19 @@ impl ReplicationFetcher {
}
});

let mut failed_holders = BTreeSet::new();
let mut failed_holders = BTreeMap::new();

for (record_key, peer_id) in failed_fetches {
error!(
"Failed to fetch {:?} from {peer_id:?}",
debug!(
"Replication_fetcher has outdated fetch of {:?} from {peer_id:?}",
PrettyPrintRecordKey::from(&record_key)
);
let _ = failed_holders.insert(peer_id);
let _ = failed_holders.insert(peer_id, record_key);
}

// now to clear any failed nodes from our lists.
self.to_be_fetched
.retain(|(_, _, holder), _| !failed_holders.contains(holder));
.retain(|(_, _, holder), _| !failed_holders.contains_key(holder));

// Such failed_hodlers (if any) shall be reported back and be excluded from RT.
if !failed_holders.is_empty() {
Expand Down
15 changes: 12 additions & 3 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,19 @@ impl Node {
// Note: this log will be checked in CI, and expecting `not appear`.
// any change to the keyword `failed to fetch` shall incur
// correspondent CI script change as well.
error!("Received notification from replication_fetcher, notifying {bad_nodes:?} failed to fetch replication copies from.");
debug!("Received notification from replication_fetcher, notifying {bad_nodes:?} failed to fetch replication copies from.");
let _handle = spawn(async move {
for peer_id in bad_nodes {
network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
for (peer_id, record_key) in bad_nodes {
// Obsoleted fetch request (due to flooded in fresh replicates) could result
// in peer to be claimed as bad, as local copy blocks the entry to be cleared.
if let Ok(false) = network.is_record_key_present_locally(&record_key).await
{
error!(
"From peer {peer_id:?}, failed to fetch record {:?}",
PrettyPrintRecordKey::from(&record_key)
);
network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
}
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions ant-node/tests/data_with_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async fn data_availability_during_churn() -> Result<()> {
fn create_graph_entry_task(
client: Client,
wallet: Wallet,
content: ContentList,
content_list: ContentList,
churn_period: Duration,
) -> JoinHandle<Result<()>> {
let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
Expand Down Expand Up @@ -412,7 +412,7 @@ fn create_graph_entry_task(
Ok((cost, addr)) => {
println!("Uploaded graph_entry to {addr:?} with cost of {cost:?} after a delay of: {delay:?}");
let net_addr = NetworkAddress::GraphEntryAddress(addr);
content.write().await.push_back(net_addr);
content_list.write().await.push_back(net_addr);
break;
}
Err(err) => {
Expand Down Expand Up @@ -745,7 +745,7 @@ async fn final_retry_query_content(
} else {
attempts += 1;
let delay = 2 * churn_period;
debug!("Delaying last check for {delay:?} ...");
debug!("Delaying last check of {net_addr:?} for {delay:?} ...");
sleep(delay).await;
continue;
}
Expand Down

0 comments on commit 1224c87

Please sign in to comment.