split send_message into multiple RpcSender methods, to allow for better handling of each message send.
jxs committed Nov 27, 2023
1 parent dd13fcd commit 92a171c
Showing 2 changed files with 137 additions and 79 deletions.
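In short: instead of funnelling every RPC through a single `send_message(peer_id, RpcOut::…)` helper, the behaviour now fetches the peer's `RpcSender` from `handler_send_queues` and calls a method dedicated to that RPC kind. A condensed sketch of the call-site pattern, assembled from the hunks below (not standalone code; all names are taken from the diff):

// Before: one generic entry point for every kind of RPC.
self.send_message(peer_id, RpcOut::Subscribe(topic_hash.clone()))
    .expect("Subscribe messages should be always sent");

// After: look up the per-peer sender and use the variant-specific method.
let sender = self
    .handler_send_queues
    .get_mut(&peer_id)
    .expect("Peerid should exist");

sender.subscribe(topic_hash.clone()); // unbounded priority queue, always succeeds
sender.control(ControlAction::Graft { topic_hash }); // unbounded priority queue, always succeeds
if sender
    .publish(raw_message.clone(), self.metrics.as_mut())
    .is_err()
{
    errors += 1; // publish is capped, so failures are reported to the caller
}
sender.forward(raw_message, self.metrics.as_mut()); // bounded queue, dropped when full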
129 changes: 76 additions & 53 deletions protocols/gossipsub/src/behaviour.rs
@@ -60,7 +60,7 @@ use crate::types::{
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, RpcOut};
use crate::types::{PeerConnections, PeerKind};
use crate::{backoff::BackoffStorage, types::RpcSender};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
@@ -538,11 +538,14 @@ where
}

// send subscription request to all peers
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
let event = RpcOut::Subscribe(topic_hash.clone());
self.send_message(peer, event)
.expect("Subscribe messages should be always sent");
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");

sender.subscribe(topic_hash.clone());
}

// call JOIN(topic)
Expand All @@ -566,11 +569,14 @@ where
}

// announce to all peers
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
let event = RpcOut::Unsubscribe(topic_hash.clone());
self.send_message(peer, event)
.expect("Subscribe messages should be always sent");
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");

sender.unsubscribe(topic_hash.clone());
}

// call LEAVE(topic)
@@ -720,8 +726,13 @@ where
let mut errors = 0;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
if self
.send_message(*peer_id, RpcOut::Publish(raw_message.clone()))
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

if sender
.publish(raw_message.clone(), self.metrics.as_mut())
.is_err()
{
errors += 1;
@@ -1326,7 +1337,12 @@ where
);
} else {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
let _ = self.send_message(*peer_id, RpcOut::Forward(msg));
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.forward(msg, self.metrics.as_mut());
}
}
}
@@ -1479,13 +1495,17 @@ where
if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let mut sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect::<Vec<_>>()
{
self.send_message(*peer_id, RpcOut::Control(action))
.expect("PRUNE messages should always be sent");
sender.control(action);
}
// Send the prune messages to the peer
tracing::debug!(
@@ -1980,13 +2000,16 @@ where

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
let sender = self
.handler_send_queues
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect::<Vec<_>>()
{
self.send_message(*propagation_source, RpcOut::Control(action))
.expect("GRAFT messages should always be sent");
sender.control(action);
}

// Notify the application of the subscriptions
@@ -2521,6 +2544,13 @@ where
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.

// send the control messages
let mut sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist")
.clone();

// The following prunes are not due to unsubscribing.
let prunes = to_prune
.remove(&peer)
@@ -2535,10 +2565,8 @@
)
});

// send the control messages
for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
self.send_message(peer, RpcOut::Control(msg))
.expect("PRUNE messages should always be sent");
for msg in control_msgs.chain(prunes) {
sender.control(msg);
}
}

@@ -2552,8 +2580,13 @@
self.config.do_px() && !no_px.contains(peer),
false,
);
self.send_message(*peer, RpcOut::Control(prune))
.expect("PRUNE messages should always be sent");
let mut sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist")
.clone();

sender.control(prune);

// inform the handler
peer_removed_from_mesh(
@@ -2621,11 +2654,13 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
let event = RpcOut::Forward(message.clone());

for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
let _ = self.send_message(*peer, event.clone());
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");
sender.forward(message.clone(), self.metrics.as_mut());
}
tracing::debug!("Completed forwarding message");
Ok(true)
@@ -2739,36 +2774,19 @@ where
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
let _ = self.send_message(peer, RpcOut::Control(msg));
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> Result<(), ()> {
let sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist");

if sender.try_send(rpc.clone()).is_err() {
tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
return Err(());
}

if let Some(m) = self.metrics.as_mut() {
if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
// register bytes sent on the internal metrics.
m.msg_sent(&message.topic, message.raw_protobuf_len());
}
}
Ok(())
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
@@ -2834,9 +2852,14 @@ where

tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
let mut sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist")
.clone();

for topic_hash in self.mesh.clone().into_keys() {
self.send_message(peer_id, RpcOut::Subscribe(topic_hash))
.expect("Subscribe messages should be always sent");
sender.subscribe(topic_hash);
}
}

@@ -3420,7 +3443,7 @@ impl fmt::Debug for PublishConfig {
#[cfg(test)]
mod local_test {
use super::*;
use crate::IdentTopic;
use crate::{types::RpcOut, IdentTopic};
use quickcheck::*;

fn test_message() -> RawMessage {
87 changes: 61 additions & 26 deletions protocols/gossipsub/src/types.rs
@@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE.

//! A collection of types using the Gossipsub system.
use crate::metrics::Metrics;
use crate::TopicHash;
use async_channel::{Receiver, Sender};
use futures::Stream;
@@ -555,35 +556,69 @@ impl RpcSender {
self.receiver.clone()
}

/// Send `RpcOut`s to the `ConnectionHandler` according to their priority.
pub(crate) fn try_send(&mut self, rpc: RpcOut) -> Result<(), ()> {
// Forward messages, IWANT and IHAVE control messages are regarded as low priority.
match rpc {
rpc @ RpcOut::Forward(_)
| rpc @ RpcOut::Control(ControlAction::IHave { .. })
| rpc @ RpcOut::Control(ControlAction::IWant { .. }) => {
if let Err(err) = self.non_priority.try_send(rpc) {
let rpc = err.into_inner();
tracing::trace!("{rpc:?} message dropped, queue is full");
}
}
// GRAFT and PRUNE control messages, Subscription, and Publishes messages.
// Publish messages are limited to the capacity of the queue.
rpc @ RpcOut::Control(_)
| rpc @ RpcOut::Subscribe(_)
| rpc @ RpcOut::Unsubscribe(_) => {
self.priority.try_send(rpc).expect("Channel is unbounded");
}
rpc @ RpcOut::Publish(_) => {
if self.len.load(Ordering::Relaxed) >= self.cap {
return Err(());
}
self.priority.try_send(rpc).expect("Channel is unbounded");
self.len.fetch_add(1, Ordering::Relaxed);
}
/// Send a `RpcOut::Control` message to the `RpcReceiver`;
/// this is high priority.
pub(crate) fn control(&mut self, control: ControlAction) {
self.priority
.try_send(RpcOut::Control(control))
.expect("Channel is unbounded and should always be open");
}

/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`;
/// this is high priority.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {
self.priority
.try_send(RpcOut::Subscribe(topic))
.expect("Channel is unbounded and should always be open");
}

/// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver`;
/// this is high priority.
pub(crate) fn unsubscribe(&mut self, topic: TopicHash) {
self.priority
.try_send(RpcOut::Unsubscribe(topic))
.expect("Channel is unbounded and should always be open");
}

/// Send a `RpcOut::Publish` message to the `RpcReceiver`;
/// this is high priority. If message sending fails, an `Err` is returned.
pub(crate) fn publish(
&mut self,
message: RawMessage,
metrics: Option<&mut Metrics>,
) -> Result<(), ()> {
if self.len.load(Ordering::Relaxed) >= self.cap {
return Err(());
}
self.priority
.try_send(RpcOut::Publish(message.clone()))
.expect("Channel is unbounded and Should always be open");
self.len.fetch_add(1, Ordering::Relaxed);

if let Some(m) = metrics {
m.msg_sent(&message.topic, message.raw_protobuf_len());
}

Ok(())
}

/// Send a `RpcOut::Forward` message to the `RpcReceiver`;
/// this is low priority. If the queue is full the message is discarded.
pub(crate) fn forward(&mut self, message: RawMessage, metrics: Option<&mut Metrics>) {
if let Err(err) = self.non_priority.try_send(RpcOut::Forward(message.clone())) {
let rpc = err.into_inner();
tracing::trace!(
"{:?} message to peer {} dropped, queue is full",
rpc,
self.peer_id
);
return;
}

if let Some(m) = metrics {
m.msg_sent(&message.topic, message.raw_protobuf_len());
}
}
}

/// `RpcOut` sender that is priority aware.

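Taken together, the new `RpcSender` methods encode a small queueing policy: `control`, `subscribe`, `unsubscribe`, and `publish` all go to an unbounded priority channel, with `publish` additionally respecting a soft capacity tracked via `len`/`cap`, while `forward` goes to a bounded non-priority channel and is silently dropped when that queue is full. The standalone sketch below models only that policy with the `async_channel` crate; `ToyRpcSender`, the `String` payloads, and the queue sizes are illustrative stand-ins, not the crate's actual types:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use async_channel::{bounded, unbounded, Sender};

/// Simplified model of the split sender: an unbounded "priority" lane for
/// control/subscription/publish RPCs and a bounded "non-priority" lane for forwards.
struct ToyRpcSender {
    priority: Sender<String>,
    non_priority: Sender<String>,
    len: Arc<AtomicUsize>, // queued publishes (the real receiver decrements this as it drains)
    cap: usize,            // soft capacity for publishes
}

impl ToyRpcSender {
    fn control(&self, msg: String) {
        // Control, subscribe and unsubscribe always go through: the lane is unbounded.
        self.priority
            .try_send(msg)
            .expect("unbounded channel is always open");
    }

    fn publish(&self, msg: String) -> Result<(), ()> {
        // Publishes share the unbounded lane but respect the soft capacity.
        if self.len.load(Ordering::Relaxed) >= self.cap {
            return Err(());
        }
        self.priority
            .try_send(msg)
            .expect("unbounded channel is always open");
        self.len.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    fn forward(&self, msg: String) {
        // Forwards are best effort: if the bounded lane is full, drop the message.
        if let Err(err) = self.non_priority.try_send(msg) {
            eprintln!("dropped forward: {:?}", err.into_inner());
        }
    }
}

fn main() {
    let (priority, _priority_rx) = unbounded();
    let (non_priority, _non_priority_rx) = bounded(2);
    let sender = ToyRpcSender {
        priority,
        non_priority,
        len: Arc::new(AtomicUsize::new(0)),
        cap: 16,
    };

    sender.control("GRAFT".into());
    assert!(sender.publish("published message".into()).is_ok());
    sender.forward("forwarded message".into());
    sender.forward("forwarded message".into());
    sender.forward("this third forward exceeds the bound of 2 and is dropped".into());
}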