Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics #607

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ edition = "2021"

[features]
default = []
# Enable metric collection.
metrics = []

# Feature that enables the `test` module.
test = ["heph/test"]
Expand Down
2 changes: 2 additions & 0 deletions rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ pub mod fs;
pub mod io;
mod local;
pub mod log;
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod net;
pub mod pipe;
mod process;
Expand Down
129 changes: 129 additions & 0 deletions rt/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//! Metric collection.
//!
//! Various types support metric collection. What metrics are collected and how
//! they look is different per type. To see what types support metric collection
//! see the [implementors of Metrics].
//!
//! This module has two main exports.
//!
//! First, the [`Metrics`] trait. It allows you to access the collected metrics.
//! It returns an iterator that iterates over the metrics in metric name-value
//! pairs.
//!
//! Second, the [`Metric`] type, which is the container type for all metrics.
//!
//! [implementors of Metrics]: Metrics#implementors

use std::sync::atomic::{AtomicUsize, Ordering};

/// Access to metrics.
pub trait Metrics {
/// Container type for the metrics that are collected. The container must be
/// an iterator that iterates over the metric name-value pairs.
type Metrics: Iterator<Item = (&'static str, Metric)>;

/// Get all the metrics from this type.
fn metrics(&self) -> Self::Metrics;

/// Get the metric with `name` from this type, if any
fn metric(&self, name: &str) -> Option<Metric> {
self.metrics()
.find_map(|(n, metric)| (n == name).then_some(metric))
}
}

/// Single metric.
///
/// Type that can hold different types of metrics.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Metric {
/// Simple counter.
///
/// A counter represent a single monotonic value, which means the value can
/// only be incremented, not decremented. Only after a restart may it be
/// reset to zero.
///
/// Examples of counters are the amount of bytes send or received on a
/// connection.
Counter(usize),
}

/// Atomic counter, see [`Metric::Counter`].
#[derive(Debug)]
pub(crate) struct AtomicCounter(AtomicUsize);

impl AtomicCounter {
/// Create a new counter starting at zero.
pub(crate) const fn new() -> AtomicCounter {
AtomicCounter(AtomicUsize::new(0))
}

/// Add `n` to the counter.
pub(crate) fn add(&self, n: usize) {
let _ = self.0.fetch_add(n, Ordering::AcqRel);
}

/// Get the current value of the counter.
pub(crate) fn get(&self) -> usize {
self.0.load(Ordering::Acquire)
}
}

impl From<&AtomicCounter> for Metric {
fn from(counter: &AtomicCounter) -> Metric {
Metric::Counter(counter.get())
}
}

