From 0cfda6ec3b417916e270963634fd9a9308c56c56 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 15:44:37 +1100 Subject: [PATCH 1/8] WIP --- swarm/src/upgrade.rs | 2 ++ swarm/src/upgrade/one.rs | 16 ++++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 swarm/src/upgrade/one.rs diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index 53b627458c9..4876965ab04 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +mod one; + use crate::Stream; use futures::prelude::*; diff --git a/swarm/src/upgrade/one.rs b/swarm/src/upgrade/one.rs new file mode 100644 index 00000000000..d6db848c45d --- /dev/null +++ b/swarm/src/upgrade/one.rs @@ -0,0 +1,16 @@ +use crate::upgrade::UpgradeInfoSend; +use crate::StreamProtocol; +use std::iter; + +pub struct OneProtocol { + p: StreamProtocol, +} + +impl UpgradeInfoSend for OneProtocol { + type Info = StreamProtocol; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.p.clone()) + } +} From e33905e137fc933774b2f5b493caaa0aebe95cf3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 16:07:27 +1100 Subject: [PATCH 2/8] Deprecate `ReadyUpgrade` in favor of `SingleProtocol` --- core/CHANGELOG.md | 2 ++ core/src/upgrade.rs | 1 + core/src/upgrade/ready.rs | 3 ++ protocols/dcutr/src/handler/relayed.rs | 12 +++---- swarm/src/lib.rs | 1 + swarm/src/upgrade.rs | 2 +- swarm/src/upgrade/one.rs | 16 ---------- swarm/src/upgrade/single.rs | 44 ++++++++++++++++++++++++++ 8 files changed, 58 insertions(+), 23 deletions(-) delete mode 100644 swarm/src/upgrade/one.rs create mode 100644 swarm/src/upgrade/single.rs diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 034524b46dd..a8a1c3088ed 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -2,6 +2,8 @@ - Implement `{In,Out}boundConnectionUpgrade` for `SelectUpgrade`. See [PR 4812](https://github.com/libp2p/rust-libp2p/pull/4812). +- Deprecate "upgrade" implementations in preparation for removal. + See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX). ## 0.41.0 diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 69561fbebd8..be06a9aa362 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -71,6 +71,7 @@ pub(crate) use apply::{ pub(crate) use error::UpgradeError; use futures::future::Future; +#[allow(deprecated)] pub use self::{ denied::DeniedUpgrade, pending::PendingUpgrade, ready::ReadyUpgrade, select::SelectUpgrade, }; diff --git a/core/src/upgrade/ready.rs b/core/src/upgrade/ready.rs index 323f1f73f32..f672b8e9c7d 100644 --- a/core/src/upgrade/ready.rs +++ b/core/src/upgrade/ready.rs @@ -19,6 +19,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +#![allow(deprecated)] + use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::future; use std::iter; @@ -26,6 +28,7 @@ use void::Void; /// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that directly yields the substream. #[derive(Debug, Copy, Clone)] +#[deprecated(note = "Use `libp2p::swarm::SingleProtocol` instead.")] pub struct ReadyUpgrade

{ protocol_name: P, } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index eba58f89313..7b8c385aed5 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -25,14 +25,14 @@ use crate::{protocol, PROTOCOL_NAME}; use either::Either; use futures::future; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, SingleProtocol, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use protocol::{inbound, outbound}; @@ -180,15 +180,15 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type InboundProtocol = Either, DeniedUpgrade>; - type OutboundProtocol = ReadyUpgrade; + type InboundProtocol = Either; + type OutboundProtocol = SingleProtocol; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { match self.endpoint { ConnectedPoint::Dialer { .. } => { - SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), ()) + SubstreamProtocol::new(Either::Left(SingleProtocol::new(PROTOCOL_NAME)), ()) } ConnectedPoint::Listener { .. } => { // By the protocol specification the listening side of a relayed connection @@ -206,7 +206,7 @@ impl ConnectionHandler for Handler { Command::Connect => { self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), + protocol: SubstreamProtocol::new(SingleProtocol::new(PROTOCOL_NAME), ()), }); self.attempts += 1; } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 462dc718b86..f94dfa3ee2d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -123,6 +123,7 @@ pub use libp2p_swarm_derive::NetworkBehaviour; pub use listen_opts::ListenOpts; pub use stream::Stream; pub use stream_protocol::{InvalidProtocol, StreamProtocol}; +pub use upgrade::single::SingleProtocol; use crate::behaviour::ExternalAddrConfirmed; use crate::handler::UpgradeInfoSend; diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index 4876965ab04..a5ef11e6e16 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod one; +pub(crate) mod single; use crate::Stream; diff --git a/swarm/src/upgrade/one.rs b/swarm/src/upgrade/one.rs deleted file mode 100644 index d6db848c45d..00000000000 --- a/swarm/src/upgrade/one.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::upgrade::UpgradeInfoSend; -use crate::StreamProtocol; -use std::iter; - -pub struct OneProtocol { - p: StreamProtocol, -} - -impl UpgradeInfoSend for OneProtocol { - type Info = StreamProtocol; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.p.clone()) - } -} diff --git a/swarm/src/upgrade/single.rs b/swarm/src/upgrade/single.rs new file mode 100644 index 00000000000..3da3e621b30 --- /dev/null +++ b/swarm/src/upgrade/single.rs @@ -0,0 +1,44 @@ +use crate::{Stream, StreamProtocol}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use std::future::{ready, Ready}; +use std::iter; +use void::Void; + +pub struct SingleProtocol { + p: StreamProtocol, +} + +impl SingleProtocol { + pub fn new(p: StreamProtocol) -> Self { + Self { p } + } +} + +impl UpgradeInfo for SingleProtocol { + type Info = StreamProtocol; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.p.clone()) + } +} + +impl InboundUpgrade for SingleProtocol { + type Output = Stream; + type Error = Void; + type Future = Ready>; + + fn upgrade_inbound(self, socket: Stream, _: Self::Info) -> Self::Future { + ready(Ok(socket)) + } +} + +impl OutboundUpgrade for SingleProtocol { + type Output = Stream; + type Error = Void; + type Future = Ready>; + + fn upgrade_outbound(self, socket: Stream, _: Self::Info) -> Self::Future { + ready(Ok(socket)) + } +} From 8c82d0421f5e54a14dd50cdeda2cd9c969fff25f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 16:08:50 +1100 Subject: [PATCH 3/8] Remove pending upgrade & handler This would need to be deprecated first. --- core/src/upgrade.rs | 5 +- core/src/upgrade/pending.rs | 76 ---------------------------- swarm/src/handler.rs | 2 - swarm/src/handler/pending.rs | 97 ------------------------------------ 4 files changed, 1 insertion(+), 179 deletions(-) delete mode 100644 core/src/upgrade/pending.rs delete mode 100644 swarm/src/handler/pending.rs diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index be06a9aa362..e5ebf657924 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -61,7 +61,6 @@ mod apply; mod denied; mod either; mod error; -mod pending; mod ready; mod select; @@ -72,9 +71,7 @@ pub(crate) use error::UpgradeError; use futures::future::Future; #[allow(deprecated)] -pub use self::{ - denied::DeniedUpgrade, pending::PendingUpgrade, ready::ReadyUpgrade, select::SelectUpgrade, -}; +pub use self::{denied::DeniedUpgrade, ready::ReadyUpgrade, select::SelectUpgrade}; pub use crate::Negotiated; pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version}; diff --git a/core/src/upgrade/pending.rs b/core/src/upgrade/pending.rs deleted file mode 100644 index 6931e20bfdc..00000000000 --- a/core/src/upgrade/pending.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2022 Protocol Labs. -// Copyright 2017-2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use futures::future; -use std::iter; -use void::Void; - -/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always -/// returns a pending upgrade. -#[derive(Debug, Copy, Clone)] -pub struct PendingUpgrade

{ - protocol_name: P, -} - -impl

PendingUpgrade

{ - pub fn new(protocol_name: P) -> Self { - Self { protocol_name } - } -} - -impl

UpgradeInfo for PendingUpgrade

-where - P: AsRef + Clone, -{ - type Info = P; - type InfoIter = iter::Once

; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol_name.clone()) - } -} - -impl InboundUpgrade for PendingUpgrade

