From 003c010b23e40b9afe08b9dea0de864301da6c80 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Mon, 1 Apr 2024 14:03:31 +0200 Subject: [PATCH 1/5] Add metrics module Has a single trait and type to support metric collection and extraction for types in the crate. For now I've put it behind a metrics feature as it does require additional CPU cycles to collect, even if it's not used. --- rt/Cargo.toml | 2 + rt/src/lib.rs | 2 + rt/src/metrics.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+) create mode 100644 rt/src/metrics.rs diff --git a/rt/Cargo.toml b/rt/Cargo.toml index 28f64550..2a560bdc 100644 --- a/rt/Cargo.toml +++ b/rt/Cargo.toml @@ -15,6 +15,8 @@ edition = "2021" [features] default = [] +# Enable metric collection. +metrics = [] # Feature that enables the `test` module. test = ["heph/test"] diff --git a/rt/src/lib.rs b/rt/src/lib.rs index 1ab23a7d..d4cebed5 100644 --- a/rt/src/lib.rs +++ b/rt/src/lib.rs @@ -252,6 +252,8 @@ pub mod fs; pub mod io; mod local; pub mod log; +#[cfg(feature = "metrics")] +pub mod metrics; pub mod net; pub mod pipe; mod process; diff --git a/rt/src/metrics.rs b/rt/src/metrics.rs new file mode 100644 index 00000000..973f894b --- /dev/null +++ b/rt/src/metrics.rs @@ -0,0 +1,129 @@ +//! Metric collection. +//! +//! Various types support metric collection. What metrics are collected and how +//! they look is different per type. To see what types support metric collection +//! see the [implementors of Metrics]. +//! +//! This module has two main exports. +//! +//! First, the [`Metrics`] trait. It allows you to access the collected metrics. +//! It returns an iterator that iterates over the metrics in metric name-value +//! pairs. +//! +//! Second, the [`Metric`] type, which is the container type for all metrics. +//! +//! [implementors of Metrics]: Metrics#implementors + +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Access to metrics. 
+pub trait Metrics { + /// Container type for the metrics that are collected. The container must be + /// an iterator that iterates over the metric name-value pairs. + type Metrics: Iterator; + + /// Get all the metrics from this type. + fn metrics(&self) -> Self::Metrics; + + /// Get the metric with `name` from this type, if any. + fn metric(&self, name: &str) -> Option { + self.metrics() + .find_map(|(n, metric)| (n == name).then_some(metric)) + } +} + +/// Single metric. +/// +/// Type that can hold different types of metrics. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum Metric { + /// Simple counter. + /// + /// A counter represents a single monotonic value, which means the value can + /// only be incremented, not decremented. Only after a restart may it be + /// reset to zero. + /// + /// Examples of counters are the number of bytes sent or received on a + /// connection. + Counter(usize), +} + +/// Atomic counter, see [`Metric::Counter`]. +#[derive(Debug)] +pub(crate) struct AtomicCounter(AtomicUsize); + +impl AtomicCounter { + /// Create a new counter starting at zero. + pub(crate) const fn new() -> AtomicCounter { + AtomicCounter(AtomicUsize::new(0)) + } + + /// Add `n` to the counter. + pub(crate) fn add(&self, n: usize) { + let _ = self.0.fetch_add(n, Ordering::AcqRel); + } + + /// Get the current value of the counter. + pub(crate) fn get(&self) -> usize { + self.0.load(Ordering::Acquire) + } +} + +impl From<&AtomicCounter> for Metric { + fn from(counter: &AtomicCounter) -> Metric { + Metric::Counter(counter.get()) + } +} + +/// Macro to create metrics structure for a given type. +macro_rules! create_metric { + ( + $vis: vis struct $name: ident for $for_ty: ident $( < $( $generic: ident )+ > )? { + // `$field_doc` documents the metric as field doc and in the structure docs, max ~1 line. + // `$field` is used as name for `Metrics` implementation. + // `$field_ty` must support `Metric::from(&$value)`. + // `$metric_ty` must be a variant of `Metric`. 
+ $( + $(#[ $field_doc: meta ])+ + $field: ident : $field_ty: ident -> $metric_ty: ident, + )+ + } + ) => { + #[doc = concat!("Metrics for [`", stringify!($for_ty), "`].")] + #[derive(Debug)] + $vis struct $name { + $( + $(#[ $field_doc ])+ + $vis $field: $crate::metrics::$field_ty, + )* + } + + impl $name { + /// Create empty metrics. + const fn empty() -> $name { + $name { + $( $field: $crate::metrics::$field_ty::new() ),* + } + } + } + + /// Collects the following metrics: + $( + #[doc = concat!(" * `", stringify!($field), "`: ")] + $(#[ $field_doc ])+ + #[doc = concat!("Type [`", stringify!($metric_ty), "`](crate::metrics::Metric::", stringify!($metric_ty), ").")] + )* + impl$( < $( $generic )+ > )? $crate::metrics::Metrics for $for_ty$( < $( $generic )+ > )? { + type Metrics = impl Iterator + std::iter::ExactSizeIterator + std::iter::FusedIterator; + + fn metrics(&self) -> Self::Metrics { + std::iter::IntoIterator::into_iter([ + $( (stringify!($field), crate::metrics::Metric::from(&self.metrics.$field)) ),* + ]) + } + } + }; +} + +pub(crate) use create_metric; From 3b3441ad2b1e5737cdb23f6bffd31e2e4bff985d Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Mon, 1 Apr 2024 14:14:54 +0200 Subject: [PATCH 2/5] Add TcpStream::new and make fd field private And use it everywhere. --- rt/src/net/tcp/listener.rs | 4 ++-- rt/src/net/tcp/stream.rs | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/rt/src/net/tcp/listener.rs b/rt/src/net/tcp/listener.rs index 67211ca2..4c705582 100644 --- a/rt/src/net/tcp/listener.rs +++ b/rt/src/net/tcp/listener.rs @@ -195,7 +195,7 @@ impl TcpListener { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { NoRing(self.fd.accept::()) .await - .map(|(fd, addr)| (TcpStream { fd }, addr.into())) + .map(|(fd, addr)| (TcpStream::new(fd), addr.into())) } /// Returns a stream of incoming [`TcpStream`]s. 
@@ -244,7 +244,7 @@ impl<'a> AsyncIterator for Incoming<'a> { // SAFETY: not moving the `Future`. unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) } .poll_next(ctx) - .map_ok(|fd| TcpStream { fd }) + .map_ok(|fd| TcpStream::new(fd)) } } diff --git a/rt/src/net/tcp/stream.rs b/rt/src/net/tcp/stream.rs index 67e3116e..0aacceb7 100644 --- a/rt/src/net/tcp/stream.rs +++ b/rt/src/net/tcp/stream.rs @@ -43,10 +43,14 @@ use crate::wakers::NoRing; /// ``` #[derive(Debug)] pub struct TcpStream { - pub(in crate::net) fd: AsyncFd, + fd: AsyncFd, } impl TcpStream { + pub(crate) const fn new(fd: AsyncFd) -> TcpStream { + TcpStream { fd } + } + /// Create a new TCP stream and issues a non-blocking connect to the /// specified `address`. pub async fn connect(rt: &RT, address: SocketAddr) -> io::Result @@ -61,7 +65,7 @@ impl TcpStream { 0, )) .await?; - let socket = TcpStream { fd }; + let socket = TcpStream::new(fd); socket.set_auto_cpu_affinity(rt); NoRing(socket.fd.connect(SockAddr::from(address))).await?; Ok(socket) @@ -74,17 +78,13 @@ impl TcpStream { where RT: Access, { - TcpStream { - fd: AsyncFd::new(stream.into(), rt.submission_queue()), - } + TcpStream::new(AsyncFd::new(stream.into(), rt.submission_queue())) } /// Creates a new independently owned `TcpStream` that shares the same /// underlying file descriptor as the existing `TcpStream`. pub fn try_clone(&self) -> io::Result { - Ok(TcpStream { - fd: self.fd.try_clone()?, - }) + Ok(TcpStream::new(self.fd.try_clone()?)) } /// Automatically set the CPU affinity based on the runtime access `rt`. From 2d9a0025e125f036f83d4bf52d7a814f0a44c4a1 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Mon, 1 Apr 2024 14:38:11 +0200 Subject: [PATCH 3/5] Add metrics to TcpListener It collects only one metric: number of accepted connections. 
--- rt/src/net/tcp/listener.rs | 66 ++++++++++++++++++++++------- rt/tests/functional/tcp/listener.rs | 22 ++++++++++ 2 files changed, 73 insertions(+), 15 deletions(-) diff --git a/rt/src/net/tcp/listener.rs b/rt/src/net/tcp/listener.rs index 4c705582..1b9e72d9 100644 --- a/rt/src/net/tcp/listener.rs +++ b/rt/src/net/tcp/listener.rs @@ -11,6 +11,8 @@ use a10::AsyncFd; use socket2::{Domain, Protocol, SockRef, Socket, Type}; use crate::access::Access; +#[cfg(feature = "metrics")] +use crate::metrics::create_metric; use crate::net::{convert_address, SockAddr, TcpStream}; use crate::wakers::NoRing; @@ -98,9 +100,19 @@ use crate::wakers::NoRing; /// ``` pub struct TcpListener { fd: AsyncFd, + #[cfg(feature = "metrics")] + metrics: Metrics, } impl TcpListener { + const fn new(fd: AsyncFd) -> TcpListener { + TcpListener { + fd, + #[cfg(feature = "metrics")] + metrics: Metrics::empty(), + } + } + /// Creates a new `TcpListener` which will be bound to the specified /// `address`. pub async fn bind(rt: &RT, address: SocketAddr) -> io::Result @@ -128,7 +140,7 @@ impl TcpListener { )) .await?; - let socket = TcpListener { fd }; + let socket = TcpListener::new(fd); socket.with_ref(|socket| { #[cfg(target_os = "linux")] @@ -155,17 +167,17 @@ impl TcpListener { where RT: Access, { - TcpListener { - fd: AsyncFd::new(listener.into(), rt.submission_queue()), - } + TcpListener::new(AsyncFd::new(listener.into(), rt.submission_queue())) } /// Creates a new independently owned `TcpListener` that shares the same /// underlying file descriptor as the existing `TcpListener`. + /// + /// # Notes + /// + /// The metrics are reset for the cloned listener and are **not** shared. pub fn try_clone(&self) -> io::Result { - Ok(TcpListener { - fd: self.fd.try_clone()?, - }) + Ok(TcpListener::new(self.fd.try_clone()?)) } /// Returns the local socket address of this listener. 
@@ -195,7 +207,11 @@ impl TcpListener { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { NoRing(self.fd.accept::()) .await - .map(|(fd, addr)| (TcpStream::new(fd), addr.into())) + .map(|(fd, addr)| { + #[cfg(feature = "metrics")] + self.metrics.accepted.add(1); + (TcpStream::new(fd), addr.into()) + }) } /// Returns a stream of incoming [`TcpStream`]s. @@ -212,7 +228,11 @@ impl TcpListener { /// use [`TcpStream::set_auto_cpu_affinity`]. #[allow(clippy::doc_markdown)] // For "io_uring". pub const fn incoming(&self) -> Incoming<'_> { - Incoming(self.fd.multishot_accept()) + Incoming { + accept: self.fd.multishot_accept(), + #[cfg(feature = "metrics")] + metrics: &self.metrics, + } } /// Get the value of the `SO_ERROR` option on this socket. @@ -235,16 +255,24 @@ impl TcpListener { /// The [`AsyncIterator`] behind [`TcpListener::incoming`]. #[derive(Debug)] #[must_use = "AsyncIterators do nothing unless polled"] -pub struct Incoming<'a>(a10::net::MultishotAccept<'a>); +pub struct Incoming<'a> { + accept: a10::net::MultishotAccept<'a>, + #[cfg(feature = "metrics")] + metrics: &'a Metrics, +} impl<'a> AsyncIterator for Incoming<'a> { type Item = io::Result; - fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll> { - // SAFETY: not moving the `Future`. - unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) } - .poll_next(ctx) - .map_ok(|fd| TcpStream::new(fd)) + fn poll_next( + mut self: Pin<&mut Self>, + ctx: &mut task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.accept).poll_next(ctx).map_ok(|fd| { + #[cfg(feature = "metrics")] + self.metrics.accepted.add(1); + TcpStream::new(fd) + }) } } @@ -259,3 +287,11 @@ impl fmt::Debug for TcpListener { self.fd.fmt(f) } } + +#[cfg(feature = "metrics")] +create_metric! { + pub(crate) struct Metrics for TcpListener { + /// Number of connections accepted. 
+ accepted: AtomicCounter -> Counter, + } +} diff --git a/rt/tests/functional/tcp/listener.rs b/rt/tests/functional/tcp/listener.rs index 050cd11d..61706840 100644 --- a/rt/tests/functional/tcp/listener.rs +++ b/rt/tests/functional/tcp/listener.rs @@ -118,6 +118,7 @@ fn accept() { let (stream, remote_address) = listener.accept().await.unwrap(); assert!(remote_address.ip().is_loopback()); + assert_metrics(&listener, 1); let buf = Vec::with_capacity(DATA.len() + 1); let buf = stream.recv(buf).await.unwrap(); @@ -151,6 +152,7 @@ fn incoming() { let mut incoming = listener.incoming(); let stream = next(&mut incoming).await.unwrap().unwrap(); + assert_metrics(&listener, 1); let buf = Vec::with_capacity(DATA.len() + 1); let buf = stream.recv(buf).await.unwrap(); @@ -168,3 +170,23 @@ fn incoming() { join_many(&[stream_ref, listener_ref], Duration::from_secs(1)).unwrap(); } + +#[track_caller] +#[cfg(feature = "metrics")] +fn assert_metrics(stream: &TcpListener, expected_accepted: usize) { + use heph_rt::metrics::Metrics; + for (name, value) in stream.metrics() { + match (name, value) { + ("accepted", heph_rt::metrics::Metric::Counter(got)) => { + assert_eq!(got, expected_accepted) + } + (name, value) => panic!("unknown metric: {:?}, value: {:?}", name, value), + } + } +} + +#[track_caller] +#[cfg(not(feature = "metrics"))] +fn assert_metrics(stream: &TcpListener, expected_accepted: usize) { + _ = (stream, expected_accepted); +} From d5ce11e1f0eec1c392e13c358ab33480dcac7671 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Mon, 1 Apr 2024 14:51:25 +0200 Subject: [PATCH 4/5] Log metrics for TCP server on USR2 signal When a USR2 signal is received, same signal the runtime uses, it will log the metrics that a TcpListener collects, namely the number of accepted connections. 
--- rt/src/net/tcp/listener.rs | 2 +- rt/src/net/tcp/server.rs | 39 +++++++++++++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/rt/src/net/tcp/listener.rs b/rt/src/net/tcp/listener.rs index 1b9e72d9..dea51682 100644 --- a/rt/src/net/tcp/listener.rs +++ b/rt/src/net/tcp/listener.rs @@ -101,7 +101,7 @@ use crate::wakers::NoRing; pub struct TcpListener { fd: AsyncFd, #[cfg(feature = "metrics")] - metrics: Metrics, + pub(crate) metrics: Metrics, } impl TcpListener { diff --git a/rt/src/net/tcp/server.rs b/rt/src/net/tcp/server.rs index ff6fefc5..9c95823e 100644 --- a/rt/src/net/tcp/server.rs +++ b/rt/src/net/tcp/server.rs @@ -233,6 +233,8 @@ use log::{debug, trace}; use socket2::{Domain, Protocol, Socket, Type}; use crate::access::Access; +#[cfg(feature = "metrics")] +use crate::net::tcp::listener::Metrics; use crate::net::{TcpListener, TcpStream}; use crate::spawn::{ActorOptions, Spawn}; use crate::util::{either, next}; @@ -425,10 +427,23 @@ where debug!("no more connections to accept in TCP server, stopping"); return Ok(()); } - Err(Ok(_)) => { + Err(Ok(Message { + inner: MessageKind::Shutdown, + })) => { debug!("TCP server received shutdown message, stopping"); return Ok(()); } + #[cfg(feature = "metrics")] + Err(Ok(Message { + inner: MessageKind::LogMetrics, + })) => { + let Metrics { accepted } = &listener.metrics; + log::info!( + target: "metrics", + connections_accepted = accepted.get(); + "TCP server metrics", + ); + } Err(Err(NoMessages)) => { debug!("All actor references to TCP server dropped, stopping"); return Ok(()); @@ -443,13 +458,21 @@ where /// [`TryFrom`]`<`[`Signal`]`>` for the message, allowing for graceful shutdown. #[derive(Debug)] pub struct Message { - // Allow for future expansion. 
- _inner: (), + inner: MessageKind, +} + +#[derive(Debug)] +enum MessageKind { + Shutdown, + #[cfg(feature = "metrics")] + LogMetrics, } impl From for Message { fn from(_: Terminate) -> Message { - Message { _inner: () } + Message { + inner: MessageKind::Shutdown, + } } } @@ -460,7 +483,13 @@ impl TryFrom for Message { /// [`Signal::Quit`], fails for all other signals (by returning `Err(())`). fn try_from(signal: Signal) -> Result { match signal { - Signal::Interrupt | Signal::Terminate | Signal::Quit => Ok(Message { _inner: () }), + Signal::Interrupt | Signal::Terminate | Signal::Quit => Ok(Message { + inner: MessageKind::Shutdown, + }), + #[cfg(feature = "metrics")] + Signal::User2 => Ok(Message { + inner: MessageKind::LogMetrics, + }), _ => Err(()), } } From 6e93e0d9be4801fc57c4231324ee74c554bff5d1 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Mon, 1 Apr 2024 15:08:33 +0200 Subject: [PATCH 5/5] [WIP] Add metrics to TcpStream Need to actually start collecting them. --- rt/src/net/tcp/stream.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/rt/src/net/tcp/stream.rs b/rt/src/net/tcp/stream.rs index 0aacceb7..a2cb2219 100644 --- a/rt/src/net/tcp/stream.rs +++ b/rt/src/net/tcp/stream.rs @@ -9,6 +9,8 @@ use socket2::{Domain, Protocol, SockRef, Type}; use crate::access::Access; use crate::io::{impl_read, impl_write, Buf, BufMut, BufMutSlice, BufSlice, BufWrapper}; +#[cfg(feature = "metrics")] +use crate::metrics::create_metric; use crate::net::{ convert_address, Recv, RecvN, RecvNVectored, RecvVectored, Send, SendAll, SendAllVectored, SendVectored, SockAddr, @@ -44,11 +46,17 @@ use crate::wakers::NoRing; #[derive(Debug)] pub struct TcpStream { fd: AsyncFd, + #[cfg(feature = "metrics")] + metrics: Metrics, } impl TcpStream { pub(crate) const fn new(fd: AsyncFd) -> TcpStream { - TcpStream { fd } + TcpStream { + fd, + #[cfg(feature = "metrics")] + metrics: Metrics::empty(), + } } /// Create a new TCP stream and issues a 
non-blocking connect to the @@ -83,6 +91,10 @@ impl TcpStream { /// Creates a new independently owned `TcpStream` that shares the same /// underlying file descriptor as the existing `TcpStream`. + /// + /// # Notes + /// + /// The metrics are reset for the cloned stream and are **not** shared. pub fn try_clone(&self) -> io::Result { Ok(TcpStream::new(self.fd.try_clone()?)) } @@ -390,3 +402,13 @@ impl AsFd for TcpStream { self.fd.as_fd() } } + +#[cfg(feature = "metrics")] +create_metric! { + pub(crate) struct Metrics for TcpStream { + /// Number of bytes sent. + send: AtomicCounter -> Counter, + /// Number of bytes received. + received: AtomicCounter -> Counter, + } +}