/// Macro to create metrics structure for a given type.
macro_rules! create_metric {
(
$vis: vis struct $name: ident for $for_ty: ident $( < $( $generic: ident )+ > )? {
// `$field_doc` documents the metric as field doc and in the structure docs, max ~1 line.
// `$field` is used as name for `Metrics` implementation.
// `$field_ty` must support `Metric::from(&$value)`.
// `$metric_ty` must be a variant of `Metric`.
$(
$(#[ $field_doc: meta ])+
$field: ident : $field_ty: ident -> $metric_ty: ident,
)+
}
) => {
#[doc = concat!("Metrics for [`", stringify!($for_ty), "`].")]
#[derive(Debug)]
$vis struct $name {
$(
$(#[ $field_doc ])+
$vis $field: $crate::metrics::$field_ty,
)*
}

impl $name {
/// Create empty metrics.
const fn empty() -> $name {
$name {
$( $field: $crate::metrics::$field_ty::new() ),*
}
}
}

/// Collects the following metrics:
$(
#[doc = concat!(" * `", stringify!($field), "`: ")]
$(#[ $field_doc ])+
#[doc = concat!("Type [`", stringify!($metric_ty), "`](crate::metrics::Metric::", stringify!($metric_ty), ").")]
)*
impl$( < $( $generic )+ > )? $crate::metrics::Metrics for $for_ty$( < $( $generic )+ > )? {
type Metrics = impl Iterator<Item = (&'static str, crate::metrics::Metric)> + std::iter::ExactSizeIterator + std::iter::FusedIterator;

fn metrics(&self) -> Self::Metrics {
std::iter::IntoIterator::into_iter([
$( (stringify!($field), crate::metrics::Metric::from(&self.metrics.$field)) ),*
])
}
}
};
}

pub(crate) use create_metric;
66 changes: 51 additions & 15 deletions rt/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use a10::AsyncFd;
use socket2::{Domain, Protocol, SockRef, Socket, Type};

use crate::access::Access;
#[cfg(feature = "metrics")]
use crate::metrics::create_metric;
use crate::net::{convert_address, SockAddr, TcpStream};
use crate::wakers::NoRing;

Expand Down Expand Up @@ -98,9 +100,19 @@ use crate::wakers::NoRing;
/// ```
pub struct TcpListener {
fd: AsyncFd,
#[cfg(feature = "metrics")]
pub(crate) metrics: Metrics,
}

impl TcpListener {
const fn new(fd: AsyncFd) -> TcpListener {
TcpListener {
fd,
#[cfg(feature = "metrics")]
metrics: Metrics::empty(),
}
}

/// Creates a new `TcpListener` which will be bound to the specified
/// `address`.
pub async fn bind<RT>(rt: &RT, address: SocketAddr) -> io::Result<TcpListener>
Expand Down Expand Up @@ -128,7 +140,7 @@ impl TcpListener {
))
.await?;

let socket = TcpListener { fd };
let socket = TcpListener::new(fd);

socket.with_ref(|socket| {
#[cfg(target_os = "linux")]
Expand All @@ -155,17 +167,17 @@ impl TcpListener {
where
RT: Access,
{
TcpListener {
fd: AsyncFd::new(listener.into(), rt.submission_queue()),
}
TcpListener::new(AsyncFd::new(listener.into(), rt.submission_queue()))
}

/// Creates a new independently owned `TcpListener` that shares the same
/// underlying file descriptor as the existing `TcpListener`.
///
/// # Notes
///
/// The metrics are reset for the cloned listener and are **not** shared.
pub fn try_clone(&self) -> io::Result<TcpListener> {
Ok(TcpListener {
fd: self.fd.try_clone()?,
})
Ok(TcpListener::new(self.fd.try_clone()?))
}

/// Returns the local socket address of this listener.
Expand Down Expand Up @@ -195,7 +207,11 @@ impl TcpListener {
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
NoRing(self.fd.accept::<SockAddr>())
.await
.map(|(fd, addr)| (TcpStream { fd }, addr.into()))
.map(|(fd, addr)| {
#[cfg(feature = "metrics")]
self.metrics.accepted.add(1);
(TcpStream::new(fd), addr.into())
})
}

/// Returns a stream of incoming [`TcpStream`]s.
Expand All @@ -212,7 +228,11 @@ impl TcpListener {
/// use [`TcpStream::set_auto_cpu_affinity`].
#[allow(clippy::doc_markdown)] // For "io_uring".
pub const fn incoming(&self) -> Incoming<'_> {
Incoming(self.fd.multishot_accept())
Incoming {
accept: self.fd.multishot_accept(),
#[cfg(feature = "metrics")]
metrics: &self.metrics,
}
}

/// Get the value of the `SO_ERROR` option on this socket.
Expand All @@ -235,16 +255,24 @@ impl TcpListener {
/// The [`AsyncIterator`] behind [`TcpListener::incoming`].
#[derive(Debug)]
#[must_use = "AsyncIterators do nothing unless polled"]
pub struct Incoming<'a>(a10::net::MultishotAccept<'a>);
pub struct Incoming<'a> {
accept: a10::net::MultishotAccept<'a>,
#[cfg(feature = "metrics")]
metrics: &'a Metrics,
}

impl<'a> AsyncIterator for Incoming<'a> {
type Item = io::Result<TcpStream>;

fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// SAFETY: not moving the `Future`.
unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) }
.poll_next(ctx)
.map_ok(|fd| TcpStream { fd })
fn poll_next(
mut self: Pin<&mut Self>,
ctx: &mut task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.accept).poll_next(ctx).map_ok(|fd| {
#[cfg(feature = "metrics")]
self.metrics.accepted.add(1);
TcpStream::new(fd)
})
}
}

Expand All @@ -259,3 +287,11 @@ impl fmt::Debug for TcpListener {
self.fd.fmt(f)
}
}

#[cfg(feature = "metrics")]
create_metric! {
pub(crate) struct Metrics for TcpListener {
/// Number of connections accepted.
accepted: AtomicCounter -> Counter,
}
}
39 changes: 34 additions & 5 deletions rt/src/net/tcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ use log::{debug, trace};
use socket2::{Domain, Protocol, Socket, Type};

use crate::access::Access;
#[cfg(feature = "metrics")]
use crate::net::tcp::listener::Metrics;
use crate::net::{TcpListener, TcpStream};
use crate::spawn::{ActorOptions, Spawn};
use crate::util::{either, next};
Expand Down Expand Up @@ -425,10 +427,23 @@ where
debug!("no more connections to accept in TCP server, stopping");
return Ok(());
}
Err(Ok(_)) => {
Err(Ok(Message {
inner: MessageKind::Shutdown,
})) => {
debug!("TCP server received shutdown message, stopping");
return Ok(());
}
#[cfg(feature = "metrics")]
Err(Ok(Message {
inner: MessageKind::LogMetrics,
})) => {
let Metrics { accepted } = &listener.metrics;
log::info!(
target: "metrics",
connections_accepted = accepted.get();
"TCP server metrics",
);
}
Err(Err(NoMessages)) => {
debug!("All actor references to TCP server dropped, stopping");
return Ok(());
Expand All @@ -443,13 +458,21 @@ where
/// [`TryFrom`]`<`[`Signal`]`>` for the message, allowing for graceful shutdown.
#[derive(Debug)]
pub struct Message {
// Allow for future expansion.
_inner: (),
inner: MessageKind,
}

#[derive(Debug)]
enum MessageKind {
Shutdown,
#[cfg(feature = "metrics")]
LogMetrics,
}

impl From<Terminate> for Message {
fn from(_: Terminate) -> Message {
Message { _inner: () }
Message {
inner: MessageKind::Shutdown,
}
}
}

Expand All @@ -460,7 +483,13 @@ impl TryFrom<Signal> for Message {
/// [`Signal::Quit`], fails for all other signals (by returning `Err(())`).
fn try_from(signal: Signal) -> Result<Self, Self::Error> {
match signal {
Signal::Interrupt | Signal::Terminate | Signal::Quit => Ok(Message { _inner: () }),
Signal::Interrupt | Signal::Terminate | Signal::Quit => Ok(Message {
inner: MessageKind::Shutdown,
}),
#[cfg(feature = "metrics")]
Signal::User2 => Ok(Message {
inner: MessageKind::LogMetrics,
}),
_ => Err(()),
}
}
Expand Down
Loading
Loading