diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 523687e3852..c822e02c2c5 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -60,7 +60,7 @@ use crate::types::{ ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction, }; -use crate::types::{PeerConnections, PeerKind, RpcOut}; +use crate::types::{PeerConnections, PeerKind}; use crate::{backoff::BackoffStorage, types::RpcSender}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; @@ -538,11 +538,14 @@ where } // send subscription request to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); - let event = RpcOut::Subscribe(topic_hash.clone()); - self.send_message(peer, event) - .expect("Subscribe messages should be always sent"); + let sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist"); + + sender.subscribe(topic_hash.clone()); } // call JOIN(topic) @@ -566,11 +569,14 @@ where } // announce to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); - let event = RpcOut::Unsubscribe(topic_hash.clone()); - self.send_message(peer, event) - .expect("Subscribe messages should be always sent"); + let sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist"); + + sender.unsubscribe(topic_hash.clone()); } // call LEAVE(topic) @@ -720,8 +726,13 @@ where let mut errors = 0; for peer_id in recipient_peers.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); - if self - .send_message(*peer_id, RpcOut::Publish(raw_message.clone())) + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + if sender + .publish(raw_message.clone(), self.metrics.as_mut()) .is_err() { errors += 1; @@ -1326,7 +1337,12 @@ where ); } else { tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - let _ = self.send_message(*peer_id, RpcOut::Forward(msg)); + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + sender.forward(msg, self.metrics.as_mut()); } } } @@ -1479,13 +1495,17 @@ where if !to_prune_topics.is_empty() { // build the prune messages to send let on_unsubscribe = false; + let mut sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist") + .clone(); + for action in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) - .collect::>() { - self.send_message(*peer_id, RpcOut::Control(action)) - .expect("PRUNE messages should always be sent"); + sender.control(action); } // Send the prune messages to the peer tracing::debug!( @@ -1980,13 +2000,16 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. + let sender = self + .handler_send_queues + .get_mut(propagation_source) + .expect("Peerid should exist"); + for action in topics_to_graft .into_iter() .map(|topic_hash| ControlAction::Graft { topic_hash }) - .collect::>() { - self.send_message(*propagation_source, RpcOut::Control(action)) - .expect("GRAFT messages should always be sent"); + sender.control(action); } // Notify the application of the subscriptions @@ -2521,6 +2544,13 @@ where // It therefore must be in at least one mesh and we do not need to inform the handler // of its removal from another. + // send the control messages + let mut sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist") + .clone(); + // The following prunes are not due to unsubscribing. let prunes = to_prune .remove(&peer) @@ -2535,10 +2565,8 @@ where ) }); - // send the control messages - for msg in control_msgs.chain(prunes).collect::>() { - self.send_message(peer, RpcOut::Control(msg)) - .expect("PRUNE messages should always be sent"); + for msg in control_msgs.chain(prunes) { + sender.control(msg); } } @@ -2552,8 +2580,13 @@ where self.config.do_px() && !no_px.contains(peer), false, ); - self.send_message(*peer, RpcOut::Control(prune)) - .expect("PRUNE messages should always be sent"); + let mut sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist") + .clone(); + + sender.control(prune); // inform the handler peer_removed_from_mesh( @@ -2621,11 +2654,13 @@ where // forward the message to peers if !recipient_peers.is_empty() { - let event = RpcOut::Forward(message.clone()); - for peer in recipient_peers.iter() { tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); - let _ = self.send_message(*peer, event.clone()); + let sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist"); + sender.forward(message.clone(), self.metrics.as_mut()); } tracing::debug!("Completed forwarding message"); Ok(true) @@ -2739,7 +2774,12 @@ where fn flush_control_pool(&mut self) { for (peer, controls) in self.control_pool.drain().collect::>() { for msg in controls { - let _ = self.send_message(peer, RpcOut::Control(msg)); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + + sender.control(msg); } } @@ -2747,28 +2787,6 @@ where self.pending_iwant_msgs.clear(); } - /// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it - /// is not already an arc. - fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> Result<(), ()> { - let sender = self - .handler_send_queues - .get_mut(&peer_id) - .expect("Peerid should exist"); - - if sender.try_send(rpc.clone()).is_err() { - tracing::debug!(peer=%peer_id, "Dropping message as peer is full"); - return Err(()); - } - - if let Some(m) = self.metrics.as_mut() { - if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc { - // register bytes sent on the internal metrics. - m.msg_sent(&message.topic, message.raw_protobuf_len()); - } - } - Ok(()) - } - fn on_connection_established( &mut self, ConnectionEstablished { @@ -2834,9 +2852,14 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. + let mut sender = self + .handler_send_queues + .get_mut(&peer_id) + .expect("Peerid should exist") + .clone(); + for topic_hash in self.mesh.clone().into_keys() { - self.send_message(peer_id, RpcOut::Subscribe(topic_hash)) - .expect("Subscribe messages should be always sent"); + sender.subscribe(topic_hash); } } @@ -3420,7 +3443,7 @@ impl fmt::Debug for PublishConfig { #[cfg(test)] mod local_test { use super::*; - use crate::IdentTopic; + use crate::{types::RpcOut, IdentTopic}; use quickcheck::*; fn test_message() -> RawMessage { diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f503fb61a63..6799b6a2b15 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. //! A collection of types using the Gossipsub system. +use crate::metrics::Metrics; use crate::TopicHash; use async_channel::{Receiver, Sender}; use futures::Stream; @@ -555,35 +556,69 @@ impl RpcSender { self.receiver.clone() } - /// Send `RpcOut`s to the `ConnectionHandler` according to their priority. - pub(crate) fn try_send(&mut self, rpc: RpcOut) -> Result<(), ()> { - // Forward messages, IWANT and IHAVE control messages are regarded as low priority. - match rpc { - rpc @ RpcOut::Forward(_) - | rpc @ RpcOut::Control(ControlAction::IHave { .. }) - | rpc @ RpcOut::Control(ControlAction::IWant { .. }) => { - if let Err(err) = self.non_priority.try_send(rpc) { - let rpc = err.into_inner(); - tracing::trace!("{rpc:?} message dropped, queue is full"); - } - } - // GRAFT and PRUNE control messages, Subscription, and Publishes messages. - // Publish messages are limited to the capacity of the queue. - rpc @ RpcOut::Control(_) - | rpc @ RpcOut::Subscribe(_) - | rpc @ RpcOut::Unsubscribe(_) => { - self.priority.try_send(rpc).expect("Channel is unbounded"); - } - rpc @ RpcOut::Publish(_) => { - if self.len.load(Ordering::Relaxed) >= self.cap { - return Err(()); - } - self.priority.try_send(rpc).expect("Channel is unbounded"); - self.len.fetch_add(1, Ordering::Relaxed); - } + /// Send a `RpcOut::Control` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn control(&mut self, control: ControlAction) { + self.priority + .try_send(RpcOut::Control(control)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn subscribe(&mut self, topic: TopicHash) { + self.priority + .try_send(RpcOut::Subscribe(topic)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn unsubscribe(&mut self, topic: TopicHash) { + self.priority + .try_send(RpcOut::Unsubscribe(topic)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::Publish` message to the `RpcReceiver` + /// this is high priority. If message sending fails, an `Err` is returned. + pub(crate) fn publish( + &mut self, + message: RawMessage, + metrics: Option<&mut Metrics>, + ) -> Result<(), ()> { + if self.len.load(Ordering::Relaxed) >= self.cap { + return Err(()); } + self.priority + .try_send(RpcOut::Publish(message.clone())) + .expect("Channel is unbounded and Should always be open"); + self.len.fetch_add(1, Ordering::Relaxed); + + if let Some(m) = metrics { + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + Ok(()) } + + /// Send a `RpcOut::Forward` message to the `RpcReceiver` + /// this is high priority. If the queue is full the message is discarded. + pub(crate) fn forward(&mut self, message: RawMessage, metrics: Option<&mut Metrics>) { + if let Err(err) = self.non_priority.try_send(RpcOut::Forward(message.clone())) { + let rpc = err.into_inner(); + tracing::trace!( + "{:?} message to peer {} dropped, queue is full", + rpc, + self.peer_id + ); + return; + } + + if let Some(m) = metrics { + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + } } /// `RpcOut` sender that is priority aware.