From 4c996b34e20b88d8d3f82bf49719ef6e70f50f91 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Nov 2023 13:23:14 +1100 Subject: [PATCH 1/9] Pass on `QueryId` for `AddProvider` --- protocols/kad/src/behaviour.rs | 1 + protocols/kad/src/handler.rs | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 5a4b737c998..cde4fbb8536 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -3173,6 +3173,7 @@ impl QueryInfo { multiaddrs: external_addresses.clone(), connection_ty: crate::protocol::ConnectionType::Connected, }, + query_id, }, }, QueryInfo::GetRecord { key, .. } => HandlerIn::GetRecord { diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index adfb076541c..1b01af6c05a 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -355,7 +355,7 @@ pub enum HandlerIn { FindNodeReq { /// Identifier of the node. key: Vec, - /// Custom user data. Passed back in the out event when the results arrive. + /// ID of the query that generated this request. query_id: QueryId, }, @@ -374,7 +374,7 @@ pub enum HandlerIn { GetProvidersReq { /// Identifier being searched. key: record::Key, - /// Custom user data. Passed back in the out event when the results arrive. + /// ID of the query that generated this request. query_id: QueryId, }, @@ -399,13 +399,15 @@ pub enum HandlerIn { key: record::Key, /// Known provider for this key. provider: KadPeer, + /// ID of the query that generated this request. + query_id: QueryId, }, /// Request to retrieve a record from the DHT. GetRecord { /// The key of the record. key: record::Key, - /// Custom data. Passed back in the out event when the results arrive. + /// ID of the query that generated this request. query_id: QueryId, }, @@ -422,7 +424,7 @@ pub enum HandlerIn { /// Put a value into the dht records. PutRecord { record: Record, - /// Custom data. Passed back in the out event when the results arrive. + /// ID of the query that generated this request. query_id: QueryId, }, @@ -648,9 +650,13 @@ impl ConnectionHandler for Handler { provider_peers, }, ), - HandlerIn::AddProvider { key, provider } => { + HandlerIn::AddProvider { + key, + provider, + query_id, + } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.pending_messages.push_back((msg, None)); + self.pending_messages.push_back((msg, Some(query_id))); } HandlerIn::GetRecord { key, query_id } => { let msg = KadRequestMsg::GetValue { key }; From 00e624b67e6dd3f3be97086df9a79b39fdca576b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Nov 2023 13:27:01 +1100 Subject: [PATCH 2/9] Convert tuple variants to structs --- protocols/kad/src/handler.rs | 226 ++++++++++++++++++++--------------- 1 file changed, 131 insertions(+), 95 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 1b01af6c05a..32ba641a7c5 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -98,16 +98,31 @@ struct ProtocolStatus { /// State of an active outbound substream. enum OutboundSubstreamState { /// Waiting to send a message to the remote. - PendingSend(KadOutStreamSink, KadRequestMsg, Option), + PendingSend { + stream: KadOutStreamSink, + msg: KadRequestMsg, + query_id: Option, + }, /// Waiting to flush the substream so that the data arrives to the remote. - PendingFlush(KadOutStreamSink, Option), + PendingFlush { + stream: KadOutStreamSink, + query_id: Option, + }, /// Waiting for an answer back from the remote. // TODO: add timeout - WaitingAnswer(KadOutStreamSink, QueryId), + WaitingAnswer { + stream: KadOutStreamSink, + query_id: QueryId, + }, /// An error happened on the substream and we should report the error to the user. - ReportError(HandlerQueryErr, QueryId), + ReportError { + error: HandlerQueryErr, + query_id: QueryId, + }, /// The substream is being closed. - Closing(KadOutStreamSink), + Closing { + stream: KadOutStreamSink, + }, /// The substream is complete and will not perform any more work. Done, Poisoned, @@ -499,7 +514,11 @@ impl Handler { ) { if let Some((msg, query_id)) = self.pending_messages.pop_front() { self.outbound_substreams - .push(OutboundSubstreamState::PendingSend(protocol, msg, query_id)); + .push(OutboundSubstreamState::PendingSend { + stream: protocol, + msg, + query_id, + }); } else { debug_assert!(false, "Requested outbound stream without message") } @@ -589,7 +608,10 @@ impl Handler { if let Some((_, Some(query_id))) = self.pending_messages.pop_front() { self.outbound_substreams - .push(OutboundSubstreamState::ReportError(error.into(), query_id)); + .push(OutboundSubstreamState::ReportError { + error: error.into(), + query_id, + }); } self.num_requested_outbound_streams -= 1; @@ -853,31 +875,19 @@ impl futures::Stream for OutboundSubstreamState { loop { match std::mem::replace(this, OutboundSubstreamState::Poisoned) { - OutboundSubstreamState::PendingSend(mut substream, msg, query_id) => { - match substream.poll_ready_unpin(cx) { - Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) { - Ok(()) => { - *this = OutboundSubstreamState::PendingFlush(substream, query_id); - } - Err(error) => { - *this = OutboundSubstreamState::Done; - let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }, - ) - }); - - return Poll::Ready(event); - } - }, - Poll::Pending => { - *this = OutboundSubstreamState::PendingSend(substream, msg, query_id); - return Poll::Pending; + OutboundSubstreamState::PendingSend { + stream: mut substream, + msg, + query_id, + } => match substream.poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) { + Ok(()) => { + *this = OutboundSubstreamState::PendingFlush { + stream: substream, + query_id, + }; } - Poll::Ready(Err(error)) => { + Err(error) => { *this = OutboundSubstreamState::Done; let event = query_id.map(|query_id| { ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { @@ -888,85 +898,111 @@ impl futures::Stream for OutboundSubstreamState { return Poll::Ready(event); } + }, + Poll::Pending => { + *this = OutboundSubstreamState::PendingSend { + stream: substream, + msg, + query_id, + }; + return Poll::Pending; } - } - OutboundSubstreamState::PendingFlush(mut substream, query_id) => { - match substream.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => { - if let Some(query_id) = query_id { - *this = OutboundSubstreamState::WaitingAnswer(substream, query_id); - } else { - *this = OutboundSubstreamState::Closing(substream); - } - } - Poll::Pending => { - *this = OutboundSubstreamState::PendingFlush(substream, query_id); - return Poll::Pending; - } - Poll::Ready(Err(error)) => { - *this = OutboundSubstreamState::Done; - let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }) - }); + Poll::Ready(Err(error)) => { + *this = OutboundSubstreamState::Done; + let event = query_id.map(|query_id| { + ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), + query_id, + }) + }); - return Poll::Ready(event); - } + return Poll::Ready(event); } - } - OutboundSubstreamState::WaitingAnswer(mut substream, query_id) => { - match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(msg))) => { - *this = OutboundSubstreamState::Closing(substream); - let event = process_kad_response(msg, query_id); - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - event, - ))); - } - Poll::Pending => { - *this = OutboundSubstreamState::WaitingAnswer(substream, query_id); - return Poll::Pending; - } - Poll::Ready(Some(Err(error))) => { - *this = OutboundSubstreamState::Done; - let event = HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), + }, + OutboundSubstreamState::PendingFlush { + stream: mut substream, + query_id, + } => match substream.poll_flush_unpin(cx) { + Poll::Ready(Ok(())) => { + if let Some(query_id) = query_id { + *this = OutboundSubstreamState::WaitingAnswer { + stream: substream, query_id, }; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - event, - ))); + } else { + *this = OutboundSubstreamState::Closing { stream: substream }; } - Poll::Ready(None) => { - *this = OutboundSubstreamState::Done; - let event = HandlerEvent::QueryError { - error: HandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), + } + Poll::Pending => { + *this = OutboundSubstreamState::PendingFlush { + stream: substream, + query_id, + }; + return Poll::Pending; + } + Poll::Ready(Err(error)) => { + *this = OutboundSubstreamState::Done; + let event = query_id.map(|query_id| { + ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), query_id, - }; + }) + }); - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - event, - ))); - } + return Poll::Ready(event); } - } - OutboundSubstreamState::ReportError(error, query_id) => { + }, + OutboundSubstreamState::WaitingAnswer { + stream: mut substream, + query_id, + } => match substream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + *this = OutboundSubstreamState::Closing { stream: substream }; + let event = process_kad_response(msg, query_id); + + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); + } + Poll::Pending => { + *this = OutboundSubstreamState::WaitingAnswer { + stream: substream, + query_id, + }; + return Poll::Pending; + } + Poll::Ready(Some(Err(error))) => { + *this = OutboundSubstreamState::Done; + let event = HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), + query_id, + }; + + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); + } + Poll::Ready(None) => { + *this = OutboundSubstreamState::Done; + let event = HandlerEvent::QueryError { + error: HandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), + query_id, + }; + + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); + } + }, + OutboundSubstreamState::ReportError { error, query_id } => { *this = OutboundSubstreamState::Done; let event = HandlerEvent::QueryError { error, query_id }; return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); } - OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) { - Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None), - Poll::Pending => { - *this = OutboundSubstreamState::Closing(stream); - return Poll::Pending; + OutboundSubstreamState::Closing { mut stream } => { + match stream.poll_close_unpin(cx) { + Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None), + Poll::Pending => { + *this = OutboundSubstreamState::Closing { stream }; + return Poll::Pending; + } } - }, + } OutboundSubstreamState::Done => { *this = OutboundSubstreamState::Done; return Poll::Ready(None); From b804bc8e4944cd6ee13737a6816270fccd6e4924 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Nov 2023 13:29:30 +1100 Subject: [PATCH 3/9] Track `has_answer` separately --- protocols/kad/src/handler.rs | 65 +++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 32ba641a7c5..acadfde6588 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -107,6 +107,8 @@ enum OutboundSubstreamState { PendingFlush { stream: KadOutStreamSink, query_id: Option, + /// Whether the sent message has an answer. + has_answer: bool, }, /// Waiting for an answer back from the remote. // TODO: add timeout @@ -879,15 +881,41 @@ impl futures::Stream for OutboundSubstreamState { stream: mut substream, msg, query_id, - } => match substream.poll_ready_unpin(cx) { - Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) { - Ok(()) => { - *this = OutboundSubstreamState::PendingFlush { + } => { + let has_answer = !matches!(msg, KadRequestMsg::AddProvider { .. }); // All queries apart from `AddProvider` have an answer. + + match substream.poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) { + Ok(()) => { + *this = OutboundSubstreamState::PendingFlush { + stream: substream, + query_id, + has_answer, + }; + } + Err(error) => { + *this = OutboundSubstreamState::Done; + let event = query_id.map(|query_id| { + ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), + query_id, + }, + ) + }); + + return Poll::Ready(event); + } + }, + Poll::Pending => { + *this = OutboundSubstreamState::PendingSend { stream: substream, + msg, query_id, }; + return Poll::Pending; } - Err(error) => { + Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; let event = query_id.map(|query_id| { ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { @@ -898,36 +926,18 @@ impl futures::Stream for OutboundSubstreamState { return Poll::Ready(event); } - }, - Poll::Pending => { - *this = OutboundSubstreamState::PendingSend { - stream: substream, - msg, - query_id, - }; - return Poll::Pending; } - Poll::Ready(Err(error)) => { - *this = OutboundSubstreamState::Done; - let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }) - }); - - return Poll::Ready(event); - } - }, + } OutboundSubstreamState::PendingFlush { stream: mut substream, query_id, + has_answer, } => match substream.poll_flush_unpin(cx) { Poll::Ready(Ok(())) => { - if let Some(query_id) = query_id { + if has_answer { *this = OutboundSubstreamState::WaitingAnswer { stream: substream, - query_id, + query_id: query_id.unwrap(), }; } else { *this = OutboundSubstreamState::Closing { stream: substream }; @@ -937,6 +947,7 @@ impl futures::Stream for OutboundSubstreamState { *this = OutboundSubstreamState::PendingFlush { stream: substream, query_id, + has_answer, }; return Poll::Pending; } From 5101e63b8419d31c257edcd405a10b60ff81dd23 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Nov 2023 13:31:07 +1100 Subject: [PATCH 4/9] Always track `QueryId` --- protocols/kad/src/handler.rs | 59 +++++++++++++++++------------------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index acadfde6588..5e95cd295f1 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -67,7 +67,7 @@ pub struct Handler { /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. - pending_messages: VecDeque<(KadRequestMsg, Option)>, + pending_messages: VecDeque<(KadRequestMsg, QueryId)>, /// List of active inbound substreams with the state they are in. inbound_substreams: SelectAll, @@ -101,12 +101,12 @@ enum OutboundSubstreamState { PendingSend { stream: KadOutStreamSink, msg: KadRequestMsg, - query_id: Option, + query_id: QueryId, }, /// Waiting to flush the substream so that the data arrives to the remote. PendingFlush { stream: KadOutStreamSink, - query_id: Option, + query_id: QueryId, /// Whether the sent message has an answer. has_answer: bool, }, @@ -608,7 +608,7 @@ impl Handler { // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying - if let Some((_, Some(query_id))) = self.pending_messages.pop_front() { + if let Some((_, query_id)) = self.pending_messages.pop_front() { self.outbound_substreams .push(OutboundSubstreamState::ReportError { error: error.into(), @@ -653,7 +653,7 @@ impl ConnectionHandler for Handler { } HandlerIn::FindNodeReq { key, query_id } => { let msg = KadRequestMsg::FindNode { key }; - self.pending_messages.push_back((msg, Some(query_id))); + self.pending_messages.push_back((msg, query_id)); } HandlerIn::FindNodeRes { closer_peers, @@ -661,7 +661,7 @@ impl ConnectionHandler for Handler { } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), HandlerIn::GetProvidersReq { key, query_id } => { let msg = KadRequestMsg::GetProviders { key }; - self.pending_messages.push_back((msg, Some(query_id))); + self.pending_messages.push_back((msg, query_id)); } HandlerIn::GetProvidersRes { closer_peers, @@ -680,15 +680,15 @@ impl ConnectionHandler for Handler { query_id, } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.pending_messages.push_back((msg, Some(query_id))); + self.pending_messages.push_back((msg, query_id)); } HandlerIn::GetRecord { key, query_id } => { let msg = KadRequestMsg::GetValue { key }; - self.pending_messages.push_back((msg, Some(query_id))); + self.pending_messages.push_back((msg, query_id)); } HandlerIn::PutRecord { record, query_id } => { let msg = KadRequestMsg::PutValue { record }; - self.pending_messages.push_back((msg, Some(query_id))); + self.pending_messages.push_back((msg, query_id)); } HandlerIn::GetRecordRes { record, @@ -895,16 +895,13 @@ impl futures::Stream for OutboundSubstreamState { } Err(error) => { *this = OutboundSubstreamState::Done; - let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }, - ) - }); - - return Poll::Ready(event); + + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::QueryError { + error: HandlerQueryErr::Io(error), + query_id, + }, + ))); } }, Poll::Pending => { @@ -917,14 +914,13 @@ impl futures::Stream for OutboundSubstreamState { } Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; - let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { + + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::QueryError { error: HandlerQueryErr::Io(error), query_id, - }) - }); - - return Poll::Ready(event); + }, + ))); } } } @@ -937,7 +933,7 @@ impl futures::Stream for OutboundSubstreamState { if has_answer { *this = OutboundSubstreamState::WaitingAnswer { stream: substream, - query_id: query_id.unwrap(), + query_id, }; } else { *this = OutboundSubstreamState::Closing { stream: substream }; @@ -953,14 +949,13 @@ impl futures::Stream for OutboundSubstreamState { } Poll::Ready(Err(error)) => { *this = OutboundSubstreamState::Done; - let event = query_id.map(|query_id| { - ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::QueryError { + + return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::QueryError { error: HandlerQueryErr::Io(error), query_id, - }) - }); - - return Poll::Ready(event); + }, + ))); } }, OutboundSubstreamState::WaitingAnswer { From 4b17fcce80eda91bfedcaf05a6b32e60a23c6b79 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Nov 2023 14:13:53 +1100 Subject: [PATCH 5/9] Refactor kademlia to use `async-await` --- Cargo.lock | 1 + protocols/kad/Cargo.toml | 1 + protocols/kad/src/handler.rs | 388 ++++++++++------------------------- 3 files changed, 116 insertions(+), 274 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8fe1eeee7a..b623b693928 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2719,6 +2719,7 @@ dependencies = [ "either", "fnv", "futures", + "futures-bounded", "futures-timer", "instant", "libp2p-core", diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 04101d51026..1e4c788cf00 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -19,6 +19,7 @@ asynchronous-codec = { workspace = true } futures = "0.3.29" libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } +futures-bounded = { workspace = true } quick-protobuf = "0.8" quick-protobuf-codec = { workspace = true } libp2p-identity = { workspace = true, features = ["rand"] } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 5e95cd295f1..daf7e0f9725 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -25,22 +25,22 @@ use crate::protocol::{ use crate::record::{self, Record}; use crate::QueryId; use either::Either; +use futures::channel::oneshot; use futures::prelude::*; use futures::stream::SelectAll; use libp2p_core::{upgrade, ConnectedPoint}; use libp2p_identity::PeerId; -use libp2p_swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, -}; +use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, Stream, StreamUpgradeError, SubstreamProtocol, SupportedProtocols, }; use std::collections::VecDeque; use std::task::Waker; +use std::time::Duration; use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll}; -const MAX_NUM_SUBSTREAMS: usize = 32; +const MAX_NUM_STREAMS: usize = 32; /// Protocol handler that manages substreams for the Kademlia protocol /// on a single connection with a peer. @@ -59,11 +59,12 @@ pub struct Handler { /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, - /// List of active outbound substreams with the state they are in. - outbound_substreams: SelectAll, + /// List of active outbound streams. + outbound_substreams: futures_bounded::FuturesMap>>, - /// Number of outbound streams being upgraded right now. - num_requested_outbound_streams: usize, + /// Contains one [`oneshot::Sender`] per outbound stream that we have requested. + pending_streams: + VecDeque, StreamUpgradeError>>>, /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. @@ -95,41 +96,6 @@ struct ProtocolStatus { reported: bool, } -/// State of an active outbound substream. -enum OutboundSubstreamState { - /// Waiting to send a message to the remote. - PendingSend { - stream: KadOutStreamSink, - msg: KadRequestMsg, - query_id: QueryId, - }, - /// Waiting to flush the substream so that the data arrives to the remote. - PendingFlush { - stream: KadOutStreamSink, - query_id: QueryId, - /// Whether the sent message has an answer. - has_answer: bool, - }, - /// Waiting for an answer back from the remote. - // TODO: add timeout - WaitingAnswer { - stream: KadOutStreamSink, - query_id: QueryId, - }, - /// An error happened on the substream and we should report the error to the user. - ReportError { - error: HandlerQueryErr, - query_id: QueryId, - }, - /// The substream is being closed. - Closing { - stream: KadOutStreamSink, - }, - /// The substream is complete and will not perform any more work. - Done, - Poisoned, -} - /// State of an active inbound substream. enum InboundSubstreamState { /// Waiting for a request from the remote. @@ -309,8 +275,6 @@ pub enum HandlerEvent { /// Error that can happen when requesting an RPC query. #[derive(Debug)] pub enum HandlerQueryErr { - /// Error while trying to perform the query. - Upgrade(StreamUpgradeError), /// Received an answer that doesn't correspond to the request. UnexpectedMessage, /// I/O error in the substream. @@ -320,9 +284,6 @@ pub enum HandlerQueryErr { impl fmt::Display for HandlerQueryErr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - HandlerQueryErr::Upgrade(err) => { - write!(f, "Error while performing Kademlia query: {err}") - } HandlerQueryErr::UnexpectedMessage => { write!( f, @@ -339,19 +300,12 @@ impl fmt::Display for HandlerQueryErr { impl error::Error for HandlerQueryErr { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - HandlerQueryErr::Upgrade(err) => Some(err), HandlerQueryErr::UnexpectedMessage => None, HandlerQueryErr::Io(err) => Some(err), } } } -impl From> for HandlerQueryErr { - fn from(err: StreamUpgradeError) -> Self { - HandlerQueryErr::Upgrade(err) - } -} - /// Event to send to the handler. #[derive(Debug)] pub enum HandlerIn { @@ -499,8 +453,11 @@ impl Handler { remote_peer_id, next_connec_unique_id: UniqueConnecId(0), inbound_substreams: Default::default(), - outbound_substreams: Default::default(), - num_requested_outbound_streams: 0, + outbound_substreams: futures_bounded::FuturesMap::new( + Duration::from_secs(30), + MAX_NUM_STREAMS, + ), + pending_streams: Default::default(), pending_messages: Default::default(), protocol_status: None, remote_supported_protocols: Default::default(), @@ -509,24 +466,18 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { protocol, info: () }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { + protocol: stream, + info: (), + }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, ) { - if let Some((msg, query_id)) = self.pending_messages.pop_front() { - self.outbound_substreams - .push(OutboundSubstreamState::PendingSend { - stream: protocol, - msg, - query_id, - }); - } else { - debug_assert!(false, "Requested outbound stream without message") + if let Some(sender) = self.pending_streams.pop_front() { + let _ = sender.send(Ok(stream)); } - self.num_requested_outbound_streams -= 1; - if self.protocol_status.is_none() { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want @@ -562,7 +513,7 @@ impl Handler { }); } - if self.inbound_substreams.len() == MAX_NUM_SUBSTREAMS { + if self.inbound_substreams.len() == MAX_NUM_STREAMS { if let Some(s) = self.inbound_substreams.iter_mut().find(|s| { matches!( s, @@ -596,27 +547,40 @@ impl Handler { }); } - fn on_dial_upgrade_error( - &mut self, - DialUpgradeError { - info: (), error, .. - }: DialUpgradeError< - ::OutboundOpenInfo, - ::OutboundProtocol, - >, - ) { - // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't - // continue trying - - if let Some((_, query_id)) = self.pending_messages.pop_front() { - self.outbound_substreams - .push(OutboundSubstreamState::ReportError { - error: error.into(), - query_id, - }); - } + fn request_new_stream(&mut self, id: QueryId, msg: KadRequestMsg) { + let (sender, receiver) = oneshot::channel(); + + self.pending_streams.push_back(sender); + let result = self.outbound_substreams.try_push(id, async move { + let mut stream = receiver + .await + .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))? + .map_err(|e| match e { + StreamUpgradeError::Timeout => io::ErrorKind::TimedOut.into(), + StreamUpgradeError::Apply(e) => e, + StreamUpgradeError::NegotiationFailed => { + io::Error::new(io::ErrorKind::ConnectionRefused, "protocol not supported") + } + StreamUpgradeError::Io(e) => e, + })?; + + let has_answer = !matches!(msg, KadRequestMsg::AddProvider { .. }); + + stream.send(msg).await?; + + if !has_answer { + return Ok(None); + } + + let msg = stream.next().await.ok_or(io::ErrorKind::UnexpectedEof)??; + + Ok(Some(msg)) + }); - self.num_requested_outbound_streams -= 1; + debug_assert!( + result.is_ok(), + "Expected to not create more streams than allowed" + ); } } @@ -742,44 +706,68 @@ impl ConnectionHandler for Handler { ) -> Poll< ConnectionHandlerEvent, > { - match &mut self.protocol_status { - Some(status) if !status.reported => { - status.reported = true; - let event = if status.supported { - HandlerEvent::ProtocolConfirmed { - endpoint: self.endpoint.clone(), - } - } else { - HandlerEvent::ProtocolNotSupported { - endpoint: self.endpoint.clone(), - } - }; + loop { + match &mut self.protocol_status { + Some(status) if !status.reported => { + status.reported = true; + let event = if status.supported { + HandlerEvent::ProtocolConfirmed { + endpoint: self.endpoint.clone(), + } + } else { + HandlerEvent::ProtocolNotSupported { + endpoint: self.endpoint.clone(), + } + }; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + _ => {} } - _ => {} - } - if let Poll::Ready(Some(event)) = self.outbound_substreams.poll_next_unpin(cx) { - return Poll::Ready(event); - } + match self.outbound_substreams.poll_unpin(cx) { + Poll::Ready((query, Ok(Ok(Some(response))))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + process_kad_response(response, query), + )) + } + Poll::Ready((_, Ok(Ok(None)))) => { + continue; + } + Poll::Ready((query_id, Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::QueryError { + error: HandlerQueryErr::Io(e), + query_id, + }, + )) + } + Poll::Ready((query_id, Err(_timeout))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::QueryError { + error: HandlerQueryErr::Io(io::ErrorKind::TimedOut.into()), + query_id, + }, + )) + } + Poll::Pending => {} + } - if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) { - return Poll::Ready(event); - } + if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) { + return Poll::Ready(event); + } - let num_in_progress_outbound_substreams = - self.outbound_substreams.len() + self.num_requested_outbound_streams; - if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS - && self.num_requested_outbound_streams < self.pending_messages.len() - { - self.num_requested_outbound_streams += 1; - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), - }); - } + if (self.outbound_substreams.len() + self.pending_streams.len()) < MAX_NUM_STREAMS { + if let Some((msg, id)) = self.pending_messages.pop_front() { + self.request_new_stream(id, msg); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), + }); + } + } - Poll::Pending + return Poll::Pending; + } } fn on_connection_event( @@ -798,8 +786,10 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { self.on_fully_negotiated_inbound(fully_negotiated_inbound) } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - self.on_dial_upgrade_error(dial_upgrade_error) + ConnectionEvent::DialUpgradeError(ev) => { + if let Some(sender) = self.pending_streams.pop_front() { + let _ = sender.send(Err(ev.error)); + } } ConnectionEvent::RemoteProtocolsChange(change) => { let dirty = self.remote_supported_protocols.on_protocols_change(change); @@ -869,156 +859,6 @@ impl Handler { } } -impl futures::Stream for OutboundSubstreamState { - type Item = ConnectionHandlerEvent; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - - loop { - match std::mem::replace(this, OutboundSubstreamState::Poisoned) { - OutboundSubstreamState::PendingSend { - stream: mut substream, - msg, - query_id, - } => { - let has_answer = !matches!(msg, KadRequestMsg::AddProvider { .. }); // All queries apart from `AddProvider` have an answer. - - match substream.poll_ready_unpin(cx) { - Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) { - Ok(()) => { - *this = OutboundSubstreamState::PendingFlush { - stream: substream, - query_id, - has_answer, - }; - } - Err(error) => { - *this = OutboundSubstreamState::Done; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }, - ))); - } - }, - Poll::Pending => { - *this = OutboundSubstreamState::PendingSend { - stream: substream, - msg, - query_id, - }; - return Poll::Pending; - } - Poll::Ready(Err(error)) => { - *this = OutboundSubstreamState::Done; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }, - ))); - } - } - } - OutboundSubstreamState::PendingFlush { - stream: mut substream, - query_id, - has_answer, - } => match substream.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => { - if has_answer { - *this = OutboundSubstreamState::WaitingAnswer { - stream: substream, - query_id, - }; - } else { - *this = OutboundSubstreamState::Closing { stream: substream }; - } - } - Poll::Pending => { - *this = OutboundSubstreamState::PendingFlush { - stream: substream, - query_id, - has_answer, - }; - return Poll::Pending; - } - Poll::Ready(Err(error)) => { - *this = OutboundSubstreamState::Done; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }, - ))); - } - }, - OutboundSubstreamState::WaitingAnswer { - stream: mut substream, - query_id, - } => match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(msg))) => { - *this = OutboundSubstreamState::Closing { stream: substream }; - let event = process_kad_response(msg, query_id); - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); - } - Poll::Pending => { - *this = OutboundSubstreamState::WaitingAnswer { - stream: substream, - query_id, - }; - return Poll::Pending; - } - Poll::Ready(Some(Err(error))) => { - *this = OutboundSubstreamState::Done; - let event = HandlerEvent::QueryError { - error: HandlerQueryErr::Io(error), - query_id, - }; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); - } - Poll::Ready(None) => { - *this = OutboundSubstreamState::Done; - let event = HandlerEvent::QueryError { - error: HandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), - query_id, - }; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); - } - }, - OutboundSubstreamState::ReportError { error, query_id } => { - *this = OutboundSubstreamState::Done; - let event = HandlerEvent::QueryError { error, query_id }; - - return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(event))); - } - OutboundSubstreamState::Closing { mut stream } => { - match stream.poll_close_unpin(cx) { - Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None), - Poll::Pending => { - *this = OutboundSubstreamState::Closing { stream }; - return Poll::Pending; - } - } - } - OutboundSubstreamState::Done => { - *this = OutboundSubstreamState::Done; - return Poll::Ready(None); - } - OutboundSubstreamState::Poisoned => unreachable!(), - } - } - } -} - impl futures::Stream for InboundSubstreamState { type Item = ConnectionHandlerEvent; From 75ff7a695e570cfb0824e112f1006b12ed4ea0d5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 20 Nov 2023 14:19:42 +1100 Subject: [PATCH 6/9] Close stream after sending --- protocols/kad/src/handler.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index daf7e0f9725..c0fb83332b2 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -567,6 +567,7 @@ impl Handler { let has_answer = !matches!(msg, KadRequestMsg::AddProvider { .. }); stream.send(msg).await?; + stream.close().await?; if !has_answer { return Ok(None); From 51556d6d96e039a1141bb57fcb87bed3038ccb25 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 21 Nov 2023 08:47:27 +1100 Subject: [PATCH 7/9] Update protocols/kad/src/handler.rs Co-authored-by: Max Inden --- protocols/kad/src/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index c0fb83332b2..20fed80bcc7 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -454,7 +454,7 @@ impl Handler { next_connec_unique_id: UniqueConnecId(0), inbound_substreams: Default::default(), outbound_substreams: futures_bounded::FuturesMap::new( - Duration::from_secs(30), + Duration::from_secs(10), MAX_NUM_STREAMS, ), pending_streams: Default::default(), From 0cd4f95ad29ebbfaffe6714d5f4360a4b46f39c0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Nov 2023 10:24:25 +1100 Subject: [PATCH 8/9] Rename fn --- protocols/kad/src/handler.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index c0fb83332b2..c2f540885df 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -547,7 +547,8 @@ impl Handler { }); } - fn request_new_stream(&mut self, id: QueryId, msg: KadRequestMsg) { + /// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol handshake using a [`oneshot::channel`]. + fn queue_new_stream(&mut self, id: QueryId, msg: KadRequestMsg) { let (sender, receiver) = oneshot::channel(); self.pending_streams.push_back(sender); @@ -760,7 +761,7 @@ impl ConnectionHandler for Handler { if (self.outbound_substreams.len() + self.pending_streams.len()) < MAX_NUM_STREAMS { if let Some((msg, id)) = self.pending_messages.pop_front() { - self.request_new_stream(id, msg); + self.queue_new_stream(id, msg); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), }); From 43d2e6d7b149b77f39cdf3f76cf3e9026c733da7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Nov 2023 10:24:58 +1100 Subject: [PATCH 9/9] Don't double count streams --- protocols/kad/src/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 5d5f8c4d8f6..318261d8d21 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -759,7 +759,7 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } - if (self.outbound_substreams.len() + self.pending_streams.len()) < MAX_NUM_STREAMS { + if self.outbound_substreams.len() < MAX_NUM_STREAMS { if let Some((msg, id)) = self.pending_messages.pop_front() { self.queue_new_stream(id, msg); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {