From 1224c870a47472f2f888f3d945a1b69a6f8ae71b Mon Sep 17 00:00:00 2001
From: qima
Date: Tue, 28 Jan 2025 20:38:16 +0800
Subject: [PATCH] fix(node): avoid obsoleted fetch request blaming bad_node

An obsoleted fetch request (one that cannot be cleared because a fresh
replicate already stored the record locally) could wrongly blame its
holder as a bad node. Report a holder only when the record is still
missing locally.

---
 ant-networking/src/event/mod.rs           |  4 ++--
 ant-networking/src/replication_fetcher.rs | 12 ++++++------
 ant-node/src/node.rs                      | 15 ++++++++++++---
 ant-node/tests/data_with_churn.rs         |  6 +++---
 4 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/ant-networking/src/event/mod.rs b/ant-networking/src/event/mod.rs
index d247c27f6c..5932ce7ad6 100644
--- a/ant-networking/src/event/mod.rs
+++ b/ant-networking/src/event/mod.rs
@@ -30,7 +30,7 @@ use ant_protocol::{
 #[cfg(feature = "open-metrics")]
 use std::collections::HashSet;
 use std::{
-    collections::BTreeSet,
+    collections::BTreeMap,
     fmt::{Debug, Formatter},
 };
 use tokio::sync::oneshot;
@@ -131,7 +131,7 @@
     /// Terminate Node on unrecoverable errors
     TerminateNode { reason: TerminateNodeReason },
     /// List of peer nodes that we failed to fetch a replication copy from.
-    FailedToFetchHolders(BTreeSet<PeerId>),
+    FailedToFetchHolders(BTreeMap<PeerId, RecordKey>),
     /// Quotes to be verified
     QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
     /// Fresh replicate to fetch
diff --git a/ant-networking/src/replication_fetcher.rs b/ant-networking/src/replication_fetcher.rs
index 98e3165139..142e3565f7 100644
--- a/ant-networking/src/replication_fetcher.rs
+++ b/ant-networking/src/replication_fetcher.rs
@@ -17,7 +17,7 @@ use libp2p::{
     kad::{KBucketDistance as Distance, RecordKey, K_VALUE},
     PeerId,
 };
-use std::collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque};
+use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
 use tokio::{sync::mpsc, time::Duration};

 // Max parallel fetches that can be undertaken at the same time.
@@ -528,19 +528,19 @@
             }
         });

-        let mut failed_holders = BTreeSet::new();
+        let mut failed_holders = BTreeMap::new();

         for (record_key, peer_id) in failed_fetches {
-            error!(
-                "Failed to fetch {:?} from {peer_id:?}",
+            debug!(
+                "Replication_fetcher has an outdated fetch of {:?} from {peer_id:?}",
                 PrettyPrintRecordKey::from(&record_key)
             );
-            let _ = failed_holders.insert(peer_id);
+            let _ = failed_holders.insert(peer_id, record_key);
         }

         // Now clear any failed nodes from our lists.
         self.to_be_fetched
-            .retain(|(_, _, holder), _| !failed_holders.contains(holder));
+            .retain(|(_, _, holder), _| !failed_holders.contains_key(holder));

         // Such failed_holders (if any) shall be reported back and be excluded from the RT.
         if !failed_holders.is_empty() {
diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs
index ce3186946a..8647504319 100644
--- a/ant-node/src/node.rs
+++ b/ant-node/src/node.rs
@@ -538,10 +538,19 @@
                 // Note: this log will be checked in CI and is expected to `not appear`;
                 // any change to the keyword `failed to fetch` shall incur a
                 // corresponding CI script change as well.
-                error!("Received notification from replication_fetcher, notifying {bad_nodes:?} failed to fetch replication copies from.");
+                debug!("Received notification from replication_fetcher, notifying {bad_nodes:?} failed to fetch replication copies from.");
                 let _handle = spawn(async move {
-                    for peer_id in bad_nodes {
-                        network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
+                    for (peer_id, record_key) in bad_nodes {
+                        // An obsoleted fetch request (due to being flooded with fresh replicates) could result
+                        // in a peer being claimed as bad, as the local copy blocks the entry from being cleared.
+                        if let Ok(false) = network.is_record_key_present_locally(&record_key).await
+                        {
+                            error!(
+                                "From peer {peer_id:?}, failed to fetch record {:?}",
+                                PrettyPrintRecordKey::from(&record_key)
+                            );
+                            network.record_node_issues(peer_id, NodeIssue::ReplicationFailure);
+                        }
                     }
                 });
             }
diff --git a/ant-node/tests/data_with_churn.rs b/ant-node/tests/data_with_churn.rs
index 77f22bff7a..688d83bb82 100644
--- a/ant-node/tests/data_with_churn.rs
+++ b/ant-node/tests/data_with_churn.rs
@@ -293,7 +293,7 @@ async fn data_availability_during_churn() -> Result<()> {
 fn create_graph_entry_task(
     client: Client,
     wallet: Wallet,
-    content: ContentList,
+    content_list: ContentList,
     churn_period: Duration,
 ) -> JoinHandle<Result<()>> {
     let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
@@ -412,7 +412,7 @@ fn create_graph_entry_task(
             Ok((cost, addr)) => {
                 println!("Uploaded graph_entry to {addr:?} with cost of {cost:?} after a delay of: {delay:?}");
                 let net_addr = NetworkAddress::GraphEntryAddress(addr);
-                content.write().await.push_back(net_addr);
+                content_list.write().await.push_back(net_addr);
                 break;
             }
             Err(err) => {
@@ -745,7 +745,7 @@ async fn final_retry_query_content(
         } else {
             attempts += 1;
             let delay = 2 * churn_period;
-            debug!("Delaying last check for {delay:?} ...");
+            debug!("Delaying last check of {net_addr:?} for {delay:?} ...");
             sleep(delay).await;
             continue;
         }
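
For reference, below is a minimal, self-contained sketch of the guard this patch introduces in ant-node/src/node.rs. The PeerId and RecordKey aliases, the Node struct, and its methods are hypothetical stand-ins (the real types come from libp2p and the ant-networking crate, and the real check is the async network.is_record_key_present_locally call inside a spawned task); only the control flow mirrors the patch: a holder is blamed for a replication fetch failure only when the record is still absent locally, since a record that already arrived via a fresh replicate means the pending fetch was merely obsoleted.

// Sketch only: PeerId, RecordKey, NodeIssue and Node are hypothetical
// stand-ins, not the real ant-networking / libp2p types.
use std::collections::{BTreeMap, HashSet};

type PeerId = u64;
type RecordKey = Vec<u8>;

#[derive(Debug)]
enum NodeIssue {
    ReplicationFailure,
}

struct Node {
    // Records already stored locally; a pending fetch for one of these
    // was obsoleted by a fresh replicate rather than failed by its holder.
    local_records: HashSet<RecordKey>,
}

impl Node {
    // Mirrors the patched loop: blame a holder only when the record is
    // still missing locally.
    fn report_failed_holders(&self, bad_nodes: BTreeMap<PeerId, RecordKey>) {
        for (peer_id, record_key) in bad_nodes {
            if self.local_records.contains(&record_key) {
                // Local copy present: the fetch request was obsoleted,
                // so the holder is not at fault.
                continue;
            }
            println!("From peer {peer_id:?}, failed to fetch record {record_key:?}");
            self.record_node_issue(peer_id, NodeIssue::ReplicationFailure);
        }
    }

    fn record_node_issue(&self, peer_id: PeerId, issue: NodeIssue) {
        println!("Marking {peer_id:?} with issue {issue:?}");
    }
}

fn main() {
    let node = Node {
        local_records: HashSet::from([b"fresh-replicated".to_vec()]),
    };
    let mut bad_nodes = BTreeMap::new();
    let _ = bad_nodes.insert(1, b"fresh-replicated".to_vec()); // obsoleted: not blamed
    let _ = bad_nodes.insert(2, b"still-missing".to_vec()); // genuine failure: blamed
    node.report_failed_holders(bad_nodes);
}

The switch from BTreeSet<PeerId> to BTreeMap<PeerId, RecordKey> in the patch exists precisely to make this check possible: the reporting side needs to know which record each blamed peer failed to serve.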