Skip to content

Commit

Permalink
reworks gossip run_listen and process_packets (#4971)
Browse files Browse the repository at this point in the history
The commit bypasses the `packets.extend(payload)` call below
https://github.com/anza-xyz/agave/blob/0844bbca7/gossip/src/cluster_info.rs#L2188
by working with a `VecDeque<Vec<...>>` type instead.

Filtering on shred-version is also done in-place, instead of collecting
a new vector:
https://github.com/anza-xyz/agave/blob/0844bbca7/gossip/src/cluster_info.rs#L1955-L1968

The filtering logic is also updated to ignore deprecated NodeInstance
and LegacyContactInfo types:
https://github.com/anza-xyz/agave/blob/0844bbca7/gossip/src/cluster_info.rs#L2879-L2900
  • Loading branch information
behzadnouri authored Feb 13, 2025
1 parent a1ed2b1 commit 52cf690
Showing 1 changed file with 85 additions and 94 deletions.
179 changes: 85 additions & 94 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1936,7 +1936,7 @@ impl ClusterInfo {

fn process_packets(
&self,
packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>,
mut packets: VecDeque<Vec<(/*from:*/ SocketAddr, Protocol)>>,
thread_pool: &ThreadPool,
recycler: &PacketBatchRecycler,
response_sender: &PacketBatchSender,
Expand All @@ -1948,27 +1948,24 @@ impl ClusterInfo {
let self_pubkey = self.id();
// Filter out values if the shred-versions are different.
let self_shred_version = self.my_shred_version();
let packets = if self_shred_version == 0 {
packets
} else {
if self_shred_version != 0 {
let gossip_crds = self.gossip.crds.read().unwrap();
thread_pool.install(|| {
packets
.into_par_iter()
.with_min_len(1024)
.filter_map(|(from, msg)| {
let msg = filter_on_shred_version(
msg,
self_shred_version,
&gossip_crds,
&self.stats,
)?;
Some((from, msg))
})
.collect()
})
};

let discard_different_shred_version = |msg| {
discard_different_shred_version(msg, self_shred_version, &gossip_crds, &self.stats)
};
if packets.len() < 4 && packets.iter().map(Vec::len).sum::<usize>() < 16 {
for (_, msg) in packets.iter_mut().flatten() {
discard_different_shred_version(msg);
}
} else {
thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.for_each(|(_, msg)| discard_different_shred_version(msg))
})
}
}
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = {
Expand Down Expand Up @@ -2008,9 +2005,13 @@ impl ClusterInfo {
let mut prune_messages = vec![];
let mut ping_messages = vec![];
let mut pong_messages = vec![];
for (from_addr, packet) in packets {
for (from_addr, packet) in packets.into_iter().flatten() {
match packet {
Protocol::PullRequest(filter, caller) => {
if !check_pull_request_shred_version(self_shred_version, &caller) {
self.stats.skip_pull_shred_version.add_relaxed(1);
continue;
}
let request = PullRequest {
pubkey: caller.pubkey(),
addr: from_addr,
Expand Down Expand Up @@ -2183,15 +2184,23 @@ impl ClusterInfo {
let _st = ScopedTimer::from(&self.stats.gossip_listen_loop_time);
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2);
let mut packets = VecDeque::from(receiver.recv_timeout(RECV_TIMEOUT)?);
for payload in receiver.try_iter() {
packets.extend(payload);
let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC);
if excess_count > 0 {
packets.drain(0..excess_count);
let mut num_packets = 0;
let mut packets = VecDeque::with_capacity(2);
for pkts in receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
.chain(receiver.try_iter())
{
num_packets += pkts.len();
packets.push_back(pkts);
while num_packets > MAX_GOSSIP_TRAFFIC {
let Some(num) = packets.pop_front().as_ref().map(Vec::len) else {
break;
};
self.stats
.gossip_packets_dropped_count
.add_relaxed(excess_count as u64);
.add_relaxed(num as u64);
num_packets -= num;
}
}
let stakes = epoch_specs
Expand Down Expand Up @@ -2867,80 +2876,62 @@ pub fn push_messages_to_peer_for_tests(
Ok(())
}

// Filters out values from nodes with different shred-version.
fn filter_on_shred_version(
mut msg: Protocol,
// Checks the shred-version of a pull-request caller and returns false if the
// pull-request should be ignored and discarded.
// Returns true only when the caller is a ContactInfo or LegacyContactInfo
// whose shred-version is either zero or equal to self_shred_version.
#[inline]
#[must_use]
fn check_pull_request_shred_version(self_shred_version: u16, caller: &CrdsValue) -> bool {
let shred_version = match caller.data() {
CrdsData::ContactInfo(node) => node.shred_version(),
CrdsData::LegacyContactInfo(node) => node.shred_version(),
// Callers that are not contact-infos are rejected outright.
_ => return false,
};
// Allow spy nodes with shred-version == 0 to pull from other nodes.
shred_version == 0u16 || shred_version == self_shred_version
}

// Discards CrdsValues in PushMessages and PullResponses from nodes with
// different shred-version.
// ContactInfos are always exempted from shred-version check in order to:
// * Allow nodes to update their shred-version.
// * Prevent two running instances of the same identity key from
// cross-contaminating gossip across clusters; see check_duplicate_instance.
fn discard_different_shred_version(
msg: &mut Protocol,
self_shred_version: u16,
crds: &Crds,
stats: &GossipStats,
) -> Option<Protocol> {
let filter_values = |from: &Pubkey, values: &mut Vec<CrdsValue>, skipped_counter: &Counter| {
let num_values = values.len();
// Node-instances are always exempted from shred-version check so that:
// * their propagation across cluster is expedited.
// * prevent two running instances of the same identity key cross
// contaminate gossip between clusters.
if crds.get_shred_version(from) == Some(self_shred_version) {
values.retain(|value| match value.data() {
// Allow contact-infos so that shred-versions are updated.
CrdsData::ContactInfo(_) => true,
CrdsData::LegacyContactInfo(_) => true,
CrdsData::NodeInstance(_) => true,
// Only retain values with the same shred version.
_ => crds.get_shred_version(&value.pubkey()) == Some(self_shred_version),
})
} else {
values.retain(|value| match value.data() {
// Allow node to update its own contact info in case their
// shred-version changes
CrdsData::ContactInfo(node) => node.pubkey() == from,
CrdsData::LegacyContactInfo(node) => node.pubkey() == from,
CrdsData::NodeInstance(_) => true,
_ => false,
})
}
let num_skipped = num_values - values.len();
if num_skipped != 0 {
skipped_counter.add_relaxed(num_skipped as u64);
}
};
match &mut msg {
Protocol::PullRequest(_, caller) => match caller.data() {
// Allow spy nodes with shred-verion == 0 to pull from other nodes.
CrdsData::LegacyContactInfo(node)
if node.shred_version() == 0 || node.shred_version() == self_shred_version =>
{
Some(msg)
}
CrdsData::ContactInfo(node)
if node.shred_version() == 0 || node.shred_version() == self_shred_version =>
{
Some(msg)
}
_ => {
stats.skip_pull_shred_version.add_relaxed(1);
None
}
},
) {
let (from, values, skip_shred_version_counter) = match msg {
Protocol::PullResponse(from, values) => {
filter_values(from, values, &stats.skip_pull_response_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
(from, values, &stats.skip_pull_response_shred_version)
}
Protocol::PushMessage(from, values) => {
filter_values(from, values, &stats.skip_push_message_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
(from, values, &stats.skip_push_message_shred_version)
}
// Shred-version on pull-request callers can be checked without a lock
// on CRDS table and is so verified separately (by
// check_pull_request_shred_version).
Protocol::PullRequest(..) => return,
// No CRDS values in Prune, Ping and Pong messages.
Protocol::PruneMessage(_, _) | Protocol::PingMessage(_) | Protocol::PongMessage(_) => {
Some(msg)
return
}
};
let num_values = values.len();
if crds.get_shred_version(from) == Some(self_shred_version) {
// Retain ContactInfos or values with the same shred version.
values.retain(|value| {
matches!(value.data(), CrdsData::ContactInfo(_))
|| crds.get_shred_version(&value.pubkey()) == Some(self_shred_version)
})
} else {
// Only retain ContactInfos.
values.retain(|value| matches!(value.data(), CrdsData::ContactInfo(_)));
}
let num_skipped = num_values - values.len();
if num_skipped != 0 {
skip_shred_version_counter.add_relaxed(num_skipped as u64);
}
}

Expand Down

0 comments on commit 52cf690

Please sign in to comment.