diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs
index 823db7845e..9952ea671a 100644
--- a/sn_networking/src/driver.rs
+++ b/sn_networking/src/driver.rs
@@ -665,6 +665,8 @@ impl NetworkBuilder {
             local: self.local,
             is_client,
             is_behind_home_network: self.is_behind_home_network,
+            #[cfg(feature = "open-metrics")]
+            close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
             peers_in_rt: 0,
             bootstrap,
             relay_manager,
@@ -715,6 +717,8 @@ pub struct SwarmDriver {
     pub(crate) local: bool,
     pub(crate) is_client: bool,
     pub(crate) is_behind_home_network: bool,
+    #[cfg(feature = "open-metrics")]
+    pub(crate) close_group: Vec<PeerId>,
     pub(crate) peers_in_rt: usize,
     pub(crate) bootstrap: ContinuousBootstrap,
     pub(crate) external_address_manager: ExternalAddressManager,
@@ -991,6 +995,13 @@ impl SwarmDriver {
             metrics_recorder.record_from_marker(marker)
         }
     }
+    #[cfg(feature = "open-metrics")]
+    /// Updates metrics that rely on our current close group.
+    pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec<PeerId>) {
+        if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
+            metrics_recorder.record_change_in_close_group(new_close_group);
+        }
+    }
 
     /// Listen on the provided address. Also records it within RelayManager
     pub(crate) fn listen_on(&mut self, addr: Multiaddr) -> Result<()> {
diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs
index ede545ae9e..7ad5db07c7 100644
--- a/sn_networking/src/event/mod.rs
+++ b/sn_networking/src/event/mod.rs
@@ -23,11 +23,11 @@ use libp2p::{
 use sn_protocol::{
     messages::{Query, Request, Response},
-    NetworkAddress, PrettyPrintRecordKey,
+    NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
 };
 use sn_transfers::PaymentQuote;
 use std::{
-    collections::BTreeSet,
+    collections::{BTreeSet, HashSet},
     fmt::{Debug, Formatter},
 };
 use tokio::sync::oneshot;
@@ -216,6 +216,28 @@ impl Debug for NetworkEvent {
 }
 
 impl SwarmDriver {
+    /// Check for changes in our close group
+    #[cfg(feature = "open-metrics")]
+    pub(crate) fn check_for_change_in_our_close_group(&mut self) {
+        // this includes self
+        let closest_k_peers = self.get_closest_k_value_local_peers();
+
+        let new_closest_peers: Vec<_> =
+            closest_k_peers.into_iter().take(CLOSE_GROUP_SIZE).collect();
+
+        let old = self.close_group.iter().cloned().collect::<HashSet<_>>();
+        let new_members: Vec<_> = new_closest_peers
+            .iter()
+            .filter(|p| !old.contains(*p))
+            .collect();
+        if !new_members.is_empty() {
+            debug!("The close group has been updated. The new members are {new_members:?}");
+            debug!("New close group: {new_closest_peers:?}");
+            self.close_group = new_closest_peers.clone();
+            self.record_change_in_close_group(new_closest_peers);
+        }
+    }
+
     /// Update state on addition of a peer to the routing table.
     pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId) {
         self.peers_in_rt = self.peers_in_rt.saturating_add(1);
@@ -226,6 +248,11 @@ impl SwarmDriver {
         self.log_kbuckets(&added_peer);
         self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));
 
+        #[cfg(feature = "open-metrics")]
+        if self.metrics_recorder.is_some() {
+            self.check_for_change_in_our_close_group();
+        }
+
         #[cfg(feature = "open-metrics")]
         if let Some(metrics_recorder) = &self.metrics_recorder {
             metrics_recorder
@@ -244,6 +271,11 @@ impl SwarmDriver {
         self.log_kbuckets(&removed_peer);
         self.send_event(NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt));
 
+        #[cfg(feature = "open-metrics")]
+        if self.metrics_recorder.is_some() {
+            self.check_for_change_in_our_close_group();
+        }
+
         #[cfg(feature = "open-metrics")]
         if let Some(metrics_recorder) = &self.metrics_recorder {
             metrics_recorder
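Note: `check_for_change_in_our_close_group` recomputes the `CLOSE_GROUP_SIZE` closest peers on every routing-table change and diffs them against the cached `close_group`. A minimal standalone sketch of just that diffing step (with `u64` standing in for `PeerId` so it runs without libp2p):

```rust
use std::collections::HashSet;

/// Stand-in for libp2p's PeerId so the sketch runs without the real crate.
type PeerId = u64;

/// Returns the members of `new_group` that were not in `old_group`.
fn new_members(old_group: &[PeerId], new_group: &[PeerId]) -> Vec<PeerId> {
    let old: HashSet<PeerId> = old_group.iter().copied().collect();
    new_group
        .iter()
        .copied()
        .filter(|p| !old.contains(p))
        .collect()
}

fn main() {
    let old_group = [1, 2, 3, 4, 5];
    let new_group = [1, 2, 3, 4, 6]; // 5 was evicted, 6 joined
    assert_eq!(new_members(&old_group, &new_group), vec![6]);
}
```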
diff --git a/sn_networking/src/metrics/bad_node.rs b/sn_networking/src/metrics/bad_node.rs
index 578ba25cce..7b64e248ec 100644
--- a/sn_networking/src/metrics/bad_node.rs
+++ b/sn_networking/src/metrics/bad_node.rs
@@ -7,20 +7,52 @@
 // permissions and limitations relating to use of the SAFE Network Software.
 
 use crate::target_arch::interval;
-use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
-use prometheus_client::metrics::{family::Family, gauge::Gauge};
-use std::time::{Duration, Instant};
+use libp2p::PeerId;
+use prometheus_client::{
+    encoding::{EncodeLabelSet, EncodeLabelValue},
+    metrics::{family::Family, gauge::Gauge},
+};
+use sn_protocol::CLOSE_GROUP_SIZE;
+use std::{
+    collections::{HashSet, VecDeque},
+    time::{Duration, Instant},
+};
 use strum::IntoEnumIterator;
 
 const UPDATE_INTERVAL: Duration = Duration::from_secs(20);
 
+#[cfg(not(test))]
+const MAX_EVICTED_CLOSE_GROUP_PEERS: usize = 5 * CLOSE_GROUP_SIZE;
+#[cfg(test)]
+const MAX_EVICTED_CLOSE_GROUP_PEERS: usize = CLOSE_GROUP_SIZE + 2;
+
+pub struct BadNodeMetrics {
+    shunned_count_across_time_frames: ShunnedCountAcrossTimeFrames,
+    shunned_by_close_group: ShunnedByCloseGroup,
+}
+
+pub enum BadNodeMetricsMsg {
+    ShunnedByPeer(PeerId),
+    CloseGroupUpdated(Vec<PeerId>),
+}
+
+struct ShunnedByCloseGroup {
+    metric_current_group: Gauge,
+    metric_old_group: Gauge,
+
+    // trackers
+    close_group_peers: Vec<PeerId>,
+    old_close_group_peers: VecDeque<PeerId>,
+    old_new_group_shunned_list: HashSet<PeerId>,
+}
+
 /// A struct to record the number of reports against our node across different time frames.
-pub struct ShunnedCountAcrossTimeFrames {
+struct ShunnedCountAcrossTimeFrames {
     metric: Family<TimeFrame, Gauge>,
-    tracked_values: Vec<TrackedValue>,
+    shunned_report_tracker: Vec<ShunnedReportTracker>,
 }
 
-struct TrackedValue {
+struct ShunnedReportTracker {
     time: Instant,
     least_bucket_it_fits_in: TimeFrameType,
 }
@@ -77,38 +109,122 @@ impl TimeFrameType {
     }
 }
 
-impl ShunnedCountAcrossTimeFrames {
+impl BadNodeMetrics {
     pub fn spawn_background_task(
         time_based_shunned_count: Family<TimeFrame, Gauge>,
-    ) -> tokio::sync::mpsc::Sender<()> {
-        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
+        shunned_by_close_group: Gauge,
+        shunned_by_old_close_group: Gauge,
+    ) -> tokio::sync::mpsc::Sender<BadNodeMetricsMsg> {
+        let mut bad_node_metrics = BadNodeMetrics {
+            shunned_count_across_time_frames: ShunnedCountAcrossTimeFrames {
+                metric: time_based_shunned_count,
+                shunned_report_tracker: Vec::new(),
+            },
+            shunned_by_close_group: ShunnedByCloseGroup {
+                metric_current_group: shunned_by_close_group,
+                metric_old_group: shunned_by_old_close_group,
+
+                close_group_peers: Vec::new(),
+                old_close_group_peers: VecDeque::new(),
+                // Shunned by old or new close group
+                old_new_group_shunned_list: HashSet::new(),
+            },
+        };
+        let (tx, mut rx) = tokio::sync::mpsc::channel(10);
         tokio::spawn(async move {
-            let mut shunned_metrics = ShunnedCountAcrossTimeFrames {
-                metric: time_based_shunned_count,
-                tracked_values: Vec::new(),
-            };
             let mut update_interval = interval(UPDATE_INTERVAL);
             update_interval.tick().await;
 
             loop {
                 tokio::select! {
-                    _ = rx.recv() => {
-                        shunned_metrics.record_shunned_metric();
+                    msg = rx.recv() => {
+                        match msg {
+                            Some(BadNodeMetricsMsg::ShunnedByPeer(peer)) => {
+                                bad_node_metrics.shunned_count_across_time_frames.record_shunned_metric();
+                                bad_node_metrics.shunned_by_close_group.record_shunned_metric(peer);
+                            }
+                            Some(BadNodeMetricsMsg::CloseGroupUpdated(new_closest_peers)) => {
+                                bad_node_metrics.shunned_by_close_group.update_close_group_peers(new_closest_peers);
+                            }
+                            None => break,
+                        }
                     }
                     _ = update_interval.tick() => {
-                        shunned_metrics.update();
+                        bad_node_metrics.shunned_count_across_time_frames.try_update_state();
                     }
                 }
             }
         });
         tx
     }
+}
 
-    pub fn record_shunned_metric(&mut self) {
+impl ShunnedByCloseGroup {
+    pub(crate) fn record_shunned_metric(&mut self, peer: PeerId) {
+        // increment the metric if the peer is in the close group (new or old) and hasn't shunned us before
+        if !self.old_new_group_shunned_list.contains(&peer) {
+            if self.close_group_peers.contains(&peer) {
+                self.metric_current_group.inc();
+                self.old_new_group_shunned_list.insert(peer);
+            } else if self.old_close_group_peers.contains(&peer) {
+                self.metric_old_group.inc();
+                self.old_new_group_shunned_list.insert(peer);
+            }
+        }
+    }
+
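Note: `spawn_background_task` now follows a small actor pattern: the metrics state is owned by a single spawned task, and callers only send `BadNodeMetricsMsg` values over an mpsc channel, so no locking is needed. A rough standalone sketch of the pattern (hypothetical `Msg` type and interval; assumes a tokio runtime with the `macros` and `time` features):

```rust
use std::time::Duration;
use tokio::{sync::mpsc, time};

enum Msg {
    Ping,
}

// Spawn a task that owns its state and hand back only a Sender, mirroring
// BadNodeMetrics::spawn_background_task: callers send messages instead of
// locking shared state.
fn spawn_actor() -> mpsc::Sender<Msg> {
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(async move {
        let mut update_interval = time::interval(Duration::from_millis(50));
        let mut pings = 0u32;
        loop {
            tokio::select! {
                msg = rx.recv() => match msg {
                    Some(Msg::Ping) => pings += 1,
                    None => break, // all senders dropped
                },
                _ = update_interval.tick() => {
                    // periodic housekeeping, like try_update_state()
                    println!("pings so far: {pings}");
                }
            }
        }
    });
    tx
}

#[tokio::main]
async fn main() {
    let tx = spawn_actor();
    tx.send(Msg::Ping).await.expect("actor alive");
    time::sleep(Duration::from_millis(120)).await;
}
```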
+    pub(crate) fn update_close_group_peers(&mut self, new_closest_peers: Vec<PeerId>) {
+        let new_members: Vec<PeerId> = new_closest_peers
+            .iter()
+            .filter(|p| !self.close_group_peers.contains(p))
+            .cloned()
+            .collect();
+        let evicted_members: Vec<PeerId> = self
+            .close_group_peers
+            .iter()
+            .filter(|p| !new_closest_peers.contains(p))
+            .cloned()
+            .collect();
+        for new_member in &new_members {
+            // if it has shunned us before, update the metrics.
+            if self.old_new_group_shunned_list.contains(new_member) {
+                self.metric_old_group.dec();
+                self.metric_current_group.inc();
+            }
+        }
+
+        for evicted_member in &evicted_members {
+            self.old_close_group_peers.push_back(*evicted_member);
+            // if it has shunned us before, update the metrics.
+            if self.old_new_group_shunned_list.contains(evicted_member) {
+                self.metric_current_group.dec();
+                self.metric_old_group.inc();
+            }
+        }
+
+        if !new_members.is_empty() {
+            debug!("The close group has been updated. The new members are {new_members:?}. The evicted members are {evicted_members:?}");
+            self.close_group_peers = new_closest_peers;
+
+            while self.old_close_group_peers.len() > MAX_EVICTED_CLOSE_GROUP_PEERS {
+                if let Some(removed_peer) = self.old_close_group_peers.pop_front() {
+                    if self.old_new_group_shunned_list.remove(&removed_peer) {
+                        self.metric_old_group.dec();
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl ShunnedCountAcrossTimeFrames {
+    fn record_shunned_metric(&mut self) {
         let now = Instant::now();
-        self.tracked_values.push(TrackedValue {
+        self.shunned_report_tracker.push(ShunnedReportTracker {
             time: now,
             least_bucket_it_fits_in: TimeFrameType::LastTenMinutes,
         });
@@ -121,11 +237,11 @@ impl ShunnedCountAcrossTimeFrames {
         }
     }
 
-    pub fn update(&mut self) {
+    fn try_update_state(&mut self) {
         let now = Instant::now();
         let mut idx_to_remove = Vec::new();
 
-        for (idx, tracked_value) in self.tracked_values.iter_mut().enumerate() {
+        for (idx, tracked_value) in self.shunned_report_tracker.iter_mut().enumerate() {
             let time_elapsed_since_adding = now.duration_since(tracked_value.time).as_secs();
 
             if time_elapsed_since_adding > tracked_value.least_bucket_it_fits_in.get_duration_sec()
@@ -145,7 +261,7 @@ impl ShunnedCountAcrossTimeFrames {
         }
         // remove the ones that are now indefinite
         for idx in idx_to_remove {
-            self.tracked_values.remove(idx);
+            self.shunned_report_tracker.remove(idx);
         }
     }
 }
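Note: `old_close_group_peers` acts as a FIFO window capped at `MAX_EVICTED_CLOSE_GROUP_PEERS`; when a peer ages out of the window, its shun record is forgotten and `shunned_by_old_close_group` is decremented. A toy model of just that pruning step (again with `u64` standing in for `PeerId`, and a small cap chosen for the example):

```rust
use std::collections::{HashSet, VecDeque};

type PeerId = u64; // stand-in so the sketch runs without libp2p

const MAX_EVICTED: usize = 3; // the real code uses 5 * CLOSE_GROUP_SIZE

/// Push an evicted peer and prune the oldest entries past the cap,
/// dropping their shun record so the old-group gauge is decremented.
fn evict(
    evicted: &mut VecDeque<PeerId>,
    shunned: &mut HashSet<PeerId>,
    old_group_gauge: &mut i64,
    peer: PeerId,
) {
    evicted.push_back(peer);
    while evicted.len() > MAX_EVICTED {
        if let Some(removed) = evicted.pop_front() {
            if shunned.remove(&removed) {
                *old_group_gauge -= 1;
            }
        }
    }
}

fn main() {
    let mut evicted = VecDeque::new();
    let mut shunned = HashSet::from([1]); // peer 1 shunned us earlier
    let mut old_gauge = 1;
    for peer in 1..=4 {
        evict(&mut evicted, &mut shunned, &mut old_gauge, peer);
    }
    // peer 1 aged out of the window, so its shun no longer counts
    assert_eq!(old_gauge, 0);
    assert_eq!(evicted, VecDeque::from([2, 3, 4]));
}
```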
@@ -153,16 +269,17 @@ impl ShunnedCountAcrossTimeFrames {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use eyre::Result;
 
     #[test]
-    fn update_should_move_to_next_state() -> eyre::Result<()> {
+    fn update_should_move_to_next_timeframe() -> Result<()> {
         let mut shunned_metrics = ShunnedCountAcrossTimeFrames {
             metric: Family::default(),
-            tracked_values: Vec::new(),
+            shunned_report_tracker: Vec::new(),
         };
         shunned_metrics.record_shunned_metric();
 
-        let current_state = shunned_metrics.tracked_values[0].least_bucket_it_fits_in;
+        let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in;
         assert!(matches!(current_state, TimeFrameType::LastTenMinutes));
         // all the counters should be 1
         for variant in TimeFrameType::iter() {
@@ -179,8 +296,8 @@ mod tests {
         std::thread::sleep(std::time::Duration::from_secs(
             current_state.get_duration_sec() + 1,
         ));
-        shunned_metrics.update();
-        let current_state = shunned_metrics.tracked_values[0].least_bucket_it_fits_in;
+        shunned_metrics.try_update_state();
+        let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in;
         assert!(matches!(current_state, TimeFrameType::LastHour));
         // all the counters except LastTenMinutes should be 1
         for variant in TimeFrameType::iter() {
@@ -201,8 +318,8 @@ mod tests {
         std::thread::sleep(std::time::Duration::from_secs(
             current_state.get_duration_sec() + 1,
         ));
-        shunned_metrics.update();
-        let current_state = shunned_metrics.tracked_values[0].least_bucket_it_fits_in;
+        shunned_metrics.try_update_state();
+        let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in;
         assert!(matches!(current_state, TimeFrameType::LastSixHours));
         // all the counters except LastTenMinutes and LastHour should be 1
         for variant in TimeFrameType::iter() {
@@ -223,8 +340,8 @@ mod tests {
         std::thread::sleep(std::time::Duration::from_secs(
             current_state.get_duration_sec() + 1,
         ));
-        shunned_metrics.update();
-        let current_state = shunned_metrics.tracked_values[0].least_bucket_it_fits_in;
+        shunned_metrics.try_update_state();
+        let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in;
         assert!(matches!(current_state, TimeFrameType::LastDay));
         // all the counters except LastTenMinutes, LastHour and LastSixHours should be 1
         for variant in TimeFrameType::iter() {
@@ -248,8 +365,8 @@ mod tests {
         std::thread::sleep(std::time::Duration::from_secs(
             current_state.get_duration_sec() + 1,
         ));
-        shunned_metrics.update();
-        let current_state = shunned_metrics.tracked_values[0].least_bucket_it_fits_in;
+        shunned_metrics.try_update_state();
+        let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in;
         assert!(matches!(current_state, TimeFrameType::LastWeek));
         // all the counters except LastTenMinutes, LastHour, LastSixHours and LastDay should be 1
         for variant in TimeFrameType::iter() {
@@ -274,8 +391,8 @@ mod tests {
         std::thread::sleep(std::time::Duration::from_secs(
             current_state.get_duration_sec() + 1,
         ));
-        shunned_metrics.update();
-        assert_eq!(shunned_metrics.tracked_values.len(), 0);
+        shunned_metrics.try_update_state();
+        assert_eq!(shunned_metrics.shunned_report_tracker.len(), 0);
         // all the counters except Indefinite should be 0
         for variant in TimeFrameType::iter() {
             let time_frame = TimeFrame {
@@ -290,4 +407,215 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn metrics_should_not_be_updated_if_close_group_is_not_set() -> Result<()> {
+        let mut close_group_shunned = ShunnedByCloseGroup {
+            metric_current_group: Gauge::default(),
+            metric_old_group: Gauge::default(),
+
+            close_group_peers: Vec::new(),
+            old_close_group_peers: VecDeque::new(),
+            old_new_group_shunned_list: HashSet::new(),
+        };
+
+        close_group_shunned.record_shunned_metric(PeerId::random());
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn close_group_shunned_metric_should_be_updated_on_new_report() -> Result<()> {
+        let mut close_group_shunned = ShunnedByCloseGroup {
+            metric_current_group: Gauge::default(),
+            metric_old_group: Gauge::default(),
+
+            close_group_peers: Vec::new(),
+            old_close_group_peers: VecDeque::new(),
+            old_new_group_shunned_list: HashSet::new(),
+        };
+        close_group_shunned.update_close_group_peers(vec![
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+        ]);
+        // report by a peer in the close group should increment the metric
+        close_group_shunned.record_shunned_metric(close_group_shunned.close_group_peers[0]);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 1);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        // a repeat report by the same peer should not increment the metric
+        close_group_shunned.record_shunned_metric(close_group_shunned.close_group_peers[0]);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 1);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        // report by a different peer should increment the metric
+        close_group_shunned.record_shunned_metric(close_group_shunned.close_group_peers[1]);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 2);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        // report by a peer that is not in the close group should not increment the metric
+        close_group_shunned.record_shunned_metric(PeerId::random());
+        assert_eq!(close_group_shunned.metric_current_group.get(), 2);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn change_in_close_group_should_update_the_metrics() -> Result<()> {
+        let mut close_group_shunned = ShunnedByCloseGroup {
+            metric_current_group: Gauge::default(),
+            metric_old_group: Gauge::default(),
+
+            close_group_peers: Vec::new(),
+            old_close_group_peers: VecDeque::new(),
+            old_new_group_shunned_list: HashSet::new(),
+        };
+        close_group_shunned.update_close_group_peers(vec![
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+        ]);
+        let old_member = close_group_shunned.close_group_peers[0];
+        close_group_shunned.record_shunned_metric(old_member);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 1);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        // update close group
+        close_group_shunned.update_close_group_peers(vec![
+            PeerId::random(),
+            close_group_shunned.close_group_peers[1],
+            close_group_shunned.close_group_peers[2],
+            close_group_shunned.close_group_peers[3],
+            close_group_shunned.close_group_peers[4],
+        ]);
+
+        // the peer that shunned us before should now be counted in the old group
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 1);
+
+        // a repeat report by the old member should not increment the metric
+        close_group_shunned.record_shunned_metric(old_member);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 1);
+
+        // update the close group to re-admit the old member
+        close_group_shunned.update_close_group_peers(vec![
+            old_member,
+            close_group_shunned.close_group_peers[1],
+            close_group_shunned.close_group_peers[2],
+            close_group_shunned.close_group_peers[3],
+            close_group_shunned.close_group_peers[4],
+        ]);
+
+        // the current_group and old_group metrics should be updated
+        assert_eq!(close_group_shunned.metric_current_group.get(), 1);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn update_close_group_metrics_on_reaching_max_evicted_peer_count() -> Result<()> {
+        let mut close_group_shunned = ShunnedByCloseGroup {
+            metric_current_group: Gauge::default(),
+            metric_old_group: Gauge::default(),
+
+            close_group_peers: Vec::new(),
+            old_close_group_peers: VecDeque::new(),
+            old_new_group_shunned_list: HashSet::new(),
+        };
+        close_group_shunned.update_close_group_peers(vec![
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+        ]);
+
+        // evict one member
+        let old_member_1 = close_group_shunned.close_group_peers[0];
+        close_group_shunned.update_close_group_peers(vec![
+            close_group_shunned.close_group_peers[1],
+            close_group_shunned.close_group_peers[2],
+            close_group_shunned.close_group_peers[3],
+            close_group_shunned.close_group_peers[4],
+            PeerId::random(),
+        ]);
+
+        // evict one member
+        let old_member_2 = close_group_shunned.close_group_peers[0];
+        close_group_shunned.update_close_group_peers(vec![
+            close_group_shunned.close_group_peers[1],
+            close_group_shunned.close_group_peers[2],
+            close_group_shunned.close_group_peers[3],
+            close_group_shunned.close_group_peers[4],
+            PeerId::random(),
+        ]);
+
+        // reports by the evicted members should increment the old group metric
+        close_group_shunned.record_shunned_metric(old_member_1);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 1);
+        close_group_shunned.record_shunned_metric(old_member_2);
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 2);
+
+        // evict all the members
+        close_group_shunned.update_close_group_peers(vec![
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+            PeerId::random(),
+        ]);
+
+        // the metrics should still remain the same
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 2);
+
+        // evict one more member to cross the threshold
+        close_group_shunned.update_close_group_peers(vec![
+            close_group_shunned.close_group_peers[1],
+            close_group_shunned.close_group_peers[2],
+            close_group_shunned.close_group_peers[3],
+            close_group_shunned.close_group_peers[4],
+            PeerId::random(),
+        ]);
+
+        // the metric from old_member_1 should be removed
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 1);
+        assert!(!close_group_shunned
+            .old_close_group_peers
+            .contains(&old_member_1));
+        assert!(close_group_shunned
+            .old_close_group_peers
+            .contains(&old_member_2));
+
+        // evict one more member
+        close_group_shunned.update_close_group_peers(vec![
+            close_group_shunned.close_group_peers[1],
+            close_group_shunned.close_group_peers[2],
+            close_group_shunned.close_group_peers[3],
+            close_group_shunned.close_group_peers[4],
+            PeerId::random(),
+        ]);
+
+        // the metric from old_member_2 should be removed
+        assert_eq!(close_group_shunned.metric_current_group.get(), 0);
+        assert_eq!(close_group_shunned.metric_old_group.get(), 0);
+        assert!(!close_group_shunned
+            .old_close_group_peers
+            .contains(&old_member_2));
+
+        Ok(())
+    }
 }
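Note: the gauges registered below surface in the OpenMetrics scrape output under the sub-registry prefix. A minimal sketch of registering and encoding one of them with prometheus-client (the crate version and the plain top-level registry are assumptions of the sketch; the real code registers under the `sn_networking` sub-registry):

```rust
// Assumed Cargo.toml dependency: prometheus-client = "0.22"
use prometheus_client::{encoding::text::encode, metrics::gauge::Gauge, registry::Registry};

fn main() {
    let mut registry = Registry::default();
    let shunned_by_close_group = Gauge::default();
    registry.register(
        "shunned_by_close_group",
        "The number of close group peers that have shunned our node",
        shunned_by_close_group.clone(),
    );

    shunned_by_close_group.inc();

    // Render the text exposition the way a /metrics scrape endpoint would.
    let mut output = String::new();
    encode(&mut output, &registry).expect("encoding failed");
    print!("{output}");
    // Prints HELP/TYPE lines plus `shunned_by_close_group 1` and `# EOF`.
}
```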
diff --git a/sn_networking/src/metrics/mod.rs b/sn_networking/src/metrics/mod.rs
index 6e8fa60812..b2a701b576 100644
--- a/sn_networking/src/metrics/mod.rs
+++ b/sn_networking/src/metrics/mod.rs
@@ -12,11 +12,13 @@ pub mod service;
 #[cfg(feature = "upnp")]
 mod upnp;
 
-#[cfg(feature = "open-metrics")]
 use crate::MetricsRegistries;
 use crate::{log_markers::Marker, target_arch::sleep};
-use bad_node::{ShunnedCountAcrossTimeFrames, TimeFrame};
-use libp2p::metrics::{Metrics as Libp2pMetrics, Recorder};
+use bad_node::{BadNodeMetrics, BadNodeMetricsMsg, TimeFrame};
+use libp2p::{
+    metrics::{Metrics as Libp2pMetrics, Recorder},
+    PeerId,
+};
 use prometheus_client::{
     metrics::family::Family,
     metrics::{counter::Counter, gauge::Gauge},
@@ -52,16 +54,20 @@ pub(crate) struct NetworkMetricsRecorder {
     // bad node metrics
     bad_peers_count: Counter,
-    #[allow(dead_code)] // This is updated by the background task
-    shunned_across_time_frames: Family<TimeFrame, Gauge>,
     shunned_count: Counter,
+    #[allow(dead_code)] // updated by background task
+    shunned_count_across_time_frames: Family<TimeFrame, Gauge>,
+    #[allow(dead_code)]
+    shunned_by_close_group: Gauge,
+    #[allow(dead_code)]
+    shunned_by_old_close_group: Gauge,
 
     // system info
     process_memory_used_mb: Gauge,
     process_cpu_usage_percentage: Gauge,
 
     // helpers
-    shunned_report_notifier: tokio::sync::mpsc::Sender<()>,
+    bad_nodes_notifier: tokio::sync::mpsc::Sender<BadNodeMetricsMsg>,
 }
 
 impl NetworkMetricsRecorder {
@@ -181,13 +187,29 @@ impl NetworkMetricsRecorder {
             .extended_metrics
             .sub_registry_with_prefix("sn_networking");
         let shunned_count_across_time_frames = Family::default();
-        let shunned_report_notifier = ShunnedCountAcrossTimeFrames::spawn_background_task(
+        extended_metrics_sub_registry.register(
+            "shunned_count_across_time_frames",
+            "The number of times our node has been shunned by other nodes across different time frames",
             shunned_count_across_time_frames.clone(),
         );
+
+        let shunned_by_close_group = Gauge::default();
         extended_metrics_sub_registry.register(
-            "shunned_count_across_time_frames",
-            "The number of peers that have been shunned across different time frames",
+            "shunned_by_close_group",
+            "The number of close group peers that have shunned our node",
+            shunned_by_close_group.clone(),
+        );
+
+        let shunned_by_old_close_group = Gauge::default();
+        extended_metrics_sub_registry.register(
+            "shunned_by_old_close_group",
+            "The number of close group peers that have shunned our node. This contains the peers that were once in our close group but have since been evicted.",
+            shunned_by_old_close_group.clone(),
+        );
+        let bad_nodes_notifier = BadNodeMetrics::spawn_background_task(
+            shunned_count_across_time_frames.clone(),
+            shunned_by_close_group.clone(),
+            shunned_by_old_close_group.clone(),
+        );
 
         let network_metrics = Self {
@@ -207,13 +229,15 @@ impl NetworkMetricsRecorder {
             live_time,
 
             bad_peers_count,
-            shunned_across_time_frames: shunned_count_across_time_frames,
+            shunned_count_across_time_frames,
             shunned_count,
+            shunned_by_close_group,
+            shunned_by_old_close_group,
 
             process_memory_used_mb,
             process_cpu_usage_percentage,
 
-            shunned_report_notifier,
+            bad_nodes_notifier,
         };
 
         network_metrics.system_metrics_recorder_task();
@@ -255,11 +279,15 @@ impl NetworkMetricsRecorder {
             Marker::PeerConsideredAsBad { .. } => {
                 let _ = self.bad_peers_count.inc();
             }
-            Marker::FlaggedAsBadNode { .. } => {
+            Marker::FlaggedAsBadNode { flagged_by } => {
                 let _ = self.shunned_count.inc();
-                let shunned_report_notifier = self.shunned_report_notifier.clone();
+                let bad_nodes_notifier = self.bad_nodes_notifier.clone();
+                let flagged_by = *flagged_by;
                 crate::target_arch::spawn(async move {
-                    if let Err(err) = shunned_report_notifier.send(()).await {
+                    if let Err(err) = bad_nodes_notifier
+                        .send(BadNodeMetricsMsg::ShunnedByPeer(flagged_by))
+                        .await
+                    {
                         error!("Failed to send shunned report via notifier: {err:?}");
                     }
                 });
@@ -281,6 +309,18 @@ impl NetworkMetricsRecorder {
             _ => {}
         }
     }
+
+    pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec<PeerId>) {
+        let bad_nodes_notifier = self.bad_nodes_notifier.clone();
+        crate::target_arch::spawn(async move {
+            if let Err(err) = bad_nodes_notifier
+                .send(BadNodeMetricsMsg::CloseGroupUpdated(new_close_group))
+                .await
+            {
+                error!("Failed to send close group update via notifier: {err:?}");
+            }
+        });
+    }
 }
 
 /// Impl the Recorder traits again for our struct.
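Note: `record_change_in_close_group` uses a fire-and-forget send so the synchronous `&self` caller never blocks on the metrics task. A standalone sketch of that pattern (with `u64` standing in for `PeerId`; a tokio runtime is assumed):

```rust
use tokio::sync::mpsc;

type PeerId = u64; // stand-in so the sketch runs without libp2p

enum BadNodeMetricsMsg {
    CloseGroupUpdated(Vec<PeerId>),
}

// Clone the sender, spawn, and only log on failure, mirroring
// record_change_in_close_group.
fn notify(notifier: &mpsc::Sender<BadNodeMetricsMsg>, new_close_group: Vec<PeerId>) {
    let notifier = notifier.clone();
    tokio::spawn(async move {
        if let Err(err) = notifier
            .send(BadNodeMetricsMsg::CloseGroupUpdated(new_close_group))
            .await
        {
            eprintln!("Failed to send close group update: {err}");
        }
    });
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(10);
    notify(&tx, vec![1, 2, 3]);
    if let Some(BadNodeMetricsMsg::CloseGroupUpdated(group)) = rx.recv().await {
        println!("close group updated: {group:?}");
    }
}
```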