diff --git a/Cargo.lock b/Cargo.lock index 617d54e9b2bd..39052b728d44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2720,7 +2720,7 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.45.2" +version = "0.45.3" dependencies = [ "arrayvec", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 346b316d4dc6..032088e6a81d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.46.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.44.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.8" } -libp2p-kad = { version = "0.45.2", path = "protocols/kad" } +libp2p-kad = { version = "0.45.3", path = "protocols/kad" } libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.2.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.14.1", path = "misc/metrics" } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 4740e4b1f958..b86ed4119f85 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.45.3 - unreleased + +- Fix a bug in `Handler::poll` that could result in the handling of events, received + by `on_behaviour_event`, to be delayed or completely blocked. + See [PR 4961](https://github.com/libp2p/rust-libp2p/pull/4961). + ## 0.45.2 - Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 1e4c788cf003..f4ad83972b42 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = { workspace = true } description = "Kademlia protocol for libp2p" -version = "0.45.2" +version = "0.45.3" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 318261d8d219..86a8aaef958c 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -68,7 +68,7 @@ pub struct Handler { /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. - pending_messages: VecDeque<(KadRequestMsg, QueryId)>, + pending_messages: WakeableVecDeque<(KadRequestMsg, QueryId)>, /// List of active inbound substreams with the state they are in. inbound_substreams: SelectAll, @@ -619,7 +619,7 @@ impl ConnectionHandler for Handler { } HandlerIn::FindNodeReq { key, query_id } => { let msg = KadRequestMsg::FindNode { key }; - self.pending_messages.push_back((msg, query_id)); + self.pending_messages.push((msg, query_id)); } HandlerIn::FindNodeRes { closer_peers, @@ -627,7 +627,7 @@ impl ConnectionHandler for Handler { } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), HandlerIn::GetProvidersReq { key, query_id } => { let msg = KadRequestMsg::GetProviders { key }; - self.pending_messages.push_back((msg, query_id)); + self.pending_messages.push((msg, query_id)); } HandlerIn::GetProvidersRes { closer_peers, @@ -646,15 +646,15 @@ impl ConnectionHandler for Handler { query_id, } => { let msg = KadRequestMsg::AddProvider { key, provider }; - self.pending_messages.push_back((msg, query_id)); + self.pending_messages.push((msg, query_id)); } HandlerIn::GetRecord { key, query_id } => { let msg = KadRequestMsg::GetValue { key }; - self.pending_messages.push_back((msg, query_id)); + self.pending_messages.push((msg, query_id)); } HandlerIn::PutRecord { record, query_id } => { let msg = KadRequestMsg::PutValue { record }; - self.pending_messages.push_back((msg, query_id)); + self.pending_messages.push((msg, query_id)); } HandlerIn::GetRecordRes { record, @@ -760,11 +760,14 @@ impl ConnectionHandler for Handler { } if self.outbound_substreams.len() < MAX_NUM_STREAMS { - if let Some((msg, id)) = self.pending_messages.pop_front() { - self.queue_new_stream(id, msg); - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), - }); + match self.pending_messages.poll_unpin(cx) { + Poll::Ready((msg, id)) => { + self.queue_new_stream(id, msg); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), + }); + } + Poll::Pending => {} } } @@ -1051,6 +1054,40 @@ fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> HandlerEven } } +struct WakeableVecDeque { + inner: VecDeque, + empty_waker: Option, +} + +impl Default for WakeableVecDeque { + fn default() -> Self { + Self { + inner: Default::default(), + empty_waker: Default::default(), + } + } +} + +impl WakeableVecDeque { + fn push(&mut self, value: T) { + self.inner.push_back(value); + + if let Some(waker) = self.empty_waker.take() { + waker.wake(); + } + } + + fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll { + match self.inner.pop_front() { + Some(value) => Poll::Ready(value), + None => { + self.empty_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + } +} + #[cfg(test)] mod tests { use super::*;