Skip to content

Commit

Permalink
replace and fix timeout messages with a field
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 25, 2024
1 parent 658ab16 commit e1c06db
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
16 changes: 7 additions & 9 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3078,30 +3078,28 @@ where
}

// Keep track of expired messages for the application layer.
let failed_messages = self.failed_messages.entry(propagation_source).or_default();
failed_messages.timeout += 1;
match rpc {
RpcOut::Publish { .. } => {
self.failed_messages
.entry(propagation_source)
.or_default()
.publish += 1;
failed_messages.publish += 1;
}
RpcOut::Forward { .. } => {
self.failed_messages
.entry(propagation_source)
.or_default()
.forward += 1;
failed_messages.forward += 1;
}
_ => {} //
_ => {}
}

// Record metrics on the failure.
if let Some(metrics) = self.metrics.as_mut() {
match rpc {
RpcOut::Publish { message, .. } => {
metrics.publish_msg_dropped(&message.topic);
metrics.timeout_msg_dropped(&message.topic);
}
RpcOut::Forward { message, .. } => {
metrics.forward_msg_dropped(&message.topic);
metrics.timeout_msg_dropped(&message.topic);
}
_ => {}
}
Expand Down
15 changes: 15 additions & 0 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ pub(crate) struct Metrics {
publish_messages_dropped: Family<TopicHash, Counter>,
/// The number of forward messages dropped by the sender.
forward_messages_dropped: Family<TopicHash, Counter>,
/// The number of messages that timed out and could not be sent.
timedout_messages_dropped: Family<TopicHash, Counter>,

/* Metrics regarding mesh state */
/// Number of peers in our mesh. This metric should be updated with the count of peers for a
Expand Down Expand Up @@ -241,6 +243,11 @@ impl Metrics {
"Number of forward messages dropped per topic"
);

let timedout_messages_dropped = register_family!(
"timedout_messages_dropped_per_topic",
"Number of timedout messages dropped per topic"
);

let mesh_peer_counts = register_family!(
"mesh_peer_counts",
"Number of peers in each topic in our mesh"
Expand Down Expand Up @@ -347,6 +354,7 @@ impl Metrics {
rejected_messages,
publish_messages_dropped,
forward_messages_dropped,
timedout_messages_dropped,
mesh_peer_counts,
mesh_peer_inclusion_events,
mesh_peer_churn_events,
Expand Down Expand Up @@ -508,6 +516,13 @@ impl Metrics {
}
}

/// Records that a message for `topic` timed out and was dropped before it could be sent.
///
/// The per-topic counter is only incremented when `register_topic` succeeds for the
/// topic; otherwise the call has no effect.
pub(crate) fn timeout_msg_dropped(&mut self, topic: &TopicHash) {
    // Bail out early if the topic cannot be registered with the metrics.
    if self.register_topic(topic).is_err() {
        return;
    }

    let counter = self.timedout_messages_dropped.get_or_create(topic);
    counter.inc();
}

/// Register that a message was received (and was not a duplicate).
pub(crate) fn msg_recvd(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
Expand Down
9 changes: 3 additions & 6 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

/// The type of messages that have expired while attempting to send to a peer.
/// Messages that have expired while attempting to be sent to a peer.
#[derive(Clone, Debug, Default)]
pub struct FailedMessages {
/// The number of publish messages that failed to be published in a heartbeat.
Expand All @@ -44,14 +44,11 @@ pub struct FailedMessages {
pub priority: usize,
/// The number of messages that failed to be sent to the non-priority queue as it was full.
pub non_priority: usize,
/// The number of messages that timed out and could not be sent.
pub timeout: usize,
}

impl FailedMessages {
/// The total number of messages that expired due to a timeout.
///
/// Computed as the sum of the `publish` and `forward` counters.
pub fn total_timeout(&self) -> usize {
self.publish + self.forward
}

/// The total number of messages that failed due to the queue being full.
pub fn total_queue_full(&self) -> usize {
self.priority + self.non_priority
Expand Down

0 comments on commit e1c06db

Please sign in to comment.