-where - P: AsRef + Clone, -{ - type Output = Void; - type Error = Void; - type Future = future::Pending>; - - fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { - future::pending() - } -} - -impl OutboundUpgrade for PendingUpgrade

-where - P: AsRef + Clone, -{ - type Output = Void; - type Error = Void; - type Future = future::Pending>; - - fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { - future::pending() - } -} diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 31d2c91e391..f064a721c51 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -43,14 +43,12 @@ mod map_in; mod map_out; pub mod multi; mod one_shot; -mod pending; mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; -pub use pending::PendingConnectionHandler; pub use select::ConnectionHandlerSelect; use crate::StreamProtocol; diff --git a/swarm/src/handler/pending.rs b/swarm/src/handler/pending.rs deleted file mode 100644 index 23b9adcfd90..00000000000 --- a/swarm/src/handler/pending.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2022 Protocol Labs. -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::handler::{ - ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, FullyNegotiatedInbound, - FullyNegotiatedOutbound, SubstreamProtocol, -}; -use libp2p_core::upgrade::PendingUpgrade; -use std::task::{Context, Poll}; -use void::Void; - -/// Implementation of [`ConnectionHandler`] that returns a pending upgrade. -#[derive(Clone, Debug)] -pub struct PendingConnectionHandler { - protocol_name: String, -} - -impl PendingConnectionHandler { - pub fn new(protocol_name: String) -> Self { - PendingConnectionHandler { protocol_name } - } -} - -impl ConnectionHandler for PendingConnectionHandler { - type FromBehaviour = Void; - type ToBehaviour = Void; - type InboundProtocol = PendingUpgrade; - type OutboundProtocol = PendingUpgrade; - type OutboundOpenInfo = Void; - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(PendingUpgrade::new(self.protocol_name.clone()), ()) - } - - fn on_behaviour_event(&mut self, v: Self::FromBehaviour) { - void::unreachable(v) - } - - fn poll( - &mut self, - _: &mut Context<'_>, - ) -> Poll< - ConnectionHandlerEvent, - > { - Poll::Pending - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { - protocol, .. - }) => void::unreachable(protocol), - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol, - info: _info, - }) => { - void::unreachable(protocol); - #[allow(unreachable_code, clippy::used_underscore_binding)] - { - void::unreachable(_info); - } - } - ConnectionEvent::AddressChange(_) - | ConnectionEvent::DialUpgradeError(_) - | ConnectionEvent::ListenUpgradeError(_) - | ConnectionEvent::LocalProtocolsChange(_) - | ConnectionEvent::RemoteProtocolsChange(_) => {} - } - } -} From adc067646b0b9c6e979d80fa50557fb67b7322fc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 16:20:42 +1100 Subject: [PATCH 4/8] Remove `Protocol` assoc type from `request-response` --- protocols/autonat/src/protocol.rs | 1 - protocols/rendezvous/src/codec.rs | 9 ++++----- protocols/request-response/src/cbor.rs | 9 ++++----- protocols/request-response/src/codec.rs | 11 +++++------ protocols/request-response/src/handler.rs | 14 +++++++------- protocols/request-response/src/json.rs | 9 ++++----- protocols/request-response/src/lib.rs | 12 ++++++------ .../request-response/tests/error_reporting.rs | 9 ++++----- 8 files changed, 34 insertions(+), 40 deletions(-) diff --git a/protocols/autonat/src/protocol.rs b/protocols/autonat/src/protocol.rs index b28f70cadf4..b490bae82d8 100644 --- a/protocols/autonat/src/protocol.rs +++ b/protocols/autonat/src/protocol.rs @@ -37,7 +37,6 @@ pub struct AutoNatCodec; #[async_trait] impl request_response::Codec for AutoNatCodec { - type Protocol = StreamProtocol; type Request = DialRequest; type Response = DialResponse; diff --git a/protocols/rendezvous/src/codec.rs b/protocols/rendezvous/src/codec.rs index bfc3cf275fc..e9d093c9113 100644 --- a/protocols/rendezvous/src/codec.rs +++ b/protocols/rendezvous/src/codec.rs @@ -241,11 +241,10 @@ pub struct Codec {} #[async_trait] impl libp2p_request_response::Codec for Codec { - type Protocol = StreamProtocol; type Request = Message; type Response = Message; - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + async fn read_request(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -259,7 +258,7 @@ impl libp2p_request_response::Codec for Codec { async fn read_response( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -275,7 +274,7 @@ impl libp2p_request_response::Codec for Codec { async fn write_request( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -289,7 +288,7 @@ impl libp2p_request_response::Codec for Codec { async fn write_response( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, res: Self::Response, ) -> io::Result<()> diff --git a/protocols/request-response/src/cbor.rs b/protocols/request-response/src/cbor.rs index f371f6149dc..1bdd47fcfb8 100644 --- a/protocols/request-response/src/cbor.rs +++ b/protocols/request-response/src/cbor.rs @@ -81,11 +81,10 @@ mod codec { Req: Send + Serialize + DeserializeOwned, Resp: Send + Serialize + DeserializeOwned, { - type Protocol = StreamProtocol; type Request = Req; type Response = Resp; - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + async fn read_request(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -96,7 +95,7 @@ mod codec { cbor4ii::serde::from_slice(vec.as_slice()).map_err(decode_into_io_error) } - async fn read_response(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + async fn read_response(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -109,7 +108,7 @@ mod codec { async fn write_request( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -126,7 +125,7 @@ mod codec { async fn write_response( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, resp: Self::Response, ) -> io::Result<()> diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs index d26b729acae..166ee43bb5b 100644 --- a/protocols/request-response/src/codec.rs +++ b/protocols/request-response/src/codec.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use futures::prelude::*; +use libp2p_swarm::StreamProtocol; use std::io; /// A `Codec` defines the request and response types @@ -27,8 +28,6 @@ use std::io; /// protocol family and how they are encoded / decoded on an I/O stream. #[async_trait] pub trait Codec { - /// The type of protocol(s) or protocol versions being negotiated. - type Protocol: AsRef + Send + Clone; /// The type of inbound and outbound requests. type Request: Send; /// The type of inbound and outbound responses. @@ -38,7 +37,7 @@ pub trait Codec { /// negotiated protocol. async fn read_request( &mut self, - protocol: &Self::Protocol, + protocol: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -48,7 +47,7 @@ pub trait Codec { /// negotiated protocol. async fn read_response( &mut self, - protocol: &Self::Protocol, + protocol: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -58,7 +57,7 @@ pub trait Codec { /// negotiated protocol. async fn write_request( &mut self, - protocol: &Self::Protocol, + protocol: &StreamProtocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -69,7 +68,7 @@ pub trait Codec { /// negotiated protocol. async fn write_response( &mut self, - protocol: &Self::Protocol, + protocol: &StreamProtocol, io: &mut T, res: Self::Response, ) -> io::Result<()> diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 2d45e0d7dc3..80abafba580 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -34,7 +34,7 @@ use libp2p_swarm::handler::{ }; use libp2p_swarm::{ handler::{ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError}, - SubstreamProtocol, + StreamProtocol, SubstreamProtocol, }; use smallvec::SmallVec; use std::{ @@ -54,7 +54,7 @@ where TCodec: Codec, { /// The supported inbound protocols. - inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + inbound_protocols: SmallVec<[StreamProtocol; 2]>, /// The request/response message codec. codec: TCodec, /// Queue of events to emit in `poll()`. @@ -92,7 +92,7 @@ where TCodec: Codec + Send + Clone + 'static, { pub(super) fn new( - inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + inbound_protocols: SmallVec<[StreamProtocol; 2]>, codec: TCodec, substream_timeout: Duration, inbound_request_id: Arc, @@ -349,7 +349,7 @@ impl fmt::Debug for Event { pub struct OutboundMessage { pub(crate) request_id: OutboundRequestId, pub(crate) request: TCodec::Request, - pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, + pub(crate) protocols: SmallVec<[StreamProtocol; 2]>, } impl fmt::Debug for OutboundMessage @@ -367,8 +367,8 @@ where { type FromBehaviour = OutboundMessage; type ToBehaviour = Event; - type InboundProtocol = Protocol; - type OutboundProtocol = Protocol; + type InboundProtocol = Protocol; + type OutboundProtocol = Protocol; type OutboundOpenInfo = (); type InboundOpenInfo = (); @@ -389,7 +389,7 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, (), Self::ToBehaviour>> { + ) -> Poll, (), Self::ToBehaviour>> { match self.worker_streams.poll_unpin(cx) { Poll::Ready((_, Ok(Ok(event)))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); diff --git a/protocols/request-response/src/json.rs b/protocols/request-response/src/json.rs index 0b3d634573b..657928263c4 100644 --- a/protocols/request-response/src/json.rs +++ b/protocols/request-response/src/json.rs @@ -79,11 +79,10 @@ mod codec { Req: Send + Serialize + DeserializeOwned, Resp: Send + Serialize + DeserializeOwned, { - type Protocol = StreamProtocol; type Request = Req; type Response = Resp; - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + async fn read_request(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -94,7 +93,7 @@ mod codec { Ok(serde_json::from_slice(vec.as_slice())?) } - async fn read_response(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + async fn read_response(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -107,7 +106,7 @@ mod codec { async fn write_request( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -123,7 +122,7 @@ mod codec { async fn write_response( &mut self, - _: &Self::Protocol, + _: &StreamProtocol, io: &mut T, resp: Self::Response, ) -> io::Result<()> diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 9737663a876..3e95ebb7cc9 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -84,8 +84,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm}, dial_opts::DialOpts, - ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, + StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use smallvec::SmallVec; use std::{ @@ -339,9 +339,9 @@ where TCodec: Codec + Clone + Send + 'static, { /// The supported inbound protocols. - inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + inbound_protocols: SmallVec<[StreamProtocol; 2]>, /// The supported outbound protocols. - outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, + outbound_protocols: SmallVec<[StreamProtocol; 2]>, /// The next (local) request ID. next_outbound_request_id: OutboundRequestId, /// The next (inbound) request ID. @@ -370,7 +370,7 @@ where /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to construct the codec. pub fn new(protocols: I, cfg: Config) -> Self where - I: IntoIterator, + I: IntoIterator, { Self::with_codec(TCodec::default(), protocols, cfg) } @@ -384,7 +384,7 @@ where /// protocols, codec and configuration. pub fn with_codec(codec: TCodec, protocols: I, cfg: Config) -> Self where - I: IntoIterator, + I: IntoIterator, { let mut inbound_protocols = SmallVec::new(); let mut outbound_protocols = SmallVec::new(); diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 2dc82b2e0c5..e8815ad3c73 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -365,13 +365,12 @@ impl TryFrom for Action { #[async_trait] impl Codec for TestCodec { - type Protocol = StreamProtocol; type Request = Action; type Response = Action; async fn read_request( &mut self, - _protocol: &Self::Protocol, + _protocol: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -396,7 +395,7 @@ impl Codec for TestCodec { async fn read_response( &mut self, - _protocol: &Self::Protocol, + _protocol: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -424,7 +423,7 @@ impl Codec for TestCodec { async fn write_request( &mut self, - _protocol: &Self::Protocol, + _protocol: &StreamProtocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -445,7 +444,7 @@ impl Codec for TestCodec { async fn write_response( &mut self, - _protocol: &Self::Protocol, + _protocol: &StreamProtocol, io: &mut T, res: Self::Response, ) -> io::Result<()> From a7cbae66741216455b9350bdd55940797ddb3958 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 16:22:31 +1100 Subject: [PATCH 5/8] Introduce `SeveralProtocols` --- protocols/request-response/src/handler.rs | 18 +++---- .../request-response/src/handler/protocol.rs | 51 ------------------- swarm/src/connection.rs | 41 ++------------- swarm/src/lib.rs | 2 +- swarm/src/upgrade.rs | 1 + swarm/src/upgrade/several.rs | 44 ++++++++++++++++ 6 files changed, 55 insertions(+), 102 deletions(-) create mode 100644 swarm/src/upgrade/several.rs diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 80abafba580..238fff07399 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -23,7 +23,6 @@ pub(crate) mod protocol; pub use protocol::ProtocolSupport; use crate::codec::Codec; -use crate::handler::protocol::Protocol; use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::channel::mpsc; @@ -34,7 +33,7 @@ use libp2p_swarm::handler::{ }; use libp2p_swarm::{ handler::{ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError}, - StreamProtocol, SubstreamProtocol, + SeveralProtocols, StreamProtocol, SubstreamProtocol, }; use smallvec::SmallVec; use std::{ @@ -367,18 +366,13 @@ where { type FromBehaviour = OutboundMessage; type ToBehaviour = Event; - type InboundProtocol = Protocol; - type OutboundProtocol = Protocol; + type InboundProtocol = SeveralProtocols; + type OutboundProtocol = SeveralProtocols; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new( - Protocol { - protocols: self.inbound_protocols.clone(), - }, - (), - ) + SubstreamProtocol::new(SeveralProtocols::new(self.inbound_protocols.to_vec()), ()) } fn on_behaviour_event(&mut self, request: Self::FromBehaviour) { @@ -389,7 +383,7 @@ where fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, (), Self::ToBehaviour>> { + ) -> Poll> { match self.worker_streams.poll_unpin(cx) { Poll::Ready((_, Ok(Ok(event)))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); @@ -447,7 +441,7 @@ where self.requested_outbound.push_back(request); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(Protocol { protocols }, ()), + protocol: SubstreamProtocol::new(SeveralProtocols::new(protocols.to_vec()), ()), }); } diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 833cacdd6ce..f77bd10c390 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -23,11 +23,6 @@ //! receives a request and sends a response, whereas the //! outbound upgrade send a request and receives a response. -use futures::future::{ready, Ready}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p_swarm::Stream; -use smallvec::SmallVec; - /// The level of support for a particular protocol. #[derive(Debug, Clone)] pub enum ProtocolSupport { @@ -56,49 +51,3 @@ impl ProtocolSupport { } } } - -/// Response substream upgrade protocol. -/// -/// Receives a request and sends a response. -#[derive(Debug)] -pub struct Protocol

