From cccf87beee5090432409f8c221bb3aac095187e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20RIBEAU?= Date: Wed, 29 Nov 2023 16:37:22 +0100 Subject: [PATCH] fix(kad): pushing to `pending_messages` do not wake up `Handler::poll` --- protocols/kad/CHANGELOG.md | 4 ++- protocols/kad/src/handler.rs | 50 +++++++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 4740e4b1f958..fe044656e8f6 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,7 +1,9 @@ -## 0.45.2 +## 0.45.2 - unreleased - Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated. See [PR 4596](https://github.com/libp2p/rust-libp2p/pull/4596). +- Fix missing wake-up of `Handler` when new messages arrive from the `NetworkBehaviour`. + See [PR 4961](https://github.com/libp2p/rust-libp2p/pull/4961). ## 0.45.1 diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 318261d8d219..ef1cdb0c7de8 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -68,7 +68,7 @@ pub struct Handler { /// List of outbound substreams that are waiting to become active next. /// Contains the request we want to send, and the user data if we expect an answer. - pending_messages: VecDeque<(KadRequestMsg, QueryId)>, + pending_messages: WakeableVecDeque<(KadRequestMsg, QueryId)>, /// List of active inbound substreams with the state they are in. inbound_substreams: SelectAll, @@ -760,11 +760,14 @@ impl ConnectionHandler for Handler { } if self.outbound_substreams.len() < MAX_NUM_STREAMS { - if let Some((msg, id)) = self.pending_messages.pop_front() { - self.queue_new_stream(id, msg); - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), - }); + match self.pending_messages.poll_unpin(cx) { + Poll::Ready((msg, id)) => { + self.queue_new_stream(id, msg); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), + }); + } + Poll::Pending => {} } } @@ -1051,6 +1054,41 @@ fn process_kad_response(event: KadResponseMsg, query_id: QueryId) -> HandlerEven } } +struct WakeableVecDeque { + inner: VecDeque, + empty_waker: Option, +} + +impl Default for WakeableVecDeque { + fn default() -> Self { + Self { + inner: Default::default(), + empty_waker: Default::default(), + } + } +} + +impl WakeableVecDeque { + fn push_back(&mut self, value: T) { + self.inner.push_back(value); + + if let Some(waker) = self.empty_waker.take() { + waker.wake(); + } + } + + #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic. + fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll { + match self.inner.pop_front() { + Some(value) => Poll::Ready(value), + None => { + self.empty_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + } +} + #[cfg(test)] mod tests { use super::*;