Skip to content

Commit

Permalink
Queue metrics (#558)
Browse files Browse the repository at this point in the history
* Add metrics for queue lenghts

* Add changelog
  • Loading branch information
AgeManning authored Dec 6, 2023
1 parent 8a350f5 commit 107effb
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
2 changes: 2 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## 0.46.1 - unreleased
- Adds metrics for priority and non-priority queue lengths.

- Implement publish and forward message dropping.

- Implement backpressure by diferentiating between priority and non priority messages.
Expand Down
10 changes: 10 additions & 0 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2042,6 +2042,16 @@ where
tracing::debug!("Starting heartbeat");
let start = Instant::now();

// Every heartbeat we sample the send queues to add to our metrics. We do this intentionally
// before we add all the gossip from this heartbeat in order to gain a true measure of
// steady-state size of the queues.
if let Some(m) = &mut self.metrics {
for sender_queue in self.handler_send_queues.values() {
m.observe_priority_queue_size(sender_queue.priority_len());
m.observe_non_priority_queue_size(sender_queue.non_priority_len());
}
}

self.heartbeat_ticks += 1;

let mut to_graft = HashMap::new();
Expand Down
31 changes: 31 additions & 0 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ pub(crate) struct Metrics {
/// The number of times we have decided that an IWANT control message is required for this
/// topic. A very high metric might indicate an underperforming network.
topic_iwant_msgs: Family<TopicHash, Counter>,

/// The size of the priority queue.
priority_queue_size: Histogram,
/// The size of the non-priority queue.
non_priority_queue_size: Histogram,
}

impl Metrics {
Expand Down Expand Up @@ -316,6 +321,20 @@ impl Metrics {
metric
};

let priority_queue_size = Histogram::new(linear_buckets(0.0, 2500.0, 100));
registry.register(
"priority_queue_size",
"Histogram of observed priority queue sizes",
priority_queue_size.clone(),
);

let non_priority_queue_size = Histogram::new(linear_buckets(0.0, 2500.0, 100));
registry.register(
"non_priority_queue_size",
"Histogram of observed non-priority queue sizes",
non_priority_queue_size.clone(),
);

Self {
max_topics,
max_never_subscribed_topics,
Expand Down Expand Up @@ -343,6 +362,8 @@ impl Metrics {
heartbeat_duration,
memcache_misses,
topic_iwant_msgs,
priority_queue_size,
non_priority_queue_size,
}
}

Expand Down Expand Up @@ -532,6 +553,16 @@ impl Metrics {
self.heartbeat_duration.observe(millis as f64);
}

/// Observes a priority queue size.
pub(crate) fn observe_priority_queue_size(&mut self, len: usize) {
self.priority_queue_size.observe(len as f64);
}

/// Observes a non-priority queue size.
pub(crate) fn observe_non_priority_queue_size(&mut self, len: usize) {
self.non_priority_queue_size.observe(len as f64);
}

/// Observe a score of a mesh peer.
pub(crate) fn observe_mesh_peers_score(&mut self, topic: &TopicHash, score: f64) {
if self.register_topic(topic).is_ok() {
Expand Down
10 changes: 10 additions & 0 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,16 @@ impl RpcSender {
m.msg_sent(&message.topic, message.raw_protobuf_len());
}
}

/// Returns the current size of the priority queue.
pub(crate) fn priority_len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}

/// Returns the current size of the non-priority queue.
pub(crate) fn non_priority_len(&self) -> usize {
self.non_priority.len()
}
}

/// `RpcOut` sender that is priority aware.
Expand Down

0 comments on commit 107effb

Please sign in to comment.