{ - pub(crate) protocols: SmallVec<[P; 2]>, -} - -impl

UpgradeInfo for Protocol

-where - P: AsRef + Clone, -{ - type Info = P; - type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; - - fn protocol_info(&self) -> Self::InfoIter { - self.protocols.clone().into_iter() - } -} - -impl

InboundUpgrade for Protocol

-where - P: AsRef + Clone, -{ - type Output = (Stream, P); - type Error = void::Void; - type Future = Ready>; - - fn upgrade_inbound(self, io: Stream, protocol: Self::Info) -> Self::Future { - ready(Ok((io, protocol))) - } -} - -impl

OutboundUpgrade for Protocol

-where - P: AsRef + Clone, -{ - type Output = (Stream, P); - type Error = void::Void; - type Future = Ready>; - - fn upgrade_outbound(self, io: Stream, protocol: Self::Info) -> Self::Future { - ready(Ok((io, protocol))) - } -} diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 15c49bb7bd5..107a7044273 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -740,7 +740,7 @@ enum Shutdown { #[cfg(test)] mod tests { use super::*; - use crate::dummy; + use crate::{dummy, SeveralProtocols}; use futures::future; use futures::AsyncRead; use futures::AsyncWrite; @@ -1214,7 +1214,7 @@ mod tests { impl ConnectionHandler for ConfigurableProtocolConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type InboundProtocol = ManyProtocolsUpgrade; + type InboundProtocol = SeveralProtocols; type OutboundProtocol = DeniedUpgrade; type InboundOpenInfo = (); type OutboundOpenInfo = (); @@ -1223,9 +1223,7 @@ mod tests { &self, ) -> SubstreamProtocol { SubstreamProtocol::new( - ManyProtocolsUpgrade { - protocols: Vec::from_iter(self.active_protocols.clone()), - }, + SeveralProtocols::new(Vec::from_iter(self.active_protocols.clone())), (), ) } @@ -1281,39 +1279,6 @@ mod tests { Poll::Pending } } - - struct ManyProtocolsUpgrade { - protocols: Vec, - } - - impl UpgradeInfo for ManyProtocolsUpgrade { - type Info = StreamProtocol; - type InfoIter = std::vec::IntoIter; - - fn protocol_info(&self) -> Self::InfoIter { - self.protocols.clone().into_iter() - } - } - - impl InboundUpgrade for ManyProtocolsUpgrade { - type Output = C; - type Error = Void; - type Future = future::Ready>; - - fn upgrade_inbound(self, stream: C, _: Self::Info) -> Self::Future { - future::ready(Ok(stream)) - } - } - - impl OutboundUpgrade for ManyProtocolsUpgrade { - type Output = C; - type Error = Void; - type Future = future::Ready>; - - fn upgrade_outbound(self, stream: C, _: Self::Info) -> Self::Future { - future::ready(Ok(stream)) - } - } } /// The endpoint roles associated with a pending peer-to-peer connection. diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f94dfa3ee2d..5bd05bc9620 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -123,7 +123,7 @@ pub use libp2p_swarm_derive::NetworkBehaviour; pub use listen_opts::ListenOpts; pub use stream::Stream; pub use stream_protocol::{InvalidProtocol, StreamProtocol}; -pub use upgrade::single::SingleProtocol; +pub use upgrade::{several::SeveralProtocols, single::SingleProtocol}; use crate::behaviour::ExternalAddrConfirmed; use crate::handler::UpgradeInfoSend; diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index a5ef11e6e16..8a906a21227 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +pub(crate) mod several; pub(crate) mod single; use crate::Stream; diff --git a/swarm/src/upgrade/several.rs b/swarm/src/upgrade/several.rs new file mode 100644 index 00000000000..bd8d601b3ff --- /dev/null +++ b/swarm/src/upgrade/several.rs @@ -0,0 +1,44 @@ +use crate::{Stream, StreamProtocol}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use std::future::{ready, Ready}; +use std::vec; +use void::Void; + +pub struct SeveralProtocols { + protocols: Vec, +} + +impl SeveralProtocols { + pub fn new(protocols: Vec) -> Self { + Self { protocols } + } +} + +impl UpgradeInfo for SeveralProtocols { + type Info = StreamProtocol; + type InfoIter = vec::IntoIter; + + fn protocol_info(&self) -> Self::InfoIter { + self.protocols.clone().into_iter() + } +} + +impl InboundUpgrade for SeveralProtocols { + type Output = (Stream, StreamProtocol); + type Error = Void; + type Future = Ready>; + + fn upgrade_inbound(self, socket: Stream, protocol: Self::Info) -> Self::Future { + ready(Ok((socket, protocol))) + } +} + +impl OutboundUpgrade for SeveralProtocols { + type Output = (Stream, StreamProtocol); + type Error = Void; + type Future = Ready>; + + fn upgrade_outbound(self, socket: Stream, protocol: Self::Info) -> Self::Future { + ready(Ok((socket, protocol))) + } +} From a3527a954675ea869c4440b33606a1dafc58d8d3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 16:31:52 +1100 Subject: [PATCH 6/8] Migrate `libp2p-kad` to `SeveralProtocols` --- protocols/kad/src/behaviour.rs | 27 +++++----- protocols/kad/src/handler.rs | 59 ++++++++++++++-------- protocols/kad/src/protocol.rs | 90 ++-------------------------------- 3 files changed, 59 insertions(+), 117 deletions(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index fc942cf635a..2e9c53cef78 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -26,7 +26,7 @@ use crate::addresses::Addresses; use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId}; use crate::jobs::*; use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus}; -use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig}; +use crate::protocol::{ConnectionType, KadPeer, DEFAULT_MAX_PACKET_SIZE, DEFAULT_PROTO_NAME}; use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState}; use crate::record::{ self, @@ -68,8 +68,8 @@ pub struct Behaviour { /// The k-bucket insertion strategy. kbucket_inserts: BucketInserts, - /// Configuration of the wire protocol. - protocol_config: ProtocolConfig, + protocol_names: Vec, + max_packet_size: usize, /// Configuration of [`RecordStore`] filtering. record_filtering: StoreInserts, @@ -172,7 +172,8 @@ pub enum StoreInserts { pub struct Config { kbucket_pending_timeout: Duration, query_config: QueryConfig, - protocol_config: ProtocolConfig, + protocol_names: Vec, + max_packet_size: usize, record_ttl: Option, record_replication_interval: Option, record_publication_interval: Option, @@ -188,7 +189,7 @@ impl Default for Config { Config { kbucket_pending_timeout: Duration::from_secs(60), query_config: QueryConfig::default(), - protocol_config: Default::default(), + protocol_names: vec![DEFAULT_PROTO_NAME], record_ttl: Some(Duration::from_secs(36 * 60 * 60)), record_replication_interval: Some(Duration::from_secs(60 * 60)), record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)), @@ -197,6 +198,7 @@ impl Default for Config { provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), kbucket_inserts: BucketInserts::OnConnected, caching: Caching::Enabled { max_peers: 1 }, + max_packet_size: DEFAULT_MAX_PACKET_SIZE, } } } @@ -227,7 +229,7 @@ impl Config { /// be able to talk to other nodes supporting any of the provided names. /// Multiple names must be used with caution to avoid network partitioning. pub fn set_protocol_names(&mut self, names: Vec) -> &mut Self { - self.protocol_config.set_protocol_names(names); + self.protocol_names = names; self } @@ -371,7 +373,7 @@ impl Config { /// It might be necessary to increase this value if trying to put large /// records. pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self { - self.protocol_config.set_max_packet_size(size); + self.max_packet_size = size; self } @@ -404,7 +406,7 @@ where /// Get the protocol name of this kademlia instance. pub fn protocol_names(&self) -> &[StreamProtocol] { - self.protocol_config.protocol_names() + self.protocol_names.as_slice() } /// Creates a new `Kademlia` network behaviour with the given configuration. @@ -432,7 +434,8 @@ where caching: config.caching, kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout), kbucket_inserts: config.kbucket_inserts, - protocol_config: config.protocol_config, + protocol_names: config.protocol_names, + max_packet_size: config.max_packet_size, record_filtering: config.record_filtering, queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()), listen_addresses: Default::default(), @@ -2082,7 +2085,8 @@ where }; let mut handler = Handler::new( - self.protocol_config.clone(), + self.protocol_names.clone(), + self.max_packet_size, connected_point, peer, self.mode, @@ -2105,7 +2109,8 @@ where }; let mut handler = Handler::new( - self.protocol_config.clone(), + self.protocol_names.clone(), + self.max_packet_size, connected_point, peer, self.mode, diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index adfb076541c..14572037f72 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -20,10 +20,11 @@ use crate::behaviour::Mode; use crate::protocol::{ - KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, ProtocolConfig, + Codec, KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, }; use crate::record::{self, Record}; use crate::QueryId; +use asynchronous_codec::Framed; use either::Either; use futures::prelude::*; use futures::stream::SelectAll; @@ -33,12 +34,13 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, Stream, StreamUpgradeError, SubstreamProtocol, - SupportedProtocols, + ConnectionHandler, ConnectionHandlerEvent, SeveralProtocols, Stream, StreamProtocol, + StreamUpgradeError, SubstreamProtocol, SupportedProtocols, }; use std::collections::VecDeque; use std::task::Waker; use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll}; +use void::Void; const MAX_NUM_SUBSTREAMS: usize = 32; @@ -50,8 +52,8 @@ const MAX_NUM_SUBSTREAMS: usize = 32; /// /// It also handles requests made by the remote. pub struct Handler { - /// Configuration of the wire protocol. - protocol_config: ProtocolConfig, + protocol_names: Vec, + max_packet_size: usize, /// In client mode, we don't accept inbound substreams. mode: Mode, @@ -293,7 +295,7 @@ pub enum HandlerEvent { #[derive(Debug)] pub enum HandlerQueryErr { /// Error while trying to perform the query. - Upgrade(StreamUpgradeError), + Upgrade(StreamUpgradeError), /// Received an answer that doesn't correspond to the request. UnexpectedMessage, /// I/O error in the substream. @@ -329,8 +331,8 @@ impl error::Error for HandlerQueryErr { } } -impl From> for HandlerQueryErr { - fn from(err: StreamUpgradeError) -> Self { +impl From> for HandlerQueryErr { + fn from(err: StreamUpgradeError) -> Self { HandlerQueryErr::Upgrade(err) } } @@ -451,7 +453,8 @@ struct UniqueConnecId(u64); impl Handler { pub fn new( - protocol_config: ProtocolConfig, + protocol_names: Vec, + max_packet_size: usize, endpoint: ConnectedPoint, remote_peer_id: PeerId, mode: Mode, @@ -474,7 +477,8 @@ impl Handler { } Handler { - protocol_config, + protocol_names, + max_packet_size, mode, endpoint, remote_peer_id, @@ -490,14 +494,21 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { protocol, info: () }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { + protocol: (stream, _), + info: (), + }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, ) { if let Some((msg, query_id)) = self.pending_messages.pop_front() { self.outbound_substreams - .push(OutboundSubstreamState::PendingSend(protocol, msg, query_id)); + .push(OutboundSubstreamState::PendingSend( + Framed::new(stream, Codec::new(self.max_packet_size)), + msg, + query_id, + )); } else { debug_assert!(false, "Requested outbound stream without message") } @@ -525,7 +536,7 @@ impl Handler { // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. let protocol = match protocol { - future::Either::Left(p) => p, + future::Either::Left((p, _)) => p, future::Either::Right(p) => void::unreachable(p), }; @@ -569,7 +580,7 @@ impl Handler { .push(InboundSubstreamState::WaitingMessage { first: true, connection_id: connec_unique_id, - substream: protocol, + substream: Framed::new(protocol, Codec::new(self.max_packet_size)), }); } @@ -597,14 +608,17 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = HandlerIn; type ToBehaviour = HandlerEvent; - type InboundProtocol = Either; - type OutboundProtocol = ProtocolConfig; + type InboundProtocol = Either; + type OutboundProtocol = SeveralProtocols; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { match self.mode { - Mode::Server => SubstreamProtocol::new(Either::Left(self.protocol_config.clone()), ()), + Mode::Server => SubstreamProtocol::new( + Either::Left(SeveralProtocols::new(self.protocol_names.clone())), + (), + ), Mode::Client => SubstreamProtocol::new(Either::Right(upgrade::DeniedUpgrade), ()), } } @@ -745,7 +759,10 @@ impl ConnectionHandler for Handler { { self.num_requested_outbound_streams += 1; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), + protocol: SubstreamProtocol::new( + SeveralProtocols::new(self.protocol_names.clone()), + (), + ), }); } @@ -778,7 +795,7 @@ impl ConnectionHandler for Handler { let remote_supports_our_kademlia_protocols = self .remote_supported_protocols .iter() - .any(|p| self.protocol_config.protocol_names().contains(p)); + .any(|p| self.protocol_names.contains(p)); self.protocol_status = Some(compute_new_protocol_status( remote_supports_our_kademlia_protocols, @@ -840,7 +857,7 @@ impl Handler { } impl futures::Stream for OutboundSubstreamState { - type Item = ConnectionHandlerEvent; + type Item = ConnectionHandlerEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -972,7 +989,7 @@ impl futures::Stream for OutboundSubstreamState { } impl futures::Stream for InboundSubstreamState { - type Item = ConnectionHandlerEvent; + type Item = ConnectionHandlerEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 247b12bb4cd..52da111a258 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -30,15 +30,13 @@ use crate::proto; use crate::record::{self, Record}; use asynchronous_codec::{Decoder, Encoder, Framed}; use bytes::BytesMut; -use futures::prelude::*; use instant::Instant; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; +use std::io; use std::marker::PhantomData; use std::{convert::TryFrom, time::Duration}; -use std::{io, iter}; /// The protocol name used for negotiating with multistream-select. pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0"); @@ -129,61 +127,13 @@ impl From for proto::Peer { } } -/// Configuration for a Kademlia connection upgrade. When applied to a connection, turns this -/// connection into a `Stream + Sink` whose items are of type `KadRequestMsg` and `KadResponseMsg`. -// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does -// only one request, then we can change the output of the `InboundUpgrade` and -// `OutboundUpgrade` to be just a single message -#[derive(Debug, Clone)] -pub struct ProtocolConfig { - protocol_names: Vec, - /// Maximum allowed size of a packet. - max_packet_size: usize, -} - -impl ProtocolConfig { - /// Returns the configured protocol name. - pub fn protocol_names(&self) -> &[StreamProtocol] { - &self.protocol_names - } - - /// Modifies the protocol names used on the wire. Can be used to create incompatibilities - /// between networks on purpose. - pub fn set_protocol_names(&mut self, names: Vec) { - self.protocol_names = names; - } - - /// Modifies the maximum allowed size of a single Kademlia packet. - pub fn set_max_packet_size(&mut self, size: usize) { - self.max_packet_size = size; - } -} - -impl Default for ProtocolConfig { - fn default() -> Self { - ProtocolConfig { - protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(), - max_packet_size: DEFAULT_MAX_PACKET_SIZE, - } - } -} - -impl UpgradeInfo for ProtocolConfig { - type Info = StreamProtocol; - type InfoIter = std::vec::IntoIter; - - fn protocol_info(&self) -> Self::InfoIter { - self.protocol_names.clone().into_iter() - } -} - /// Codec for Kademlia inbound and outbound message framing. -pub struct Codec { +pub(crate) struct Codec { codec: quick_protobuf_codec::Codec, __phantom: PhantomData<(A, B)>, } impl Codec { - fn new(max_packet_size: usize) -> Self { + pub(crate) fn new(max_packet_size: usize) -> Self { Codec { codec: quick_protobuf_codec::Codec::new(max_packet_size), __phantom: PhantomData, @@ -213,39 +163,9 @@ pub(crate) type KadInStreamSink = Framed = Framed>; -impl InboundUpgrade for ProtocolConfig -where - C: AsyncRead + AsyncWrite + Unpin, -{ - type Output = KadInStreamSink; - type Future = future::Ready>; - type Error = io::Error; - - fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { - let codec = Codec::new(self.max_packet_size); - - future::ok(Framed::new(incoming, codec)) - } -} - -impl OutboundUpgrade for ProtocolConfig -where - C: AsyncRead + AsyncWrite + Unpin, -{ - type Output = KadOutStreamSink; - type Future = future::Ready>; - type Error = io::Error; - - fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { - let codec = Codec::new(self.max_packet_size); - - future::ok(Framed::new(incoming, codec)) - } -} - /// Request that we can send to a peer or that we received from a peer. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum KadRequestMsg { +pub(crate) enum KadRequestMsg { /// Ping request. Ping, @@ -283,7 +203,7 @@ pub enum KadRequestMsg { /// Response that we can send to a peer or that we received from a peer. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum KadResponseMsg { +pub(crate) enum KadResponseMsg { /// Ping response. Pong, From 2a72de53081b77dcbeb7fd221d5341f41930b669 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 16:55:20 +1100 Subject: [PATCH 7/8] Replace `DeniedUpgrade` with `NoProtocols` --- core/src/upgrade.rs | 3 +- core/src/upgrade/denied.rs | 58 -------------------------- protocols/dcutr/src/handler/relayed.rs | 7 ++-- protocols/gossipsub/src/handler.rs | 7 ++-- protocols/kad/src/handler.rs | 8 ++-- protocols/perf/src/client/handler.rs | 8 ++-- protocols/perf/src/server/handler.rs | 6 +-- swarm/src/behaviour/toggle.rs | 9 ++-- swarm/src/connection.rs | 16 ++++--- swarm/src/dummy.rs | 11 +++-- swarm/src/handler/one_shot.rs | 6 +-- swarm/src/lib.rs | 2 +- swarm/src/upgrade.rs | 1 + swarm/src/upgrade/no.rs | 43 +++++++++++++++++++ swarm/tests/connection_close.rs | 9 ++-- 15 files changed, 87 insertions(+), 107 deletions(-) delete mode 100644 core/src/upgrade/denied.rs create mode 100644 swarm/src/upgrade/no.rs diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index e5ebf657924..fca5dd5ab68 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -58,7 +58,6 @@ //! mod apply; -mod denied; mod either; mod error; mod ready; @@ -71,7 +70,7 @@ pub(crate) use error::UpgradeError; use futures::future::Future; #[allow(deprecated)] -pub use self::{denied::DeniedUpgrade, ready::ReadyUpgrade, select::SelectUpgrade}; +pub use self::{ready::ReadyUpgrade, select::SelectUpgrade}; pub use crate::Negotiated; pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version}; diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs deleted file mode 100644 index 353a184822d..00000000000 --- a/core/src/upgrade/denied.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2017-2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use futures::future; -use std::iter; -use void::Void; - -/// Dummy implementation of `UpgradeInfo`/`InboundUpgrade`/`OutboundUpgrade` that doesn't support -/// any protocol. -#[derive(Debug, Copy, Clone)] -pub struct DeniedUpgrade; - -impl UpgradeInfo for DeniedUpgrade { - type Info = &'static str; - type InfoIter = iter::Empty; - - fn protocol_info(&self) -> Self::InfoIter { - iter::empty() - } -} - -impl InboundUpgrade for DeniedUpgrade { - type Output = Void; - type Error = Void; - type Future = future::Pending>; - - fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { - future::pending() - } -} - -impl OutboundUpgrade for DeniedUpgrade { - type Output = Void; - type Error = Void; - type Future = future::Pending>; - - fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { - future::pending() - } -} diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 7b8c385aed5..b8072173745 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -25,14 +25,13 @@ use crate::{protocol, PROTOCOL_NAME}; use either::Either; use futures::future; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::ConnectedPoint; use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, SingleProtocol, StreamProtocol, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, NoProtocols, SingleProtocol, StreamUpgradeError, SubstreamProtocol, }; use protocol::{inbound, outbound}; @@ -180,7 +179,7 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type InboundProtocol = Either; + type InboundProtocol = Either; type OutboundProtocol = SingleProtocol; type OutboundOpenInfo = (); type InboundOpenInfo = (); @@ -196,7 +195,7 @@ impl ConnectionHandler for Handler { // the relayed connection opens a substream to the dialing side. (Connection roles // and substream roles are reversed.) The listening side on a relayed connection // never expects incoming substreams, hence the denied upgrade below. - SubstreamProtocol::new(Either::Right(DeniedUpgrade), ()) + SubstreamProtocol::new(Either::Right(NoProtocols::new()), ()) } } } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 63ef96781d9..cd93f4fd578 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -27,12 +27,11 @@ use futures::future::Either; use futures::prelude::*; use futures::StreamExt; use instant::Instant; -use libp2p_core::upgrade::DeniedUpgrade; use libp2p_swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; -use libp2p_swarm::Stream; +use libp2p_swarm::{NoProtocols, Stream}; use smallvec::SmallVec; use std::{ pin::Pin, @@ -390,7 +389,7 @@ impl ConnectionHandler for Handler { type FromBehaviour = HandlerIn; type ToBehaviour = HandlerEvent; type InboundOpenInfo = (); - type InboundProtocol = either::Either; + type InboundProtocol = either::Either; type OutboundOpenInfo = (); type OutboundProtocol = ProtocolConfig; @@ -400,7 +399,7 @@ impl ConnectionHandler for Handler { SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ()) } Handler::Disabled(_) => { - SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ()) + SubstreamProtocol::new(either::Either::Right(NoProtocols::new()), ()) } } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 14572037f72..684dcccb403 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -34,8 +34,8 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, SeveralProtocols, Stream, StreamProtocol, - StreamUpgradeError, SubstreamProtocol, SupportedProtocols, + ConnectionHandler, ConnectionHandlerEvent, NoProtocols, SeveralProtocols, Stream, + StreamProtocol, StreamUpgradeError, SubstreamProtocol, SupportedProtocols, }; use std::collections::VecDeque; use std::task::Waker; @@ -608,7 +608,7 @@ impl Handler { impl ConnectionHandler for Handler { type FromBehaviour = HandlerIn; type ToBehaviour = HandlerEvent; - type InboundProtocol = Either; + type InboundProtocol = Either; type OutboundProtocol = SeveralProtocols; type OutboundOpenInfo = (); type InboundOpenInfo = (); @@ -619,7 +619,7 @@ impl ConnectionHandler for Handler { Either::Left(SeveralProtocols::new(self.protocol_names.clone())), (), ), - Mode::Client => SubstreamProtocol::new(Either::Right(upgrade::DeniedUpgrade), ()), + Mode::Client => SubstreamProtocol::new(Either::Right(NoProtocols::new()), ()), } } diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index 2a2c5499fc2..5e859918843 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -27,13 +27,13 @@ use futures::{ stream::{BoxStream, SelectAll}, StreamExt, }; -use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_core::upgrade::ReadyUpgrade; use libp2p_swarm::{ handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, NoProtocols, StreamProtocol, SubstreamProtocol, }; use crate::client::{RunError, RunId}; @@ -85,13 +85,13 @@ impl Default for Handler { impl ConnectionHandler for Handler { type FromBehaviour = Command; type ToBehaviour = Event; - type InboundProtocol = DeniedUpgrade; + type InboundProtocol = NoProtocols; type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()) + SubstreamProtocol::new(NoProtocols::new(), ()) } fn on_behaviour_event(&mut self, command: Self::FromBehaviour) { diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index ddfe8f881e5..03e20ee54eb 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -21,13 +21,13 @@ use std::task::{Context, Poll}; use futures::FutureExt; -use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_core::upgrade::ReadyUpgrade; use libp2p_swarm::{ handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, NoProtocols, StreamProtocol, SubstreamProtocol, }; use tracing::error; use void::Void; @@ -64,7 +64,7 @@ impl ConnectionHandler for Handler { type FromBehaviour = Void; type ToBehaviour = Event; type InboundProtocol = ReadyUpgrade; - type OutboundProtocol = DeniedUpgrade; + type OutboundProtocol = NoProtocols; type OutboundOpenInfo = Void; type InboundOpenInfo = (); diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index e81c5343701..966a38255a8 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -26,11 +26,12 @@ use crate::handler::{ }; use crate::upgrade::SendWrapper; use crate::{ - ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, NetworkBehaviour, NoProtocols, THandler, THandlerInEvent, THandlerOutEvent, + ToSwarm, }; use either::Either; use futures::future; -use libp2p_core::{upgrade::DeniedUpgrade, Endpoint, Multiaddr}; +use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use std::{task::Context, task::Poll}; @@ -264,7 +265,7 @@ where { type FromBehaviour = TInner::FromBehaviour; type ToBehaviour = TInner::ToBehaviour; - type InboundProtocol = Either, SendWrapper>; + type InboundProtocol = Either, NoProtocols>; type OutboundProtocol = TInner::OutboundProtocol; type OutboundOpenInfo = TInner::OutboundOpenInfo; type InboundOpenInfo = Either; @@ -276,7 +277,7 @@ where .map_upgrade(|u| Either::Left(SendWrapper(u))) .map_info(Either::Left) } else { - SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(())) + SubstreamProtocol::new(Either::Right(NoProtocols::new()), Either::Right(())) } } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 107a7044273..9434489d85a 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -740,11 +740,9 @@ enum Shutdown { #[cfg(test)] mod tests { use super::*; - use crate::{dummy, SeveralProtocols}; - use futures::future; + use crate::{dummy, NoProtocols, SeveralProtocols}; use futures::AsyncRead; use futures::AsyncWrite; - use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::StreamMuxer; use quickcheck::*; use std::sync::{Arc, Weak}; @@ -1107,7 +1105,7 @@ mod tests { #[derive(Default)] struct ConfigurableProtocolConnectionHandler { - events: Vec>, + events: Vec>, active_protocols: HashSet, local_added: Vec>, local_removed: Vec>, @@ -1142,15 +1140,15 @@ mod tests { impl ConnectionHandler for MockConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = DeniedUpgrade; + type InboundProtocol = NoProtocols; + type OutboundProtocol = NoProtocols; type InboundOpenInfo = (); type OutboundOpenInfo = (); fn listen_protocol( &self, ) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout) + SubstreamProtocol::new(NoProtocols::new(), ()).with_timeout(self.upgrade_timeout) } fn on_connection_event( @@ -1202,7 +1200,7 @@ mod tests { if self.outbound_requested { self.outbound_requested = false; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(DeniedUpgrade, ()) + protocol: SubstreamProtocol::new(NoProtocols::new(), ()) .with_timeout(self.upgrade_timeout), }); } @@ -1215,7 +1213,7 @@ mod tests { type FromBehaviour = Void; type ToBehaviour = Void; type InboundProtocol = SeveralProtocols; - type OutboundProtocol = DeniedUpgrade; + type OutboundProtocol = NoProtocols; type InboundOpenInfo = (); type OutboundOpenInfo = (); diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 86df676443b..d4eb8016f75 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -4,10 +4,9 @@ use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use crate::{ - ConnectionDenied, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, THandler, - THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionHandlerEvent, NoProtocols, StreamUpgradeError, SubstreamProtocol, + THandler, THandlerInEvent, THandlerOutEvent, }; -use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::Endpoint; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; @@ -64,13 +63,13 @@ pub struct ConnectionHandler; impl crate::handler::ConnectionHandler for ConnectionHandler { type FromBehaviour = Void; type ToBehaviour = Void; - type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = DeniedUpgrade; + type InboundProtocol = NoProtocols; + type OutboundProtocol = NoProtocols; type InboundOpenInfo = (); type OutboundOpenInfo = Void; fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()) + SubstreamProtocol::new(NoProtocols::new(), ()) } fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index b1fc41e9098..754bd35eaa5 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -214,15 +214,15 @@ impl Default for OneShotHandlerConfig { mod tests { use super::*; + use crate::NoProtocols; use futures::executor::block_on; use futures::future::poll_fn; - use libp2p_core::upgrade::DeniedUpgrade; use void::Void; #[test] fn do_not_keep_idle_connection_alive() { - let mut handler: OneShotHandler<_, DeniedUpgrade, Void> = OneShotHandler::new( - SubstreamProtocol::new(DeniedUpgrade {}, ()), + let mut handler: OneShotHandler<_, NoProtocols, Void> = OneShotHandler::new( + SubstreamProtocol::new(NoProtocols::new(), ()), Default::default(), ); diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 5bd05bc9620..bdc0fb68b9f 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -123,7 +123,7 @@ pub use libp2p_swarm_derive::NetworkBehaviour; pub use listen_opts::ListenOpts; pub use stream::Stream; pub use stream_protocol::{InvalidProtocol, StreamProtocol}; -pub use upgrade::{several::SeveralProtocols, single::SingleProtocol}; +pub use upgrade::{no::NoProtocols, several::SeveralProtocols, single::SingleProtocol}; use crate::behaviour::ExternalAddrConfirmed; use crate::handler::UpgradeInfoSend; diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index 8a906a21227..8c6a1350f0d 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +pub(crate) mod no; pub(crate) mod several; pub(crate) mod single; diff --git a/swarm/src/upgrade/no.rs b/swarm/src/upgrade/no.rs new file mode 100644 index 00000000000..55a9fff0996 --- /dev/null +++ b/swarm/src/upgrade/no.rs @@ -0,0 +1,43 @@ +use crate::{Stream, StreamProtocol}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use std::future::{pending, Pending}; +use std::iter; +use void::Void; + +#[derive(Debug, Clone, Copy)] +pub struct NoProtocols {} + +impl NoProtocols { + pub fn new() -> Self { + Self {} + } +} + +impl UpgradeInfo for NoProtocols { + type Info = StreamProtocol; + type InfoIter = iter::Empty; + + fn protocol_info(&self) -> Self::InfoIter { + iter::empty() + } +} + +impl InboundUpgrade for NoProtocols { + type Output = Void; + type Error = Void; + type Future = Pending>; + + fn upgrade_inbound(self, _: Stream, _: Self::Info) -> Self::Future { + pending() + } +} + +impl OutboundUpgrade for NoProtocols { + type Output = Void; + type Error = Void; + type Future = Pending>; + + fn upgrade_outbound(self, _: Stream, _: Self::Info) -> Self::Future { + pending() + } +} diff --git a/swarm/tests/connection_close.rs b/swarm/tests/connection_close.rs index 4efe8d17e49..7ec39a4bd2c 100644 --- a/swarm/tests/connection_close.rs +++ b/swarm/tests/connection_close.rs @@ -1,10 +1,9 @@ -use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::handler::ConnectionEvent; use libp2p_swarm::{ ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, - NetworkBehaviour, SubstreamProtocol, Swarm, SwarmEvent, THandler, THandlerInEvent, + NetworkBehaviour, NoProtocols, SubstreamProtocol, Swarm, SwarmEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p_swarm_test::SwarmExt; @@ -96,13 +95,13 @@ impl NetworkBehaviour for Behaviour { impl ConnectionHandler for HandlerWithState { type FromBehaviour = Void; type ToBehaviour = u64; - type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = DeniedUpgrade; + type InboundProtocol = NoProtocols; + type OutboundProtocol = NoProtocols; type InboundOpenInfo = (); type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(DeniedUpgrade, ()) + SubstreamProtocol::new(NoProtocols::new(), ()) } fn connection_keep_alive(&self) -> bool { From 637228e19e24861e04a93848345cbb895475baea Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 9 Nov 2023 17:01:56 +1100 Subject: [PATCH 8/8] Require `AsRef` --- protocols/gossipsub/src/protocol.rs | 6 ++++++ swarm/src/connection.rs | 6 +++++- swarm/src/handler/multi.rs | 12 ++++++++++-- swarm/src/lib.rs | 11 ----------- swarm/src/stream_protocol.rs | 20 ++++++++++++++++---- swarm/src/upgrade.rs | 13 ++++++++----- 6 files changed, 45 insertions(+), 23 deletions(-) diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index dcd509f6aa9..fab51664173 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -90,6 +90,12 @@ impl AsRef for ProtocolId { } } +impl AsRef for ProtocolId { + fn as_ref(&self) -> &StreamProtocol { + self.protocol.as_ref() + } +} + impl UpgradeInfo for ProtocolConfig { type Info = ProtocolId; type InfoIter = Vec; diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 9434489d85a..1525b38087a 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -459,7 +459,11 @@ fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet> AsRef for IndexedProtoName { } } +impl> AsRef for IndexedProtoName { + fn as_ref(&self) -> &StreamProtocol { + self.1.as_ref() + } +} + /// The aggregated `InboundOpenInfo`s of supported inbound substream protocols. #[derive(Clone)] pub struct Info { @@ -421,7 +427,9 @@ where let mut set = HashSet::new(); for infos in iter { for i in infos.protocol_info() { - let v = Vec::from(i.as_ref()); + let x: &str = i.as_ref(); + + let v = Vec::from(x); if set.contains(&v) { return Err(DuplicateProtonameError(v)); } else { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index bdc0fb68b9f..694d14a990e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -334,9 +334,6 @@ where /// handlers. behaviour: TBehaviour, - /// List of protocols that the behaviour says it supports. - supported_protocols: SmallVec<[Vec; 16]>, - confirmed_external_addr: HashSet, /// Multiaddresses that our listeners are listening on, @@ -371,7 +368,6 @@ where transport, pool: Pool::new(local_peer_id, config.pool_config), behaviour, - supported_protocols: Default::default(), confirmed_external_addr: Default::default(), listened_addrs: HashMap::new(), pending_handler_event: None, @@ -760,12 +756,6 @@ where } }; - let supported_protocols = handler - .listen_protocol() - .upgrade() - .protocol_info() - .map(|p| p.as_ref().as_bytes().to_vec()) - .collect(); let other_established_connection_ids = self .pool .iter_established_connections_of_peer(&peer_id) @@ -803,7 +793,6 @@ where other_established: other_established_connection_ids.len(), }, )); - self.supported_protocols = supported_protocols; self.pending_swarm_events .push_back(SwarmEvent::ConnectionEstablished { peer_id, diff --git a/swarm/src/stream_protocol.rs b/swarm/src/stream_protocol.rs index f746429a3d7..fed7d1a8216 100644 --- a/swarm/src/stream_protocol.rs +++ b/swarm/src/stream_protocol.rs @@ -50,6 +50,12 @@ impl AsRef for StreamProtocol { } } +impl AsRef for StreamProtocol { + fn as_ref(&self) -> &StreamProtocol { + &self + } +} + impl fmt::Debug for StreamProtocol { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { either::for_both!(&self.inner, s => s.fmt(f)) @@ -64,25 +70,31 @@ impl fmt::Display for StreamProtocol { impl PartialEq<&str> for StreamProtocol { fn eq(&self, other: &&str) -> bool { - self.as_ref() == *other + let x: &str = self.as_ref(); + x == *other } } impl PartialEq for &str { fn eq(&self, other: &StreamProtocol) -> bool { - *self == other.as_ref() + let x: &str = other.as_ref(); + *self == x } } impl PartialEq for StreamProtocol { fn eq(&self, other: &Self) -> bool { - self.as_ref() == other.as_ref() + let x: &str = self.as_ref(); + let y: &str = other.as_ref(); + + x == y } } impl Hash for StreamProtocol { fn hash(&self, state: &mut H) { - self.as_ref().hash(state) + let x: &str = self.as_ref(); + x.hash(state) } } diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index 8c6a1350f0d..40c123e24e0 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -22,7 +22,7 @@ pub(crate) mod no; pub(crate) mod several; pub(crate) mod single; -use crate::Stream; +use crate::{Stream, StreamProtocol}; use futures::prelude::*; use libp2p_core::upgrade; @@ -34,7 +34,7 @@ use libp2p_core::upgrade; /// [`UpgradeInfo`](upgrade::UpgradeInfo). pub trait UpgradeInfoSend: Send + 'static { /// Equivalent to [`UpgradeInfo::Info`](upgrade::UpgradeInfo::Info). - type Info: AsRef + Clone + Send + 'static; + type Info: AsRef + AsRef + Clone + Send + 'static; /// Equivalent to [`UpgradeInfo::InfoIter`](upgrade::UpgradeInfo::InfoIter). type InfoIter: Iterator + Send + 'static; @@ -42,11 +42,12 @@ pub trait UpgradeInfoSend: Send + 'static { fn protocol_info(&self) -> Self::InfoIter; } -impl UpgradeInfoSend for T +impl UpgradeInfoSend for T where - T: upgrade::UpgradeInfo + Send + 'static, + T: upgrade::UpgradeInfo + Send + 'static, T::Info: Send + 'static, ::IntoIter: Send + 'static, + I: AsRef + AsRef + Clone + Send + 'static, { type Info = T::Info; type InfoIter = ::IntoIter; @@ -132,7 +133,9 @@ where /// > doesn't need to be used directly. pub struct SendWrapper(pub T); -impl upgrade::UpgradeInfo for SendWrapper { +impl + Clone + AsRef, T: UpgradeInfoSend> + upgrade::UpgradeInfo for SendWrapper +{ type Info = T::Info; type InfoIter = T::InfoIter;