From b127a0884b44a0ffe35aab2070f82ae767fd485a Mon Sep 17 00:00:00 2001 From: leonzchang Date: Mon, 16 Oct 2023 11:46:38 +0800 Subject: [PATCH 1/2] remove KeepAlive::Until --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/perf/CHANGELOG.md | 2 ++ protocols/perf/Cargo.toml | 2 +- protocols/perf/src/client/handler.rs | 21 +++++---------------- protocols/perf/src/server/handler.rs | 20 +++++--------------- 6 files changed, 15 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc04ae5f776..e1aa9bbb0b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2830,7 +2830,7 @@ dependencies = [ [[package]] name = "libp2p-perf" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index c918f6687b5..dd6c49cd321 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } libp2p-muxer-test-harness = { path = "muxers/test-harness" } libp2p-noise = { version = "0.43.1", path = "transports/noise" } -libp2p-perf = { version = "0.2.0", path = "protocols/perf" } +libp2p-perf = { version = "0.2.1", path = "protocols/perf" } libp2p-ping = { version = "0.43.1", path = "protocols/ping" } libp2p-plaintext = { version = "0.40.1", path = "transports/plaintext" } libp2p-pnet = { version = "0.23.0", path = "transports/pnet" } diff --git a/protocols/perf/CHANGELOG.md b/protocols/perf/CHANGELOG.md index e46a94e981a..066c48802cc 100644 --- a/protocols/perf/CHANGELOG.md +++ b/protocols/perf/CHANGELOG.md @@ -1,3 +1,5 @@ +## 0.2.1 - unreleased + ## 0.2.0 - Raise MSRV to 1.65. diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index c61deb37c29..5e9ecad5202 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-perf" edition = "2021" rust-version = { workspace = true } description = "libp2p perf protocol implementation" -version = "0.2.0" +version = "0.2.1" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 8a6df43d198..8806fa41a51 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -64,8 +64,6 @@ pub struct Handler { requested_streams: VecDeque, outbound: FuturesUnordered>>, - - keep_alive: KeepAlive, } impl Handler { @@ -74,7 +72,6 @@ impl Handler { queued_events: Default::default(), requested_streams: Default::default(), outbound: Default::default(), - keep_alive: KeepAlive::Yes, } } } @@ -158,7 +155,11 @@ impl ConnectionHandler for Handler { } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if self.outbound.is_empty() { + return KeepAlive::No; + } + + KeepAlive::Yes } fn poll( @@ -186,18 +187,6 @@ impl ConnectionHandler for Handler { } } - if self.outbound.is_empty() { - match self.keep_alive { - KeepAlive::Yes => { - self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)); - } - KeepAlive::Until(_) => {} - KeepAlive::No => panic!("Handler never sets KeepAlive::No."), - } - } else { - self.keep_alive = KeepAlive::Yes - } - Poll::Pending } } diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index e8f7b72e605..e0ad784a5db 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -44,14 +44,12 @@ pub struct Event { pub struct Handler { inbound: FuturesUnordered>>, - keep_alive: KeepAlive, } impl Handler { pub fn new() -> Self { Self { inbound: Default::default(), - keep_alive: KeepAlive::Yes, } } } @@ -113,7 +111,11 @@ impl ConnectionHandler for Handler { } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if self.inbound.is_empty() { + return KeepAlive::No; + } + + KeepAlive::Yes } fn poll( @@ -138,18 +140,6 @@ impl ConnectionHandler for Handler { } } - if self.inbound.is_empty() { - match self.keep_alive { - KeepAlive::Yes => { - self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)); - } - KeepAlive::Until(_) => {} - KeepAlive::No => panic!("Handler never sets KeepAlive::No."), - } - } else { - self.keep_alive = KeepAlive::Yes - } - Poll::Pending } } From b77234f7b91d51c39df3c16d2b07b2b2d32077d4 Mon Sep 17 00:00:00 2001 From: leonzchang Date: Mon, 16 Oct 2023 14:16:22 +0800 Subject: [PATCH 2/2] remove unuse module and revert version --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/perf/CHANGELOG.md | 2 - protocols/perf/Cargo.toml | 2 +- protocols/perf/src/client/behaviour.rs | 158 -------------------- protocols/perf/src/client/handler.rs | 192 ------------------------- protocols/perf/src/server/behaviour.rs | 121 ---------------- protocols/perf/src/server/handler.rs | 145 ------------------- 8 files changed, 3 insertions(+), 621 deletions(-) delete mode 100644 protocols/perf/src/client/behaviour.rs delete mode 100644 protocols/perf/src/client/handler.rs delete mode 100644 protocols/perf/src/server/behaviour.rs delete mode 100644 protocols/perf/src/server/handler.rs diff --git a/Cargo.lock b/Cargo.lock index e1aa9bbb0b4..dc04ae5f776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2830,7 +2830,7 @@ dependencies = [ [[package]] name = "libp2p-perf" -version = "0.2.1" +version = "0.2.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index dd6c49cd321..c918f6687b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } libp2p-muxer-test-harness = { path = "muxers/test-harness" } libp2p-noise = { version = "0.43.1", path = "transports/noise" } -libp2p-perf = { version = "0.2.1", path = "protocols/perf" } +libp2p-perf = { version = "0.2.0", path = "protocols/perf" } libp2p-ping = { version = "0.43.1", path = "protocols/ping" } libp2p-plaintext = { version = "0.40.1", path = "transports/plaintext" } libp2p-pnet = { version = "0.23.0", path = "transports/pnet" } diff --git a/protocols/perf/CHANGELOG.md b/protocols/perf/CHANGELOG.md index 066c48802cc..e46a94e981a 100644 --- a/protocols/perf/CHANGELOG.md +++ b/protocols/perf/CHANGELOG.md @@ -1,5 +1,3 @@ -## 0.2.1 - unreleased - ## 0.2.0 - Raise MSRV to 1.65. diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml index 5e9ecad5202..c61deb37c29 100644 --- a/protocols/perf/Cargo.toml +++ b/protocols/perf/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-perf" edition = "2021" rust-version = { workspace = true } description = "libp2p perf protocol implementation" -version = "0.2.1" +version = "0.2.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs deleted file mode 100644 index 912f6d5bb9e..00000000000 --- a/protocols/perf/src/client/behaviour.rs +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright 2023 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! [`NetworkBehaviour`] of the libp2p perf client protocol. - -use std::{ - collections::{HashSet, VecDeque}, - task::{Context, Poll}, -}; - -use libp2p_core::Multiaddr; -use libp2p_identity::PeerId; -use libp2p_swarm::{ - derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionId, FromSwarm, - NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, - THandlerOutEvent, ToSwarm, -}; -use void::Void; - -use crate::client::handler::Handler; - -use super::{RunId, RunParams, RunStats}; - -#[derive(Debug)] -pub struct Event { - pub id: RunId, - pub result: Result>, -} - -#[derive(Default)] -pub struct Behaviour { - /// Queue of actions to return when polled. - queued_events: VecDeque>>, - /// Set of connected peers. - connected: HashSet, -} - -impl Behaviour { - pub fn new() -> Self { - Self::default() - } - - pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { - if !self.connected.contains(&server) { - return Err(PerfError::NotConnected); - } - - let id = RunId::next(); - - self.queued_events.push_back(ToSwarm::NotifyHandler { - peer_id: server, - handler: NotifyHandler::Any, - event: crate::client::handler::Command { id, params }, - }); - - Ok(id) - } -} - -#[derive(thiserror::Error, Debug)] -pub enum PerfError { - #[error("Not connected to peer")] - NotConnected, -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = Handler; - type ToSwarm = Event; - - fn handle_established_outbound_connection( - &mut self, - _connection_id: ConnectionId, - _peer: PeerId, - _addr: &Multiaddr, - _role_override: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - Ok(Handler::default()) - } - - fn handle_established_inbound_connection( - &mut self, - _connection_id: ConnectionId, - _peer: PeerId, - _local_addr: &Multiaddr, - _remote_addr: &Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - Ok(Handler::default()) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { - self.connected.insert(peer_id); - } - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id: _, - endpoint: _, - handler: _, - remaining_established, - }) => { - if remaining_established == 0 { - assert!(self.connected.remove(&peer_id)); - } - } - FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddrCandidate(_) - | FromSwarm::ExternalAddrExpired(_) - | FromSwarm::ExternalAddrConfirmed(_) => {} - } - } - - fn on_connection_handler_event( - &mut self, - _event_source: PeerId, - _connection_id: ConnectionId, - super::handler::Event { id, result }: THandlerOutEvent, - ) { - self.queued_events - .push_back(ToSwarm::GenerateEvent(Event { id, result })); - } - - fn poll( - &mut self, - _cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>> { - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event); - } - - Poll::Pending - } -} diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs deleted file mode 100644 index 8806fa41a51..00000000000 --- a/protocols/perf/src/client/handler.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2023 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; -use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; -use libp2p_swarm::{ - handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, - }, - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, - SubstreamProtocol, -}; -use void::Void; - -use super::{RunId, RunParams, RunStats}; - -#[derive(Debug)] -pub struct Command { - pub(crate) id: RunId, - pub(crate) params: RunParams, -} - -#[derive(Debug)] -pub struct Event { - pub(crate) id: RunId, - pub(crate) result: Result>, -} - -pub struct Handler { - /// Queue of events to return when polled. - queued_events: VecDeque< - ConnectionHandlerEvent< - ::OutboundProtocol, - ::OutboundOpenInfo, - ::ToBehaviour, - ::Error, - >, - >, - - requested_streams: VecDeque, - - outbound: FuturesUnordered>>, -} - -impl Handler { - pub fn new() -> Self { - Self { - queued_events: Default::default(), - requested_streams: Default::default(), - outbound: Default::default(), - } - } -} - -impl Default for Handler { - fn default() -> Self { - Self::new() - } -} - -impl ConnectionHandler for Handler { - type FromBehaviour = Command; - type ToBehaviour = Event; - type Error = Void; - type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = ReadyUpgrade; - type OutboundOpenInfo = (); - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()) - } - - fn on_behaviour_event(&mut self, command: Self::FromBehaviour) { - self.requested_streams.push_back(command); - self.queued_events - .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()), - }) - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, .. - }) => void::unreachable(protocol), - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, - info: (), - }) => { - let Command { id, params } = self - .requested_streams - .pop_front() - .expect("opened a stream without a pending command"); - self.outbound.push( - crate::protocol::send_receive(params, protocol) - .map_ok(move |timers| Event { - id, - result: Ok(RunStats { params, timers }), - }) - .boxed(), - ); - } - - ConnectionEvent::AddressChange(_) - | ConnectionEvent::LocalProtocolsChange(_) - | ConnectionEvent::RemoteProtocolsChange(_) => {} - ConnectionEvent::DialUpgradeError(DialUpgradeError { info: (), error }) => { - let Command { id, .. } = self - .requested_streams - .pop_front() - .expect("requested stream without pending command"); - self.queued_events - .push_back(ConnectionHandlerEvent::NotifyBehaviour(Event { - id, - result: Err(error), - })); - } - ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { - void::unreachable(error) - } - } - } - - fn connection_keep_alive(&self) -> KeepAlive { - if self.outbound.is_empty() { - return KeepAlive::No; - } - - KeepAlive::Yes - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, - > { - // Return queued events. - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event); - } - - while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) { - match result { - Ok(event) => return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)), - Err(e) => { - panic!("{e:?}") - } - } - } - - Poll::Pending - } -} diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs deleted file mode 100644 index b15cb70110d..00000000000 --- a/protocols/perf/src/server/behaviour.rs +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2023 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! [`NetworkBehaviour`] of the libp2p perf server protocol. - -use std::{ - collections::VecDeque, - task::{Context, Poll}, -}; - -use libp2p_identity::PeerId; -use libp2p_swarm::{ - ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, - ToSwarm, -}; - -use crate::server::handler::Handler; - -use super::RunStats; - -#[derive(Debug)] -pub struct Event { - pub remote_peer_id: PeerId, - pub stats: RunStats, -} - -#[derive(Default)] -pub struct Behaviour { - /// Queue of actions to return when polled. - queued_events: VecDeque>>, -} - -impl Behaviour { - pub fn new() -> Self { - Self::default() - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = Handler; - type ToSwarm = Event; - - fn handle_established_inbound_connection( - &mut self, - _connection_id: ConnectionId, - _peer: PeerId, - _local_addr: &libp2p_core::Multiaddr, - _remote_addr: &libp2p_core::Multiaddr, - ) -> Result, libp2p_swarm::ConnectionDenied> { - Ok(Handler::default()) - } - - fn handle_established_outbound_connection( - &mut self, - _connection_id: ConnectionId, - _peer: PeerId, - _addr: &libp2p_core::Multiaddr, - _role_override: libp2p_core::Endpoint, - ) -> Result, libp2p_swarm::ConnectionDenied> { - Ok(Handler::default()) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(_) => {} - FromSwarm::ConnectionClosed(_) => {} - FromSwarm::AddressChange(_) => {} - FromSwarm::DialFailure(_) => {} - FromSwarm::ListenFailure(_) => {} - FromSwarm::NewListener(_) => {} - FromSwarm::NewListenAddr(_) => {} - FromSwarm::ExpiredListenAddr(_) => {} - FromSwarm::ListenerError(_) => {} - FromSwarm::ListenerClosed(_) => {} - FromSwarm::NewExternalAddrCandidate(_) => {} - FromSwarm::ExternalAddrExpired(_) => {} - FromSwarm::ExternalAddrConfirmed(_) => {} - } - } - - fn on_connection_handler_event( - &mut self, - event_source: PeerId, - _connection_id: ConnectionId, - super::handler::Event { stats }: THandlerOutEvent, - ) { - self.queued_events.push_back(ToSwarm::GenerateEvent(Event { - remote_peer_id: event_source, - stats, - })) - } - - fn poll( - &mut self, - _cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>> { - if let Some(event) = self.queued_events.pop_front() { - return Poll::Ready(event); - } - - Poll::Pending - } -} diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs deleted file mode 100644 index e0ad784a5db..00000000000 --- a/protocols/perf/src/server/handler.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2023 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use std::{ - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; -use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; -use libp2p_swarm::{ - handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, - }, - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, SubstreamProtocol, -}; -use log::error; -use void::Void; - -use super::RunStats; - -#[derive(Debug)] -pub struct Event { - pub stats: RunStats, -} - -pub struct Handler { - inbound: FuturesUnordered>>, -} - -impl Handler { - pub fn new() -> Self { - Self { - inbound: Default::default(), - } - } -} - -impl Default for Handler { - fn default() -> Self { - Self::new() - } -} - -impl ConnectionHandler for Handler { - type FromBehaviour = Void; - type ToBehaviour = Event; - type Error = Void; - type InboundProtocol = ReadyUpgrade; - type OutboundProtocol = DeniedUpgrade; - type OutboundOpenInfo = Void; - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()) - } - - fn on_behaviour_event(&mut self, v: Self::FromBehaviour) { - void::unreachable(v) - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, - info: _, - }) => { - self.inbound - .push(crate::protocol::receive_send(protocol).boxed()); - } - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => { - void::unreachable(info) - } - - ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => { - void::unreachable(info) - } - ConnectionEvent::AddressChange(_) - | ConnectionEvent::LocalProtocolsChange(_) - | ConnectionEvent::RemoteProtocolsChange(_) => {} - ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { - void::unreachable(error) - } - } - } - - fn connection_keep_alive(&self) -> KeepAlive { - if self.inbound.is_empty() { - return KeepAlive::No; - } - - KeepAlive::Yes - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, - > { - while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { - match result { - Ok(stats) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats })) - } - Err(e) => { - error!("{e:?}") - } - } - } - - Poll::Pending - } -}