diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5bdaaef88de..43677de0b83 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,7 +52,7 @@ jobs: run: cargo build --package "$CRATE" --no-default-features - name: Enforce no dependency on meta crate - if: env.CRATE != 'libp2p-server' + if: env.CRATE != 'libp2p-server' && env.CRATE != 'libp2p-perf' run: | cargo metadata --format-version=1 --no-deps | \ jq -e -r '.packages[] | select(.name == "'"$CRATE"'") | .dependencies | all(.name != "libp2p")' diff --git a/Cargo.lock b/Cargo.lock index d409a1edf2e..378ea5af2f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2814,19 +2814,20 @@ dependencies = [ [[package]] name = "libp2p-perf" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", - "async-trait", "clap", "env_logger 0.10.0", "futures", + "futures-bounded", + "futures-timer", "instant", + "libp2p", "libp2p-core", "libp2p-dns", "libp2p-identity", "libp2p-quic", - "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-test", "libp2p-tcp", diff --git a/Cargo.toml b/Cargo.toml index 2c6f214c5d4..8f487a4e06a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ libp2p-metrics = { version = "0.14.0", path = "misc/metrics" } libp2p-mplex = { version = "0.41.0", path = "muxers/mplex" } libp2p-muxer-test-harness = { path = "muxers/test-harness" } libp2p-noise = { version = "0.44.0", path = "transports/noise" } -libp2p-perf = { version = "0.2.0", path = "protocols/perf" } +libp2p-perf = { version = "0.3.0", path = "protocols/perf" } libp2p-ping = { version = "0.44.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.41.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.24.0", path = "transports/pnet" } diff --git a/protocols/perf/CHANGELOG.md b/protocols/perf/CHANGELOG.md index e46a94e981a..6976a89887b 100644 --- a/protocols/perf/CHANGELOG.md +++ b/protocols/perf/CHANGELOG.md @@ -1,4 +1,10 @@ -## 0.2.0 +## 0.3.0 - unreleased + +- Continuously measure on single connection (iperf-style). + See https://github.com/libp2p/test-plans/issues/261 for high level overview. + See [PR 4382](https://github.com/libp2p/rust-libp2p/pull/4382). + +## 0.2.0 - Raise MSRV to 1.65. See [PR 3715]. diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index ddf95a33867..03aef55d6f2 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-perf" edition = "2021" rust-version = { workspace = true } description = "libp2p perf protocol implementation" -version = "0.2.0" +version = "0.3.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,25 +12,26 @@ categories = ["network-programming", "asynchronous"] [dependencies] anyhow = "1" -async-trait = "0.1" clap = { version = "4.4.6", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.28" +futures-bounded = { workspace = true } +futures-timer = "3.0" instant = "0.1.12" +libp2p = { workspace = true, features = ["tokio", "tcp", "quic", "tls", "yamux", "dns"] } libp2p-core = { workspace = true } libp2p-dns = { workspace = true, features = ["tokio"] } libp2p-identity = { workspace = true, features = ["rand"] } -libp2p-tls = { workspace = true } libp2p-quic = { workspace = true, features = ["tokio"] } -libp2p-request-response = { workspace = true } libp2p-swarm = { workspace = true, features = ["macros", "tokio"] } libp2p-tcp = { workspace = true, features = ["tokio"] } +libp2p-tls = { workspace = true } libp2p-yamux = { workspace = true } log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1.33.0", features = ["full"] } +tokio = { version = "1.33", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } void = "1" [dev-dependencies] diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 42c6a182a57..61371317ed2 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -22,16 +22,14 @@ use std::{net::SocketAddr, str::FromStr}; use anyhow::{bail, Result}; use clap::Parser; -use futures::FutureExt; -use futures::{future::Either, StreamExt}; +use futures::StreamExt; use instant::{Duration, Instant}; -use libp2p_core::{ - multiaddr::Protocol, muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, - Transport as _, -}; -use libp2p_identity::PeerId; -use libp2p_perf::{Run, RunDuration, RunParams}; -use libp2p_swarm::{Config, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::core::{multiaddr::Protocol, upgrade, Multiaddr}; +use libp2p::identity::PeerId; +use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::SwarmBuilder; +use libp2p_perf::{client, server}; +use libp2p_perf::{Final, Intermediate, Run, RunParams, RunUpdate}; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -135,7 +133,7 @@ async fn server(server_address: SocketAddr) -> Result<()> { info!("Established connection to {:?} via {:?}", peer_id, endpoint); } SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::Behaviour(()) => { + SwarmEvent::Behaviour(server::Event { .. }) => { info!("Finished run",) } e => panic!("{e:?}"), @@ -163,271 +161,70 @@ async fn client( .with(Protocol::Udp(server_address.port())) .with(Protocol::QuicV1), }; - - let benchmarks = if upload_bytes.is_some() { - vec![custom( - server_address, - RunParams { - to_send: upload_bytes.unwrap(), - to_receive: download_bytes.unwrap(), - }, - ) - .boxed()] - } else { - vec![ - latency(server_address.clone()).boxed(), - throughput(server_address.clone()).boxed(), - requests_per_second(server_address.clone()).boxed(), - sequential_connections_per_second(server_address.clone()).boxed(), - ] - }; - - tokio::spawn(async move { - for benchmark in benchmarks { - benchmark.await?; - } - - anyhow::Ok(()) - }) - .await??; - - Ok(()) -} - -async fn custom(server_address: Multiaddr, params: RunParams) -> Result<()> { - info!("start benchmark: custom"); - let mut swarm = swarm().await?; - - let start = Instant::now(); - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - perf(&mut swarm, server_peer_id, params).await?; - - #[derive(Serialize, Deserialize)] - #[serde(rename_all = "camelCase")] - struct CustomResult { - latency: f64, - } - - println!( - "{}", - serde_json::to_string(&CustomResult { - latency: start.elapsed().as_secs_f64(), - }) - .unwrap() - ); - - Ok(()) -} - -async fn latency(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: round-trip-time latency"); - let mut swarm = swarm().await?; - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - let mut rounds = 0; - let start = Instant::now(); - let mut latencies = Vec::new(); - - loop { - if start.elapsed() > Duration::from_secs(30) { - break; - } - - let start = Instant::now(); - - perf( - &mut swarm, - server_peer_id, - RunParams { - to_send: 1, - to_receive: 1, - }, - ) - .await?; - - latencies.push(start.elapsed().as_secs_f64()); - rounds += 1; - } - - latencies.sort_by(|a, b| a.partial_cmp(b).unwrap()); - - info!( - "Finished: {rounds} pings in {:.4}s", - start.elapsed().as_secs_f64() - ); - info!("- {:.4} s median", percentile(&latencies, 0.50),); - info!("- {:.4} s 95th percentile\n", percentile(&latencies, 0.95),); - Ok(()) -} - -fn percentile(values: &[V], percentile: f64) -> V { - let n: usize = (values.len() as f64 * percentile).ceil() as usize - 1; - values[n] -} - -async fn throughput(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: single connection single channel throughput"); - let mut swarm = swarm().await?; - - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - let params = RunParams { - to_send: 10 * 1024 * 1024, - to_receive: 10 * 1024 * 1024, + to_send: upload_bytes.unwrap(), + to_receive: download_bytes.unwrap(), }; - - perf(&mut swarm, server_peer_id, params).await?; - - Ok(()) -} - -async fn requests_per_second(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: single connection parallel requests per second"); let mut swarm = swarm().await?; - let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - - let num = 1_000; - let to_send = 1; - let to_receive = 1; - - for _ in 0..num { - swarm.behaviour_mut().perf( - server_peer_id, - RunParams { - to_send, - to_receive, - }, - )?; - } - - let mut finished = 0; - let start = Instant::now(); - - loop { - match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(libp2p_perf::client::Event { - id: _, - result: Ok(_), - }) => { - finished += 1; - - if finished == num { - break; - } - } - e => panic!("{e:?}"), - } - } - - let duration = start.elapsed().as_secs_f64(); - let requests_per_second = num as f64 / duration; - - info!( - "Finished: sent {num} {to_send} bytes requests with {to_receive} bytes response each within {duration:.2} s", - ); - info!("- {requests_per_second:.2} req/s\n"); - - Ok(()) -} - -async fn sequential_connections_per_second(server_address: Multiaddr) -> Result<()> { - info!("start benchmark: sequential connections with single request per second"); - let mut rounds = 0; - let to_send = 1; - let to_receive = 1; - let start = Instant::now(); - - let mut latency_connection_establishment = Vec::new(); - let mut latency_connection_establishment_plus_request = Vec::new(); - - loop { - if start.elapsed() > Duration::from_secs(30) { - break; - } - - let mut swarm = swarm().await?; + tokio::spawn(async move { + info!("start benchmark: custom"); let start = Instant::now(); let server_peer_id = connect(&mut swarm, server_address.clone()).await?; - latency_connection_establishment.push(start.elapsed().as_secs_f64()); - - perf( - &mut swarm, - server_peer_id, - RunParams { - to_send, - to_receive, - }, - ) - .await?; - - latency_connection_establishment_plus_request.push(start.elapsed().as_secs_f64()); - rounds += 1; - } - - let duration = start.elapsed().as_secs_f64(); - - latency_connection_establishment.sort_by(|a, b| a.partial_cmp(b).unwrap()); - latency_connection_establishment_plus_request.sort_by(|a, b| a.partial_cmp(b).unwrap()); - - let connection_establishment_95th = percentile(&latency_connection_establishment, 0.95); - let connection_establishment_plus_request_95th = - percentile(&latency_connection_establishment_plus_request, 0.95); - - info!( - "Finished: established {rounds} connections with one {to_send} bytes request and one {to_receive} bytes response within {duration:.2} s", + perf(&mut swarm, server_peer_id, params).await?; + + println!( + "{}", + serde_json::to_string(&BenchmarkResult { + upload_bytes: params.to_send, + download_bytes: params.to_receive, + r#type: "final".to_string(), + time_seconds: start.elapsed().as_secs_f64(), + }) + .unwrap() ); - info!("- {connection_establishment_95th:.4} s 95th percentile connection establishment"); - info!("- {connection_establishment_plus_request_95th:.4} s 95th percentile connection establishment + one request"); + + anyhow::Ok(()) + }) + .await??; Ok(()) } -async fn swarm() -> Result> { - let local_key = libp2p_identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - - let transport = { - let tcp = libp2p_tcp::tokio::Transport::new(libp2p_tcp::Config::default().nodelay(true)) - .upgrade(upgrade::Version::V1Lazy) - .authenticate(libp2p_tls::Config::new(&local_key)?) - .multiplex(libp2p_yamux::Config::default()); - - let quic = { - let mut config = libp2p_quic::Config::new(&local_key); - config.support_draft_29 = true; - libp2p_quic::tokio::Transport::new(config) - }; - - let dns = libp2p_dns::tokio::Transport::system(OrTransport::new(quic, tcp))?; +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct BenchmarkResult { + r#type: String, + time_seconds: f64, + upload_bytes: usize, + download_bytes: usize, +} - dns.map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), +async fn swarm() -> Result> { + let swarm = SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp( + libp2p_tcp::Config::default().nodelay(true), + libp2p_tls::Config::new, + libp2p_yamux::Config::default, + )? + .with_quic() + .with_dns()? + .with_behaviour(|_| B::default())? + .with_swarm_config(|cfg| { + cfg.with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy) + .with_idle_connection_timeout(Duration::from_secs(60 * 5)) }) - .boxed() - }; - - let swarm = Swarm::new( - transport, - Default::default(), - local_peer_id, - Config::with_tokio_executor() - .with_substream_upgrade_protocol_override(upgrade::Version::V1Lazy) - .with_idle_connection_timeout(Duration::from_secs(60 * 5)), - ); + .build(); Ok(swarm) } async fn connect( - swarm: &mut Swarm, + swarm: &mut Swarm, server_address: Multiaddr, ) -> Result { let start = Instant::now(); @@ -450,21 +247,48 @@ async fn connect( } async fn perf( - swarm: &mut Swarm, + swarm: &mut Swarm, server_peer_id: PeerId, params: RunParams, -) -> Result { +) -> Result { swarm.behaviour_mut().perf(server_peer_id, params)?; - let duration = match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(libp2p_perf::client::Event { - id: _, - result: Ok(duration), - }) => duration, - e => panic!("{e:?}"), + let duration = loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(client::Event { + id: _, + result: Ok(RunUpdate::Intermediate(progressed)), + }) => { + info!("{progressed}"); + + let Intermediate { + duration, + sent, + received, + } = progressed; + + println!( + "{}", + serde_json::to_string(&BenchmarkResult { + r#type: "intermediate".to_string(), + time_seconds: duration.as_secs_f64(), + upload_bytes: sent, + download_bytes: received, + }) + .unwrap() + ); + } + SwarmEvent::Behaviour(client::Event { + id: _, + result: Ok(RunUpdate::Final(Final { duration })), + }) => break duration, + e => panic!("{e:?}"), + }; }; - info!("{}", Run { params, duration }); + let run = Run { params, duration }; + + info!("{run}"); - Ok(duration) + Ok(run) } diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 52388820be4..c4614e979db 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -18,230 +18,32 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use instant::Duration; +mod behaviour; +mod handler; -use std::{ - collections::HashSet, - task::{Context, Poll}, -}; +use std::sync::atomic::{AtomicUsize, Ordering}; -use libp2p_core::Multiaddr; -use libp2p_identity::PeerId; -use libp2p_request_response as request_response; -use libp2p_swarm::{ - derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, - NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm, -}; +pub use behaviour::{Behaviour, Event}; +use libp2p_swarm::StreamUpgradeError; +use void::Void; -use crate::{protocol::Response, RunDuration, RunParams}; +static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1); /// Connection identifier. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub struct RunId(request_response::RequestId); +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct RunId(usize); -impl From for RunId { - fn from(value: request_response::RequestId) -> Self { - Self(value) - } -} - -#[derive(Debug)] -pub struct Event { - pub id: RunId, - pub result: Result, -} - -pub struct Behaviour { - connected: HashSet, - request_response: request_response::Behaviour, -} - -impl Default for Behaviour { - fn default() -> Self { - let mut req_resp_config = request_response::Config::default(); - req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); - Self { - connected: Default::default(), - request_response: request_response::Behaviour::new( - std::iter::once(( - crate::PROTOCOL_NAME, - request_response::ProtocolSupport::Outbound, - )), - req_resp_config, - ), - } - } -} - -impl Behaviour { - pub fn new() -> Self { - Self::default() - } - - pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { - if !self.connected.contains(&server) { - return Err(PerfError::NotConnected); - } - - let id = self.request_response.send_request(&server, params).into(); - - Ok(id) +impl RunId { + /// Returns the next available [`RunId`]. + pub(crate) fn next() -> Self { + Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst)) } } #[derive(thiserror::Error, Debug)] -pub enum PerfError { - #[error("Not connected to peer")] - NotConnected, -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - as NetworkBehaviour>::ConnectionHandler; - type ToSwarm = Event; - - fn handle_pending_outbound_connection( - &mut self, - connection_id: ConnectionId, - maybe_peer: Option, - addresses: &[Multiaddr], - effective_role: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_outbound_connection( - connection_id, - maybe_peer, - addresses, - effective_role, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response - .handle_established_outbound_connection(connection_id, peer, addr, role_override) - } - - fn handle_pending_inbound_connection( - &mut self, - connection_id: ConnectionId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result<(), libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_inbound_connection( - connection_id, - local_addr, - remote_addr, - ) - } - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { - self.connected.insert(peer_id); - } - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id: _, - endpoint: _, - handler: _, - remaining_established, - }) => { - if remaining_established == 0 { - assert!(self.connected.remove(&peer_id)); - } - } - FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrConfirmed(_) - | FromSwarm::ExternalAddrExpired(_) => {} - }; - - self.request_response.on_swarm_event(event); - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - self.request_response - .on_connection_handler_event(peer_id, connection_id, event); - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - self.request_response.poll(cx).map(|to_swarm| { - to_swarm.map_out(|m| match m { - request_response::Event::Message { - peer: _, - message: - request_response::Message::Response { - request_id, - response: Response::Receiver(run_duration), - }, - } => Event { - id: request_id.into(), - result: Ok(run_duration), - }, - request_response::Event::Message { - peer: _, - message: - request_response::Message::Response { - response: Response::Sender(_), - .. - }, - } => unreachable!(), - request_response::Event::Message { - peer: _, - message: request_response::Message::Request { .. }, - } => { - unreachable!() - } - request_response::Event::OutboundFailure { - peer: _, - request_id, - error, - } => Event { - id: request_id.into(), - result: Err(error), - }, - request_response::Event::InboundFailure { - peer: _, - request_id: _, - error: _, - } => unreachable!(), - request_response::Event::ResponseSent { .. } => unreachable!(), - }) - }) - } +pub enum RunError { + #[error(transparent)] + Upgrade(#[from] StreamUpgradeError), + #[error("Failed to execute perf run: {0}")] + Io(#[from] std::io::Error), } diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs new file mode 100644 index 00000000000..0647ce81130 --- /dev/null +++ b/protocols/perf/src/client/behaviour.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`NetworkBehaviour`] of the libp2p perf client protocol. + +use std::{ + collections::{HashSet, VecDeque}, + task::{Context, Poll}, +}; + +use libp2p_core::Multiaddr; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, + NetworkBehaviour, NotifyHandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; + +use crate::RunParams; +use crate::{client::handler::Handler, RunUpdate}; + +use super::{RunError, RunId}; + +#[derive(Debug)] +pub struct Event { + pub id: RunId, + pub result: Result, +} + +#[derive(Default)] +pub struct Behaviour { + /// Queue of actions to return when polled. + queued_events: VecDeque>>, + /// Set of connected peers. + connected: HashSet, +} + +impl Behaviour { + pub fn new() -> Self { + Self::default() + } + + pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { + if !self.connected.contains(&server) { + return Err(NotConnected {}); + } + + let id = RunId::next(); + + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: server, + handler: NotifyHandler::Any, + event: crate::client::handler::Command { id, params }, + }); + + Ok(id) + } +} + +#[derive(thiserror::Error, Debug)] +pub struct NotConnected(); + +impl std::fmt::Display for NotConnected { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "not connected to peer") + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type ToSwarm = Event; + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { + self.connected.insert(peer_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id: _, + endpoint: _, + handler: _, + remaining_established, + }) => { + if remaining_established == 0 { + assert!(self.connected.remove(&peer_id)); + } + } + FromSwarm::AddressChange(_) + | FromSwarm::DialFailure(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ExpiredListenAddr(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddrCandidate(_) + | FromSwarm::ExternalAddrExpired(_) + | FromSwarm::ExternalAddrConfirmed(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + _event_source: PeerId, + _connection_id: ConnectionId, + super::handler::Event { id, result }: THandlerOutEvent, + ) { + self.queued_events + .push_back(ToSwarm::GenerateEvent(Event { id, result })); + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs new file mode 100644 index 00000000000..a9bb0c7d483 --- /dev/null +++ b/protocols/perf/src/client/handler.rs @@ -0,0 +1,180 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + +use futures::{ + stream::{BoxStream, SelectAll}, + StreamExt, +}; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_swarm::{ + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, + }, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, +}; +use void::Void; + +use crate::client::{RunError, RunId}; +use crate::{RunParams, RunUpdate}; + +#[derive(Debug)] +pub struct Command { + pub(crate) id: RunId, + pub(crate) params: RunParams, +} + +#[derive(Debug)] +pub struct Event { + pub(crate) id: RunId, + pub(crate) result: Result, +} + +pub struct Handler { + /// Queue of events to return when polled. + queued_events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + ::Error, + >, + >, + + requested_streams: VecDeque, + + outbound: SelectAll)>>, +} + +impl Handler { + pub fn new() -> Self { + Self { + queued_events: Default::default(), + requested_streams: Default::default(), + outbound: Default::default(), + } + } +} + +impl Default for Handler { + fn default() -> Self { + Self::new() + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = Command; + type ToBehaviour = Event; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = ReadyUpgrade; + type OutboundOpenInfo = (); + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn on_behaviour_event(&mut self, command: Self::FromBehaviour) { + self.requested_streams.push_back(command); + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()), + }) + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, + info: (), + }) => { + let Command { id, params } = self + .requested_streams + .pop_front() + .expect("opened a stream without a pending command"); + self.outbound.push( + crate::protocol::send_receive(params, protocol) + .map(move |result| (id, result)) + .boxed(), + ); + } + + ConnectionEvent::AddressChange(_) + | ConnectionEvent::LocalProtocolsChange(_) + | ConnectionEvent::RemoteProtocolsChange(_) => {} + ConnectionEvent::DialUpgradeError(DialUpgradeError { info: (), error }) => { + let Command { id, .. } = self + .requested_streams + .pop_front() + .expect("requested stream without pending command"); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event { + id, + result: Err(error.into()), + })); + } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { + void::unreachable(error) + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + if let Poll::Ready(Some((id, result))) = self.outbound.poll_next_unpin(cx) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { + id, + result: result.map_err(Into::into), + })); + } + + Poll::Pending + } +} diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index b2b12244341..f9db96aa9d9 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -34,6 +34,46 @@ mod protocol; pub mod server; pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); +const RUN_TIMEOUT: Duration = Duration::from_secs(5 * 60); +const MAX_PARALLEL_RUNS_PER_CONNECTION: usize = 1_000; + +#[derive(Debug, Clone, Copy)] +pub enum RunUpdate { + Intermediate(Intermediate), + Final(Final), +} + +#[derive(Debug, Clone, Copy)] +pub struct Intermediate { + pub duration: Duration, + pub sent: usize, + pub received: usize, +} + +impl Display for Intermediate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Intermediate { + duration, + sent, + received, + } = self; + write!( + f, + "{:4} s uploaded {} downloaded {} ({})", + duration.as_secs_f64(), + format_bytes(*sent), + format_bytes(*received), + format_bandwidth(*duration, sent + received), + )?; + + Ok(()) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Final { + pub duration: RunDuration, +} /// Parameters for a single run, i.e. one stream, sending and receiving data. /// @@ -52,48 +92,49 @@ pub struct RunDuration { pub download: Duration, } +#[derive(Debug, Clone, Copy)] pub struct Run { pub params: RunParams, pub duration: RunDuration, } +const KILO: f64 = 1024.0; +const MEGA: f64 = KILO * 1024.0; +const GIGA: f64 = MEGA * 1024.0; + +fn format_bytes(bytes: usize) -> String { + let bytes = bytes as f64; + if bytes >= GIGA { + format!("{:.2} GiB", bytes / GIGA) + } else if bytes >= MEGA { + format!("{:.2} MiB", bytes / MEGA) + } else if bytes >= KILO { + format!("{:.2} KiB", bytes / KILO) + } else { + format!("{} B", bytes) + } +} + +fn format_bandwidth(duration: Duration, bytes: usize) -> String { + const KILO: f64 = 1024.0; + const MEGA: f64 = KILO * 1024.0; + const GIGA: f64 = MEGA * 1024.0; + + let bandwidth = (bytes as f64 * 8.0) / duration.as_secs_f64(); + + if bandwidth >= GIGA { + format!("{:.2} Gbit/s", bandwidth / GIGA) + } else if bandwidth >= MEGA { + format!("{:.2} Mbit/s", bandwidth / MEGA) + } else if bandwidth >= KILO { + format!("{:.2} Kbit/s", bandwidth / KILO) + } else { + format!("{:.2} bit/s", bandwidth) + } +} + impl Display for Run { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - const KILO: f64 = 1024.0; - const MEGA: f64 = KILO * 1024.0; - const GIGA: f64 = MEGA * 1024.0; - - fn format_bytes(bytes: usize) -> String { - let bytes = bytes as f64; - if bytes >= GIGA { - format!("{:.2} GiB", bytes / GIGA) - } else if bytes >= MEGA { - format!("{:.2} MiB", bytes / MEGA) - } else if bytes >= KILO { - format!("{:.2} KiB", bytes / KILO) - } else { - format!("{} B", bytes) - } - } - - fn format_bandwidth(duration: Duration, bytes: usize) -> String { - const KILO: f64 = 1024.0; - const MEGA: f64 = KILO * 1024.0; - const GIGA: f64 = MEGA * 1024.0; - - let bandwidth = (bytes as f64 * 8.0) / duration.as_secs_f64(); - - if bandwidth >= GIGA { - format!("{:.2} Gbit/s", bandwidth / GIGA) - } else if bandwidth >= MEGA { - format!("{:.2} Mbit/s", bandwidth / MEGA) - } else if bandwidth >= KILO { - format!("{:.2} Kbit/s", bandwidth / KILO) - } else { - format!("{:.2} bit/s", bandwidth) - } - } - let Run { params: RunParams { to_send, diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs index 4b454fb88b7..d2d65b42303 100644 --- a/protocols/perf/src/protocol.rs +++ b/protocols/perf/src/protocol.rs @@ -18,184 +18,184 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use async_trait::async_trait; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_timer::Delay; use instant::Instant; -use libp2p_request_response as request_response; -use libp2p_swarm::StreamProtocol; -use std::io; - -use crate::{RunDuration, RunParams}; - -const BUF: [u8; 65536] = [0; 64 << 10]; - -#[derive(Debug)] -pub enum Response { - Sender(usize), - Receiver(RunDuration), -} - -#[derive(Default)] -pub struct Codec { - to_receive: Option, - - write_start: Option, - read_start: Option, - read_done: Option, +use std::time::Duration; + +use futures::{ + future::{select, Either}, + AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt, +}; + +use crate::{Final, Intermediate, Run, RunDuration, RunParams, RunUpdate}; + +const BUF: [u8; 1024] = [0; 1024]; +const REPORT_INTERVAL: Duration = Duration::from_secs(1); + +pub(crate) fn send_receive( + params: RunParams, + stream: S, +) -> impl Stream> { + // Use a channel to simulate a generator. `send_receive_inner` can `yield` events through the + // channel. + let (sender, receiver) = futures::channel::mpsc::channel(0); + let receiver = receiver.fuse(); + let inner = send_receive_inner(params, stream, sender).fuse(); + + futures::stream::select( + receiver.map(|progressed| Ok(RunUpdate::Intermediate(progressed))), + inner + .map(|finished| finished.map(RunUpdate::Final)) + .into_stream(), + ) } -impl Clone for Codec { - fn clone(&self) -> Self { - Default::default() +async fn send_receive_inner( + params: RunParams, + mut stream: S, + mut progress: futures::channel::mpsc::Sender, +) -> Result { + let mut delay = Delay::new(REPORT_INTERVAL); + + let RunParams { + to_send, + to_receive, + } = params; + + let mut receive_buf = vec![0; 1024]; + let to_receive_bytes = (to_receive as u64).to_be_bytes(); + stream.write_all(&to_receive_bytes).await?; + + let write_start = Instant::now(); + let mut intermittant_start = Instant::now(); + let mut sent = 0; + let mut intermittent_sent = 0; + + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + let mut write = stream.write(buf); + sent += loop { + match select(&mut delay, &mut write).await { + Either::Left((_, _)) => { + delay.reset(REPORT_INTERVAL); + progress + .send(Intermediate { + duration: intermittant_start.elapsed(), + sent: sent - intermittent_sent, + received: 0, + }) + .await + .expect("receiver not to be dropped"); + intermittant_start = Instant::now(); + intermittent_sent = sent; + } + Either::Right((n, _)) => break n?, + } + } } -} -#[async_trait] -impl request_response::Codec for Codec { - /// The type of protocol(s) or protocol versions being negotiated. - type Protocol = StreamProtocol; - /// The type of inbound and outbound requests. - type Request = RunParams; - /// The type of inbound and outbound responses. - type Response = Response; - - /// Reads a request from the given I/O stream according to the - /// negotiated protocol. - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut receive_buf = vec![0; 64 << 10]; - - let to_send = { - let mut buf = [0; 8]; - io.read_exact(&mut buf).await?; - - u64::from_be_bytes(buf) as usize - }; - - let mut received = 0; - loop { - let n = io.read(&mut receive_buf).await?; - received += n; - if n == 0 { - break; + loop { + match select(&mut delay, stream.close()).await { + Either::Left((_, _)) => { + delay.reset(REPORT_INTERVAL); + progress + .send(Intermediate { + duration: intermittant_start.elapsed(), + sent: sent - intermittent_sent, + received: 0, + }) + .await + .expect("receiver not to be dropped"); + intermittant_start = Instant::now(); + intermittent_sent = sent; } + Either::Right((Ok(_), _)) => break, + Either::Right((Err(e), _)) => return Err(e), } - - Ok(RunParams { - to_receive: received, - to_send, - }) } - /// Reads a response from the given I/O stream according to the - /// negotiated protocol. - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - assert!(self.write_start.is_some()); - assert_eq!(self.read_start, None); - assert_eq!(self.read_done, None); - - self.read_start = Some(Instant::now()); - - let mut receive_buf = vec![0; 64 << 10]; - - let mut received = 0; - loop { - let n = io.read(&mut receive_buf).await?; - received += n; - // Make sure to wait for the remote to close the stream. Otherwise with `to_receive` of `0` - // one does not measure the full round-trip of the previous write. - if n == 0 { - break; + let write_done = Instant::now(); + let mut received = 0; + let mut intermittend_received = 0; + + while received < to_receive { + let mut read = stream.read(&mut receive_buf); + received += loop { + match select(&mut delay, &mut read).await { + Either::Left((_, _)) => { + delay.reset(REPORT_INTERVAL); + progress + .send(Intermediate { + duration: intermittant_start.elapsed(), + sent: sent - intermittent_sent, + received: received - intermittend_received, + }) + .await + .expect("receiver not to be dropped"); + intermittant_start = Instant::now(); + intermittent_sent = sent; + intermittend_received = received; + } + Either::Right((n, _)) => break n?, } } + } - self.read_done = Some(Instant::now()); + let read_done = Instant::now(); - assert_eq!(received, self.to_receive.unwrap()); + Ok(Final { + duration: RunDuration { + upload: write_done.duration_since(write_start), + download: read_done.duration_since(write_done), + }, + }) +} - Ok(Response::Receiver(RunDuration { - upload: self - .read_start - .unwrap() - .duration_since(self.write_start.unwrap()), - download: self - .read_done - .unwrap() - .duration_since(self.read_start.unwrap()), - })) +pub(crate) async fn receive_send( + mut stream: S, +) -> Result { + let to_send = { + let mut buf = [0; 8]; + stream.read_exact(&mut buf).await?; + + u64::from_be_bytes(buf) as usize + }; + + let read_start = Instant::now(); + + let mut receive_buf = vec![0; 1024]; + let mut received = 0; + loop { + let n = stream.read(&mut receive_buf).await?; + received += n; + if n == 0 { + break; + } } - /// Writes a request to the given I/O stream according to the - /// negotiated protocol. - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - assert_eq!(self.to_receive, None); - assert_eq!(self.write_start, None); - assert_eq!(self.read_start, None); - assert_eq!(self.read_done, None); - - self.write_start = Some(Instant::now()); - - let RunParams { - to_send, - to_receive, - } = req; - - self.to_receive = Some(to_receive); - - io.write_all(&(to_receive as u64).to_be_bytes()).await?; - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; - - sent += io.write(buf).await?; - } + let read_done = Instant::now(); - Ok(()) + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + sent += stream.write(buf).await?; } - /// Writes a response to the given I/O stream according to the - /// negotiated protocol. - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - response: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let to_send = match response { - Response::Sender(to_send) => to_send, - Response::Receiver(_) => unreachable!(), - }; - - let mut sent = 0; - while sent < to_send { - let n = std::cmp::min(to_send - sent, BUF.len()); - let buf = &BUF[..n]; - - sent += io.write(buf).await?; - } + stream.close().await?; + let write_done = Instant::now(); - Ok(()) - } + Ok(Run { + params: RunParams { + to_send: sent, + to_receive: received, + }, + duration: RunDuration { + upload: write_done.duration_since(read_done), + download: read_done.duration_since(read_start), + }, + }) } diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs index 60fa252de1a..9671b43878b 100644 --- a/protocols/perf/src/server.rs +++ b/protocols/perf/src/server.rs @@ -18,147 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::task::{Context, Poll}; +mod behaviour; +mod handler; -use instant::Duration; -use libp2p_core::Multiaddr; -use libp2p_identity::PeerId; -use libp2p_request_response as request_response; -use libp2p_swarm::{ - ConnectionId, FromSwarm, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm, -}; - -use crate::protocol::Response; - -pub struct Behaviour { - request_response: request_response::Behaviour, -} - -impl Default for Behaviour { - fn default() -> Self { - let mut req_resp_config = request_response::Config::default(); - req_resp_config.set_request_timeout(Duration::from_secs(60 * 5)); - - Self { - request_response: request_response::Behaviour::new( - std::iter::once(( - crate::PROTOCOL_NAME, - request_response::ProtocolSupport::Inbound, - )), - req_resp_config, - ), - } - } -} - -impl Behaviour { - pub fn new() -> Self { - Self::default() - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = - as NetworkBehaviour>::ConnectionHandler; - type ToSwarm = (); - - fn handle_pending_outbound_connection( - &mut self, - connection_id: ConnectionId, - maybe_peer: Option, - addresses: &[Multiaddr], - effective_role: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_outbound_connection( - connection_id, - maybe_peer, - addresses, - effective_role, - ) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response - .handle_established_outbound_connection(connection_id, peer, addr, role_override) - } - - fn handle_pending_inbound_connection( - &mut self, - connection_id: ConnectionId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result<(), libp2p_swarm::ConnectionDenied> { - self.request_response.handle_pending_inbound_connection( - connection_id, - local_addr, - remote_addr, - ) - } - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - self.request_response.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - self.request_response.on_swarm_event(event); - } - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - self.request_response - .on_connection_handler_event(peer_id, connection_id, event); - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - self.request_response.poll(cx).map(|to_swarm| { - to_swarm.map_out(|m| match m { - request_response::Event::Message { - peer: _, - message: request_response::Message::Response { .. }, - } => { - unreachable!() - } - request_response::Event::Message { - peer: _, - message: - request_response::Message::Request { - request_id: _, - request, - channel, - }, - } => { - let _ = self - .request_response - .send_response(channel, Response::Sender(request.to_send)); - } - request_response::Event::OutboundFailure { .. } => unreachable!(), - request_response::Event::InboundFailure { .. } => {} - request_response::Event::ResponseSent { .. } => {} - }) - }) - } -} +pub use behaviour::{Behaviour, Event}; diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs new file mode 100644 index 00000000000..e1d4c817d0a --- /dev/null +++ b/protocols/perf/src/server/behaviour.rs @@ -0,0 +1,118 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`NetworkBehaviour`] of the libp2p perf server protocol. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + +use libp2p_identity::PeerId; +use libp2p_swarm::{ + ConnectionId, FromSwarm, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; + +use crate::server::handler::Handler; +use crate::Run; + +#[derive(Debug)] +pub struct Event { + pub remote_peer_id: PeerId, + pub stats: Run, +} + +#[derive(Default)] +pub struct Behaviour { + /// Queue of actions to return when polled. + queued_events: VecDeque>>, +} + +impl Behaviour { + pub fn new() -> Self { + Self::default() + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(_) => {} + FromSwarm::ConnectionClosed(_) => {} + FromSwarm::AddressChange(_) => {} + FromSwarm::DialFailure(_) => {} + FromSwarm::ListenFailure(_) => {} + FromSwarm::NewListener(_) => {} + FromSwarm::NewListenAddr(_) => {} + FromSwarm::ExpiredListenAddr(_) => {} + FromSwarm::ListenerError(_) => {} + FromSwarm::ListenerClosed(_) => {} + FromSwarm::NewExternalAddrCandidate(_) => {} + FromSwarm::ExternalAddrExpired(_) => {} + FromSwarm::ExternalAddrConfirmed(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + event_source: PeerId, + _connection_id: ConnectionId, + super::handler::Event { stats }: THandlerOutEvent, + ) { + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + stats, + })) + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs new file mode 100644 index 00000000000..4e739995b67 --- /dev/null +++ b/protocols/perf/src/server/handler.rs @@ -0,0 +1,148 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::task::{Context, Poll}; + +use futures::FutureExt; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_swarm::{ + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, + }, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, +}; +use log::error; +use void::Void; + +use crate::Run; + +#[derive(Debug)] +pub struct Event { + pub stats: Run, +} + +pub struct Handler { + inbound: futures_bounded::FuturesSet>, +} + +impl Handler { + pub fn new() -> Self { + Self { + inbound: futures_bounded::FuturesSet::new( + crate::RUN_TIMEOUT, + crate::MAX_PARALLEL_RUNS_PER_CONNECTION, + ), + } + } +} + +impl Default for Handler { + fn default() -> Self { + Self::new() + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = Void; + type ToBehaviour = Event; + type Error = Void; + type InboundProtocol = ReadyUpgrade; + type OutboundProtocol = DeniedUpgrade; + type OutboundOpenInfo = Void; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, v: Self::FromBehaviour) { + void::unreachable(v) + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, + info: _, + }) => { + if self + .inbound + .try_push(crate::protocol::receive_send(protocol).boxed()) + .is_err() + { + log::warn!("Dropping inbound stream because we are at capacity"); + } + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => { + void::unreachable(info) + } + + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => { + void::unreachable(info) + } + ConnectionEvent::AddressChange(_) + | ConnectionEvent::LocalProtocolsChange(_) + | ConnectionEvent::RemoteProtocolsChange(_) => {} + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { + void::unreachable(error) + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + Self::Error, + >, + > { + loop { + match self.inbound.poll_unpin(cx) { + Poll::Ready(Ok(Ok(stats))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats })) + } + Poll::Ready(Ok(Err(e))) => { + error!("{e:?}"); + continue; + } + Poll::Ready(Err(e @ futures_bounded::Timeout { .. })) => { + error!("inbound perf request timed out: {e}"); + continue; + } + Poll::Pending => {} + } + + return Poll::Pending; + } + } +} diff --git a/protocols/perf/tests/lib.rs b/protocols/perf/tests/lib.rs index af5bc2c35a2..a79e8dd36b3 100644 --- a/protocols/perf/tests/lib.rs +++ b/protocols/perf/tests/lib.rs @@ -18,7 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use libp2p_perf::{client, server, RunParams}; +use libp2p_perf::{ + client::{self}, + server, RunParams, +}; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; @@ -33,7 +36,7 @@ async fn perf() { server.listen().await; client.connect(&mut server).await; - tokio::spawn(server.loop_on_next()); + tokio::task::spawn(server.loop_on_next()); client .behaviour_mut()