From 762f636bdaeea94a92cfd48749a2fdc0fb7ecb16 Mon Sep 17 00:00:00 2001 From: Jakub Doka Date: Tue, 9 Apr 2024 19:56:23 +0200 Subject: [PATCH] adjusting the load --- swarm/benches/connection_handler-s.rs | 282 ++++++++++++++++++++++++++ swarm/benches/connection_handler.rs | 65 ++++-- 2 files changed, 326 insertions(+), 21 deletions(-) create mode 100644 swarm/benches/connection_handler-s.rs diff --git a/swarm/benches/connection_handler-s.rs b/swarm/benches/connection_handler-s.rs new file mode 100644 index 00000000000..ee467cbfa8d --- /dev/null +++ b/swarm/benches/connection_handler-s.rs @@ -0,0 +1,282 @@ +use async_std::stream::StreamExt; +use criterion::{criterion_group, criterion_main, Criterion}; +use instant::Duration; +use libp2p_core::{ + transport::MemoryTransport, InboundUpgrade, Multiaddr, OutboundUpgrade, Transport, UpgradeInfo, +}; +use libp2p_identity::PeerId; +use libp2p_swarm::{ConnectionHandler, NetworkBehaviour, StreamProtocol}; +use std::convert::Infallible; + +criterion_main!(one_behavior_many_protocols); + +macro_rules! benchmark_one_behaviour_many_protocols { + ($( + $name:ident, + )*) => { + $( + #[tokio::main(flavor = "multi_thread", worker_threads = 1)] + async fn $name(c: &mut Criterion) { + let protocol_count = parse_counts(stringify!($name)); + bench_run_one_behaviour_many_protocols(c, protocol_count, 10000); + } + )* + + criterion_group!( + one_behavior_many_protocols, + $( + $name, + )* + ); + }; +} + +benchmark_one_behaviour_many_protocols! { + one_behavior_many_protocols_10, + one_behavior_many_protocols_100, + one_behavior_many_protocols_1000, +} + +fn parse_counts(name: &str) -> usize { + name.split('_').next_back().unwrap().parse().unwrap() +} + +fn new_swarm(protocol_count: usize, spam_count: usize) -> libp2p_swarm::Swarm { + // we leak to simulate static protocols witch is the common case + let protocols = (0..protocol_count) + .map(|i| StreamProtocol::new(format!("/protocol/{i}").leak())) + .collect::>() + .leak(); + + let swarm_a_keypair = libp2p_identity::Keypair::generate_ed25519(); + libp2p_swarm::Swarm::new( + MemoryTransport::new() + .upgrade(multistream_select::Version::V1) + .authenticate(libp2p_plaintext::Config::new(&swarm_a_keypair)) + .multiplex(libp2p_yamux::Config::default()) + .boxed(), + PollerBehaviour { + spam_count, + protocols, + restarting: false, + finished: false, + other_peer: None, + }, + swarm_a_keypair.public().to_peer_id(), + libp2p_swarm::Config::with_tokio_executor().with_idle_connection_timeout(Duration::MAX), + ) +} + +fn bench_run_one_behaviour_many_protocols( + c: &mut Criterion, + protocol_count: usize, + spam_count: usize, +) { + let mut sa = new_swarm(protocol_count, spam_count); + let mut sb = new_swarm(protocol_count, 0); + + sa.behaviour_mut().other_peer = Some(*sb.local_peer_id()); + sb.behaviour_mut().other_peer = Some(*sa.local_peer_id()); + + static mut OFFSET: usize = 0; + let offset = unsafe { + OFFSET += 1; + OFFSET + }; + + sa.listen_on(format!("/memory/{offset}").parse().unwrap()) + .unwrap(); + sb.dial(format!("/memory/{offset}").parse::().unwrap()) + .unwrap(); + + c.bench_function( + &format!("one_behavior_many_protocols_{protocol_count}_{spam_count}"), + |b| { + sa.behaviour_mut().finished = false; + sb.behaviour_mut().finished = false; + b.iter(|| { + futures::executor::block_on(async { + while !sa.behaviour().finished || !sb.behaviour().finished { + futures::future::select(sb.next(), sa.next()).await; + } + }); + }); + }, + ); +} + +struct PollerBehaviour { + spam_count: usize, + protocols: &'static [StreamProtocol], + restarting: bool, + finished: bool, + other_peer: Option, +} + +#[derive(Debug)] +struct FinishedSpamming; + +impl NetworkBehaviour for PollerBehaviour { + type ConnectionHandler = PollerHandler; + type ToSwarm = FinishedSpamming; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_identity::PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(PollerHandler { + spam_count: 0, + protocols: self.protocols, + }) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + _peer: libp2p_identity::PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(PollerHandler { + spam_count: self.spam_count, + protocols: self.protocols, + }) + } + + fn on_swarm_event(&mut self, _: libp2p_swarm::FromSwarm) {} + + fn on_connection_handler_event( + &mut self, + _peer_id: libp2p_identity::PeerId, + _connection_id: libp2p_swarm::ConnectionId, + _event: libp2p_swarm::THandlerOutEvent, + ) { + self.finished = true; + } + + fn poll( + &mut self, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + if self.finished && !self.restarting { + self.restarting = true; + std::task::Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(FinishedSpamming)) + } else if !self.finished && self.restarting { + self.restarting = false; + std::task::Poll::Ready(libp2p_swarm::ToSwarm::NotifyHandler { + peer_id: self.other_peer.unwrap(), + handler: libp2p_swarm::NotifyHandler::Any, + event: Restart(self.spam_count), + }) + } else { + std::task::Poll::Pending + } + } +} + +#[derive(Default)] +struct PollerHandler { + spam_count: usize, + protocols: &'static [StreamProtocol], +} + +#[derive(Debug)] +struct Restart(usize); + +impl ConnectionHandler for PollerHandler { + type FromBehaviour = Restart; + + type ToBehaviour = FinishedSpamming; + + type InboundProtocol = Upgrade; + + type OutboundProtocol = Upgrade; + + type InboundOpenInfo = (); + + type OutboundOpenInfo = (); + + fn listen_protocol( + &self, + ) -> libp2p_swarm::SubstreamProtocol { + libp2p_swarm::SubstreamProtocol::new(Upgrade(self.protocols), ()) + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll< + libp2p_swarm::ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::ToBehaviour, + >, + > { + if self.spam_count == usize::MAX { + return std::task::Poll::Pending; + } + + if self.spam_count != 0 { + self.spam_count -= 1; + cx.waker().wake_by_ref(); + return std::task::Poll::Pending; + } + + self.spam_count = usize::MAX; + std::task::Poll::Ready(libp2p_swarm::ConnectionHandlerEvent::NotifyBehaviour( + FinishedSpamming, + )) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + Restart(spam_count) => self.spam_count = spam_count, + } + } + + fn on_connection_event( + &mut self, + _event: libp2p_swarm::handler::ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + } +} + +pub struct Upgrade(&'static [StreamProtocol]); + +impl UpgradeInfo for Upgrade { + type Info = &'static StreamProtocol; + type InfoIter = std::slice::Iter<'static, StreamProtocol>; + + fn protocol_info(&self) -> Self::InfoIter { + self.0.iter() + } +} + +impl OutboundUpgrade for Upgrade { + type Output = libp2p_swarm::Stream; + type Error = Infallible; + type Future = futures::future::Ready>; + + fn upgrade_outbound(self, s: libp2p_swarm::Stream, _: Self::Info) -> Self::Future { + futures::future::ready(Ok(s)) + } +} + +impl InboundUpgrade for Upgrade { + type Output = libp2p_swarm::Stream; + type Error = Infallible; + type Future = futures::future::Ready>; + + fn upgrade_inbound(self, s: libp2p_swarm::Stream, _: Self::Info) -> Self::Future { + futures::future::ready(Ok(s)) + } +} diff --git a/swarm/benches/connection_handler.rs b/swarm/benches/connection_handler.rs index ac13fa09cfe..ee467cbfa8d 100644 --- a/swarm/benches/connection_handler.rs +++ b/swarm/benches/connection_handler.rs @@ -1,10 +1,10 @@ use async_std::stream::StreamExt; use criterion::{criterion_group, criterion_main, Criterion}; -use futures::FutureExt; use instant::Duration; use libp2p_core::{ transport::MemoryTransport, InboundUpgrade, Multiaddr, OutboundUpgrade, Transport, UpgradeInfo, }; +use libp2p_identity::PeerId; use libp2p_swarm::{ConnectionHandler, NetworkBehaviour, StreamProtocol}; use std::convert::Infallible; @@ -18,7 +18,7 @@ macro_rules! benchmark_one_behaviour_many_protocols { #[tokio::main(flavor = "multi_thread", worker_threads = 1)] async fn $name(c: &mut Criterion) { let protocol_count = parse_counts(stringify!($name)); - bench_run_one_behaviour_many_protocols(c, protocol_count, 100000); + bench_run_one_behaviour_many_protocols(c, protocol_count, 10000); } )* @@ -32,9 +32,9 @@ macro_rules! benchmark_one_behaviour_many_protocols { } benchmark_one_behaviour_many_protocols! { + one_behavior_many_protocols_10, one_behavior_many_protocols_100, one_behavior_many_protocols_1000, - one_behavior_many_protocols_10000, } fn parse_counts(name: &str) -> usize { @@ -58,6 +58,9 @@ fn new_swarm(protocol_count: usize, spam_count: usize) -> libp2p_swarm::Swarm().unwrap()) .unwrap(); - c.bench_function(&format!("one_behavior_many_protocols_{protocol_count}_{spam_count}"), |b| { + c.bench_function( + &format!("one_behavior_many_protocols_{protocol_count}_{spam_count}"), + |b| { + sa.behaviour_mut().finished = false; + sb.behaviour_mut().finished = false; b.iter(|| { futures::executor::block_on(async { - let [mut af, mut bf] = [false; 2]; - while !af || !bf { - futures::select! { - event = sb.next().fuse() => { - bf |= matches!(event, Some(libp2p_swarm::SwarmEvent::Behaviour(FinishedSpamming))); - } - event = sa.next().fuse() => { - af |= matches!(event, Some(libp2p_swarm::SwarmEvent::Behaviour(FinishedSpamming))); - } - } + while !sa.behaviour().finished || !sb.behaviour().finished { + futures::future::select(sb.next(), sa.next()).await; } }); }); - }); + }, + ); } struct PollerBehaviour { spam_count: usize, protocols: &'static [StreamProtocol], + restarting: bool, + finished: bool, + other_peer: Option, } #[derive(Debug)] @@ -148,7 +154,7 @@ impl NetworkBehaviour for PollerBehaviour { _connection_id: libp2p_swarm::ConnectionId, _event: libp2p_swarm::THandlerOutEvent, ) { - self.spam_count = 0; + self.finished = true; } fn poll( @@ -156,8 +162,16 @@ impl NetworkBehaviour for PollerBehaviour { _: &mut std::task::Context<'_>, ) -> std::task::Poll>> { - if self.spam_count == 0 { + if self.finished && !self.restarting { + self.restarting = true; std::task::Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(FinishedSpamming)) + } else if !self.finished && self.restarting { + self.restarting = false; + std::task::Poll::Ready(libp2p_swarm::ToSwarm::NotifyHandler { + peer_id: self.other_peer.unwrap(), + handler: libp2p_swarm::NotifyHandler::Any, + event: Restart(self.spam_count), + }) } else { std::task::Poll::Pending } @@ -170,8 +184,11 @@ struct PollerHandler { protocols: &'static [StreamProtocol], } +#[derive(Debug)] +struct Restart(usize); + impl ConnectionHandler for PollerHandler { - type FromBehaviour = Infallible; + type FromBehaviour = Restart; type ToBehaviour = FinishedSpamming; @@ -199,6 +216,10 @@ impl ConnectionHandler for PollerHandler { Self::ToBehaviour, >, > { + if self.spam_count == usize::MAX { + return std::task::Poll::Pending; + } + if self.spam_count != 0 { self.spam_count -= 1; cx.waker().wake_by_ref(); @@ -211,8 +232,10 @@ impl ConnectionHandler for PollerHandler { )) } - fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) { - match _event {} + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + Restart(spam_count) => self.spam_count = spam_count, + } } fn on_connection_event(