From c4c965a9a452860ebf7dfb92812457845a650316 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 20 Oct 2023 17:54:39 +0300 Subject: [PATCH 01/20] fix(request-response): Remove finished jobs correctly --- protocols/request-response/src/lib.rs | 30 +++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 1d810274a3a..c55215367a8 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -492,7 +492,7 @@ where .get(peer) .map(|cs| { cs.iter() - .any(|c| c.pending_inbound_responses.contains(request_id)) + .any(|c| c.pending_outbound_responses.contains(request_id)) }) .unwrap_or(false); // Check if request is still pending to be sent. @@ -513,7 +513,7 @@ where .get(peer) .map(|cs| { cs.iter() - .any(|c| c.pending_outbound_responses.contains(request_id)) + .any(|c| c.pending_inbound_responses.contains(request_id)) }) .unwrap_or(false) } @@ -539,7 +539,7 @@ where } let ix = (request.request_id.0 as usize) % connections.len(); let conn = &mut connections[ix]; - conn.pending_inbound_responses.insert(request.request_id); + conn.pending_outbound_responses.insert(request.request_id); self.pending_events.push_back(ToSwarm::NotifyHandler { peer_id: *peer, handler: NotifyHandler::One(conn.id), @@ -576,10 +576,10 @@ where &mut self, peer: &PeerId, connection: ConnectionId, - request: &RequestId, + request: RequestId, ) -> bool { self.get_connection_mut(peer, connection) - .map(|c| c.pending_inbound_responses.remove(request)) + .map(|c| c.pending_inbound_responses.remove(&request)) .unwrap_or(false) } @@ -645,7 +645,7 @@ where self.connected.remove(&peer_id); } - for request_id in connection.pending_outbound_responses { + for request_id in connection.pending_inbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { peer: peer_id, @@ -654,7 +654,7 @@ where })); } - for request_id in connection.pending_inbound_responses { + for request_id in connection.pending_outbound_responses { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { peer: peer_id, @@ -698,7 +698,7 @@ where if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) { for request in pending_requests { connection - .pending_inbound_responses + .pending_outbound_responses .insert(request.request_id); handler.on_behaviour_event(request); } @@ -814,7 +814,7 @@ where request_id, response, } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before receiving response.", @@ -843,7 +843,7 @@ where match self.get_connection_mut(&peer, connection) { Some(connection) => { - let inserted = connection.pending_outbound_responses.insert(request_id); + let inserted = connection.pending_inbound_responses.insert(request_id); debug_assert!(inserted, "Expect id of new request to be unknown."); } // Connection closed after `Event::Request` has been emitted. @@ -859,7 +859,7 @@ where } } handler::Event::ResponseSent(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before response is sent." @@ -872,7 +872,7 @@ where })); } handler::Event::ResponseOmission(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before response is omitted.", @@ -886,7 +886,7 @@ where })); } handler::Event::OutboundTimeout(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before request times out." @@ -900,7 +900,7 @@ where })); } handler::Event::OutboundUnsupportedProtocols(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, "Expect request_id to be pending before failing to connect.", @@ -925,7 +925,7 @@ where })) } handler::Event::InboundStreamFailed { request_id, error } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!(removed, "Expect request_id to be pending upon failure"); self.pending_events From afee2d41ea723e6d7226509fde314bca45559c2f Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 20 Oct 2023 17:55:12 +0300 Subject: [PATCH 02/20] fix(request-response): Report failures --- protocols/request-response/src/handler.rs | 37 +++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 8a9d0c749ee..2719ef20dee 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -80,7 +80,14 @@ where inbound_request_id: Arc, - worker_streams: futures_bounded::FuturesMap, io::Error>>, + worker_streams: + futures_bounded::FuturesMap<(RequestId, Direction), Result, io::Error>>, +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +enum Direction { + Inbound, + Outbound, } impl Handler @@ -153,7 +160,7 @@ where if self .worker_streams - .try_push(request_id, recv.boxed()) + .try_push((request_id, Direction::Inbound), recv.boxed()) .is_err() { log::warn!("Dropping inbound stream because we are at capacity") @@ -193,7 +200,7 @@ where if self .worker_streams - .try_push(request_id, send.boxed()) + .try_push((request_id, Direction::Outbound), send.boxed()) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") @@ -384,13 +391,31 @@ where loop { match self.worker_streams.poll_unpin(cx) { Poll::Ready((_, Ok(Ok(event)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } - Poll::Ready((id, Ok(Err(e)))) => { + Poll::Ready(((id, direction), Ok(Err(e)))) => { log::debug!("Stream for request {id} failed: {e}"); + + let event = match direction { + Direction::Inbound => Event::InboundStreamFailed { + request_id: id, + error: e, + }, + Direction::Outbound => Event::OutboundStreamFailed { + request_id: id, + error: e, + }, + }; + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } - Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { + Poll::Ready(((id, direction), Err(futures_bounded::Timeout { .. }))) => { log::debug!("Stream for request {id} timed out"); + + if direction == Direction::Outbound { + let event = Event::OutboundTimeout(id); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } } Poll::Pending => break, } From e447b844a86c2cfd923f66fd71222aea35fa3fd4 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Sat, 21 Oct 2023 00:09:15 +0300 Subject: [PATCH 03/20] fix tests --- protocols/request-response/tests/ping.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index e0424488f48..37f21264d49 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -28,7 +28,7 @@ use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use rand::{self, Rng}; use serde::{Deserialize, Serialize}; -use std::iter; +use std::{io, iter}; #[async_std::test] #[cfg(feature = "cbor")] @@ -288,7 +288,10 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { e => panic!("unexpected event from peer 2: {e:?}"), }; - assert_eq!(error, request_response::OutboundFailure::ConnectionClosed); + assert!(matches!( + error, + request_response::OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof, + )); } // Simple Ping-Pong Protocol From fef7698a65dbf489ce47d36af7209ebb44ed9e86 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Sat, 21 Oct 2023 10:47:35 +0300 Subject: [PATCH 04/20] add tests for error reporting on outbound failures --- .../request-response/tests/error_reporting.rs | 397 ++++++++++++++++++ 1 file changed, 397 insertions(+) create mode 100644 protocols/request-response/tests/error_reporting.rs diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs new file mode 100644 index 00000000000..4afaf0f2ef1 --- /dev/null +++ b/protocols/request-response/tests/error_reporting.rs @@ -0,0 +1,397 @@ +use async_std::channel; +use async_std::future::timeout; +use async_std::task::{sleep, spawn}; +use async_trait::async_trait; +use futures::prelude::*; +use libp2p_request_response as request_response; +use libp2p_request_response::ProtocolSupport; +use libp2p_swarm::{StreamProtocol, Swarm}; +use libp2p_swarm_test::SwarmExt; +use request_response::{Codec, OutboundFailure}; +use std::time::Duration; +use std::{io, iter}; + +#[async_std::test] +async fn report_outbound_failure_on_read_response() { + let _ = env_logger::try_init(); + + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer2_id = *swarm2.local_peer_id(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects Action::FailOnReadResponse, replies with Action::FailOnReadResponse + let swarm1_task = async move { + loop { + match swarm1.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request, channel, .. + }, + }) => { + assert_eq!(request, Action::FailOnReadResponse); + assert_eq!(&peer, &peer2_id); + swarm1 + .behaviour_mut() + .send_response(channel, Action::FailOnReadResponse) + .unwrap(); + } + Ok(request_response::Event::ResponseSent { peer, .. }) => { + assert_eq!(&peer, &peer2_id); + } + Ok(e) => { + panic!("Peer1: Unexpected event: {e:?}") + } + Err(..) => {} + } + } + }; + + // Expects OutboundFailure::Io failure with `FailOnReadResponse` error + let swarm2_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadResponse); + + loop { + match swarm2.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(&peer, &peer1_id); + assert_eq!(request_id, req_id); + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Peer2: Unexpected error {e:?}"), + }; + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnReadResponse" + ); + break; + } + Ok(ev) => { + panic!("Peer2: Unexpected event: {ev:?}") + } + Err(..) => {} + } + } + }; + + spawn(swarm1_task); + timeout(Duration::from_millis(100), swarm2_task) + .await + .expect("timed out on waiting FailOnReadResponse"); +} + +#[async_std::test] +async fn report_outbound_failure_on_write_request() { + let _ = env_logger::try_init(); + + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Consume everything + let swarm1_task = async move { + loop { + swarm1.select_next_some().await; + } + }; + + // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error + let swarm2_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteRequest); + + loop { + match swarm2.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(&peer, &peer1_id); + assert_eq!(request_id, req_id); + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Peer2: Unexpected error {e:?}"), + }; + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteRequest" + ); + break; + } + Ok(ev) => { + panic!("Peer2: Unexpected event: {ev:?}") + } + Err(..) => {} + } + } + }; + + spawn(swarm1_task); + timeout(Duration::from_millis(100), swarm2_task) + .await + .expect("timed out on waiting FailOnReadResponse"); +} + +#[async_std::test] +async fn report_outbound_timeout_on_read_response() { + let _ = env_logger::try_init(); + + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new( + protocols.clone(), + cfg.with_request_timeout(Duration::from_millis(100)), + ) + }); + let peer2_id = *swarm2.local_peer_id(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + + // Expects Action::TimeoutOnReadResponse, replies with Action::TimeoutOnReadResponse + let swarm1_task = async move { + let _panic_check_tx = panic_check_tx; + + loop { + match swarm1.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request, channel, .. + }, + }) => { + assert_eq!(request, Action::TimeoutOnReadResponse); + assert_eq!(&peer, &peer2_id); + swarm1 + .behaviour_mut() + .send_response(channel, Action::TimeoutOnReadResponse) + .unwrap(); + } + Ok(request_response::Event::ResponseSent { peer, .. }) => { + assert_eq!(&peer, &peer2_id); + } + Ok(e) => { + panic!("Peer1: Unexpected event: {e:?}") + } + Err(..) => {} + } + } + }; + + // Expects OutboundFailure::Timeout + let swarm2_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnReadResponse); + + loop { + match swarm2.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(&peer, &peer1_id); + assert_eq!(request_id, req_id); + assert!(matches!(error, OutboundFailure::Timeout)); + break; + } + Ok(ev) => { + panic!("Peer2: Unexpected event: {ev:?}") + } + Err(..) => {} + } + } + }; + + spawn(swarm1_task); + timeout(Duration::from_millis(200), swarm2_task) + .await + .expect("timed out on waiting TimeoutOnReadResponse"); + + // Make sure panic wasn't a side effect of by a panic + assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); +} + +#[derive(Clone, Default)] +struct TestCodec; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Action { + FailOnReadRequest, + FailOnReadResponse, + FailOnWriteRequest, + FailOnWriteResponse, + TimeoutOnReadRequest, + TimeoutOnReadResponse, +} + +impl From for u8 { + fn from(value: Action) -> Self { + match value { + Action::FailOnReadRequest => 0, + Action::FailOnReadResponse => 1, + Action::FailOnWriteRequest => 2, + Action::FailOnWriteResponse => 3, + Action::TimeoutOnReadRequest => 4, + Action::TimeoutOnReadResponse => 5, + } + } +} + +impl TryFrom for Action { + type Error = io::Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Action::FailOnReadRequest), + 1 => Ok(Action::FailOnReadResponse), + 2 => Ok(Action::FailOnWriteRequest), + 3 => Ok(Action::FailOnWriteResponse), + 4 => Ok(Action::TimeoutOnReadRequest), + 5 => Ok(Action::TimeoutOnReadResponse), + _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), + } + } +} + +#[async_trait] +impl Codec for TestCodec { + type Protocol = StreamProtocol; + type Request = Action; + type Response = Action; + + async fn read_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { + Action::FailOnReadRequest => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnReadRequest")) + } + Action::TimeoutOnReadRequest => loop { + sleep(Duration::MAX).await; + }, + action => Ok(action), + } + } + + async fn read_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { + Action::FailOnReadResponse => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnReadResponse")) + } + Action::TimeoutOnReadResponse => loop { + sleep(Duration::MAX).await; + }, + action => Ok(action), + } + } + + async fn write_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + // Even on FailOnWriteRequest we write to the stream, this is + // because we don't want the other read_request to fail on the + // other end. + let bytes = [req.into()]; + io.write_all(&bytes).await?; + io.flush().await?; + + if req == Action::FailOnWriteRequest { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteRequest")) + } else { + Ok(()) + } + } + + async fn write_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + match res { + Action::FailOnWriteResponse => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteResponse")) + } + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } + } + } +} From 1dbb5bc84b759346861fa5d64c3e5b811e4cf318 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Mon, 23 Oct 2023 17:21:51 +0300 Subject: [PATCH 05/20] improve tests, implement the first inbound test, fix inbound failure before .await point --- protocols/request-response/src/handler.rs | 28 ++- protocols/request-response/src/lib.rs | 26 +- .../request-response/tests/error_reporting.rs | 229 +++++++++--------- 3 files changed, 146 insertions(+), 137 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 2719ef20dee..0d15ac8fdfe 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -161,8 +161,11 @@ where if self .worker_streams .try_push((request_id, Direction::Inbound), recv.boxed()) - .is_err() + .is_ok() { + self.pending_events + .push_back(Event::IncomingRequest { request_id }); + } else { log::warn!("Dropping inbound stream because we are at capacity") } } @@ -259,6 +262,8 @@ pub enum Event where TCodec: Codec, { + /// A request is going to be received. + IncomingRequest { request_id: RequestId }, /// A request has been received. Request { request_id: RequestId, @@ -293,6 +298,10 @@ where impl fmt::Debug for Event { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + Event::IncomingRequest { request_id } => f + .debug_struct("Event::IncomingRequest") + .field("request_id", request_id) + .finish(), Event::Request { request_id, request: _, @@ -388,6 +397,18 @@ where cx: &mut Context<'_>, ) -> Poll, (), Self::ToBehaviour, Self::Error>> { + // Drain pending events that were produced before poll. + // E.g. `Event::IncomingRequest` produced by `on_fully_negotiated_inbound`. + // + // NOTE: This is needed because if `read_request` fails before reaching a + // `.await` point, the incoming request will never register and `debug_assert` + // in `InboundStreamFailed` will panic. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { + self.pending_events.shrink_to_fit(); + } + loop { match self.worker_streams.poll_unpin(cx) { Poll::Ready((_, Ok(Ok(event)))) => { @@ -407,6 +428,9 @@ where }, }; + // TODO: How should we handle errors produced after ConnectionClose event? + // `ConnectionClose` will generate its own error. But only one of the two + // should be forwarded to the upper layer. return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } Poll::Ready(((id, direction), Err(futures_bounded::Timeout { .. }))) => { @@ -421,7 +445,7 @@ where } } - // Drain pending events. + // Drain pending events that were produced by `worker_streams`. if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index c55215367a8..51de0f71e95 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -827,6 +827,15 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); } + handler::Event::IncomingRequest { request_id } => { + // This event was emmited before `Handler::pool` and it is handled before + // its task gets polled (i.e. `worker_tasks`). That means at this point no + // error should be emmited, because it will be generated by the task itself. + if let Some(connection) = self.get_connection_mut(&peer, connection) { + let inserted = connection.pending_inbound_responses.insert(request_id); + debug_assert!(inserted, "Expect id of new request to be unknown."); + } + } handler::Event::Request { request_id, request, @@ -840,23 +849,6 @@ where }; self.pending_events .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); - - match self.get_connection_mut(&peer, connection) { - Some(connection) => { - let inserted = connection.pending_inbound_responses.insert(request_id); - debug_assert!(inserted, "Expect id of new request to be unknown."); - } - // Connection closed after `Event::Request` has been emitted. - None => { - self.pending_events.push_back(ToSwarm::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); - } - } } handler::Event::ResponseSent(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, request_id); diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 4afaf0f2ef1..292e6cc4f79 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -7,7 +7,7 @@ use libp2p_request_response as request_response; use libp2p_request_response::ProtocolSupport; use libp2p_swarm::{StreamProtocol, Swarm}; use libp2p_swarm_test::SwarmExt; -use request_response::{Codec, OutboundFailure}; +use request_response::{Codec, InboundFailure, OutboundFailure}; use std::time::Duration; use std::{io, iter}; @@ -17,43 +17,36 @@ async fn report_outbound_failure_on_read_response() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); + let codec = TestCodec(Action::FailOnReadResponse); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); - let mut swarm2 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); + let mut swarm2 = + Swarm::new_ephemeral(|_| request_response::Behaviour::with_codec(codec, protocols, cfg)); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Expects Action::FailOnReadResponse, replies with Action::FailOnReadResponse let swarm1_task = async move { loop { match swarm1.select_next_some().await.try_into_behaviour_event() { Ok(request_response::Event::Message { peer, - message: - request_response::Message::Request { - request, channel, .. - }, + message: request_response::Message::Request { channel, .. }, }) => { - assert_eq!(request, Action::FailOnReadResponse); assert_eq!(&peer, &peer2_id); - swarm1 - .behaviour_mut() - .send_response(channel, Action::FailOnReadResponse) - .unwrap(); + swarm1.behaviour_mut().send_response(channel, ()).unwrap(); } Ok(request_response::Event::ResponseSent { peer, .. }) => { assert_eq!(&peer, &peer2_id); + break; } - Ok(e) => { - panic!("Peer1: Unexpected event: {e:?}") + Ok(ev) => { + panic!("Peer1: Unexpected event: {ev:?}") } Err(..) => {} } @@ -62,9 +55,7 @@ async fn report_outbound_failure_on_read_response() { // Expects OutboundFailure::Io failure with `FailOnReadResponse` error let swarm2_task = async move { - let req_id = swarm2 - .behaviour_mut() - .send_request(&peer1_id, Action::FailOnReadResponse); + let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); loop { match swarm2.select_next_some().await.try_into_behaviour_event() { @@ -94,10 +85,13 @@ async fn report_outbound_failure_on_read_response() { } }; - spawn(swarm1_task); + let join_handle = spawn(swarm1_task); + timeout(Duration::from_millis(100), swarm2_task) .await .expect("timed out on waiting FailOnReadResponse"); + + join_handle.await; } #[async_std::test] @@ -106,31 +100,29 @@ async fn report_outbound_failure_on_write_request() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); + let codec = TestCodec(Action::FailOnWriteRequest); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); - let mut swarm2 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); + let mut swarm2 = + Swarm::new_ephemeral(|_| request_response::Behaviour::with_codec(codec, protocols, cfg)); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Consume everything let swarm1_task = async move { + // No need to take any actions, just consume everything. loop { swarm1.select_next_some().await; } }; - // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error + // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error. let swarm2_task = async move { - let req_id = swarm2 - .behaviour_mut() - .send_request(&peer1_id, Action::FailOnWriteRequest); + let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); loop { match swarm2.select_next_some().await.try_into_behaviour_event() { @@ -172,17 +164,16 @@ async fn report_outbound_timeout_on_read_response() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); + let codec = TestCodec(Action::TimeoutOnReadResponse); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); let mut swarm2 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new( - protocols.clone(), - cfg.with_request_timeout(Duration::from_millis(100)), - ) + let cfg = cfg.with_request_timeout(Duration::from_millis(100)); + request_response::Behaviour::with_codec(codec, protocols, cfg) }); let peer2_id = *swarm2.local_peer_id(); @@ -191,31 +182,25 @@ async fn report_outbound_timeout_on_read_response() { let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); - // Expects Action::TimeoutOnReadResponse, replies with Action::TimeoutOnReadResponse let swarm1_task = async move { + // Connection needs to be kept alive, so the folloing loop should not break. + // This channel is used to check if `swarm1_task` panicked or not. let _panic_check_tx = panic_check_tx; loop { match swarm1.select_next_some().await.try_into_behaviour_event() { Ok(request_response::Event::Message { peer, - message: - request_response::Message::Request { - request, channel, .. - }, + message: request_response::Message::Request { channel, .. }, }) => { - assert_eq!(request, Action::TimeoutOnReadResponse); assert_eq!(&peer, &peer2_id); - swarm1 - .behaviour_mut() - .send_response(channel, Action::TimeoutOnReadResponse) - .unwrap(); + swarm1.behaviour_mut().send_response(channel, ()).unwrap(); } Ok(request_response::Event::ResponseSent { peer, .. }) => { assert_eq!(&peer, &peer2_id); } - Ok(e) => { - panic!("Peer1: Unexpected event: {e:?}") + Ok(ev) => { + panic!("Peer1: Unexpected event: {ev:?}") } Err(..) => {} } @@ -224,9 +209,7 @@ async fn report_outbound_timeout_on_read_response() { // Expects OutboundFailure::Timeout let swarm2_task = async move { - let req_id = swarm2 - .behaviour_mut() - .send_request(&peer1_id, Action::TimeoutOnReadResponse); + let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); loop { match swarm2.select_next_some().await.try_into_behaviour_event() { @@ -257,8 +240,65 @@ async fn report_outbound_timeout_on_read_response() { assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); } -#[derive(Clone, Default)] -struct TestCodec; +#[async_std::test] +async fn report_inbound_failure_on_read_request() { + let _ = env_logger::try_init(); + + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + let codec = TestCodec(Action::FailOnReadRequest); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + + let mut swarm2 = + Swarm::new_ephemeral(|_| request_response::Behaviour::with_codec(codec, protocols, cfg)); + let peer2_id = *swarm2.local_peer_id(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // Expects OutboundFailure::Io failure with `FailOnReadRequest` error + let swarm1_task = async move { + loop { + match swarm1.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::InboundFailure { peer, error, .. }) => { + assert_eq!(peer, peer2_id); + + let error = match error { + InboundFailure::Io(e) => e, + e => panic!("Peer1: Unexpected error {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!(error.into_inner().unwrap().to_string(), "FailOnReadRequest"); + break; + } + Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), + Err(..) => {} + } + } + }; + + let swarm2_task = async move { + let _req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); + + // No need to take any actions, just consume everything. + loop { + swarm2.select_next_some().await; + } + }; + + spawn(swarm2_task); + timeout(Duration::from_millis(100), swarm1_task) + .await + .expect("timed out on waiting FailOnWriteRequest"); +} + +#[derive(Clone)] +struct TestCodec(Action); #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Action { @@ -270,128 +310,81 @@ enum Action { TimeoutOnReadResponse, } -impl From for u8 { - fn from(value: Action) -> Self { - match value { - Action::FailOnReadRequest => 0, - Action::FailOnReadResponse => 1, - Action::FailOnWriteRequest => 2, - Action::FailOnWriteResponse => 3, - Action::TimeoutOnReadRequest => 4, - Action::TimeoutOnReadResponse => 5, - } - } -} - -impl TryFrom for Action { - type Error = io::Error; - - fn try_from(value: u8) -> Result { - match value { - 0 => Ok(Action::FailOnReadRequest), - 1 => Ok(Action::FailOnReadResponse), - 2 => Ok(Action::FailOnWriteRequest), - 3 => Ok(Action::FailOnWriteResponse), - 4 => Ok(Action::TimeoutOnReadRequest), - 5 => Ok(Action::TimeoutOnReadResponse), - _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), - } - } -} - #[async_trait] impl Codec for TestCodec { type Protocol = StreamProtocol; - type Request = Action; - type Response = Action; + type Request = (); + type Response = (); async fn read_request( &mut self, _protocol: &Self::Protocol, - io: &mut T, + _io: &mut T, ) -> io::Result where T: AsyncRead + Unpin + Send, { - let mut buf = Vec::new(); - io.read_to_end(&mut buf).await?; - assert_eq!(buf.len(), 1); - - match buf[0].try_into()? { + match self.0 { Action::FailOnReadRequest => { Err(io::Error::new(io::ErrorKind::Other, "FailOnReadRequest")) } Action::TimeoutOnReadRequest => loop { sleep(Duration::MAX).await; }, - action => Ok(action), + _ => Ok(()), } } async fn read_response( &mut self, _protocol: &Self::Protocol, - io: &mut T, + _io: &mut T, ) -> io::Result where T: AsyncRead + Unpin + Send, { - let mut buf = Vec::new(); - io.read_to_end(&mut buf).await?; - assert_eq!(buf.len(), 1); - - match buf[0].try_into()? { + match self.0 { Action::FailOnReadResponse => { Err(io::Error::new(io::ErrorKind::Other, "FailOnReadResponse")) } Action::TimeoutOnReadResponse => loop { sleep(Duration::MAX).await; }, - action => Ok(action), + _ => Ok(()), } } async fn write_request( &mut self, _protocol: &Self::Protocol, - io: &mut T, - req: Self::Request, + _io: &mut T, + _req: Self::Request, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { - // Even on FailOnWriteRequest we write to the stream, this is - // because we don't want the other read_request to fail on the - // other end. - let bytes = [req.into()]; - io.write_all(&bytes).await?; - io.flush().await?; - - if req == Action::FailOnWriteRequest { - Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteRequest")) - } else { - Ok(()) + match self.0 { + Action::FailOnWriteRequest => { + Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteRequest")) + } + _ => Ok(()), } } async fn write_response( &mut self, _protocol: &Self::Protocol, - io: &mut T, - res: Self::Response, + _io: &mut T, + _res: Self::Response, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { - match res { + match self.0 { Action::FailOnWriteResponse => { Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteResponse")) } - action => { - let bytes = [action.into()]; - io.write_all(&bytes).await?; - Ok(()) - } + _ => Ok(()), } } } From 845b50e3c119f8c36a0167960da1793d0d8ea5b1 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Mon, 23 Oct 2023 22:37:44 +0300 Subject: [PATCH 06/20] typesafe request IDs --- examples/file-sharing/src/network.rs | 4 +- protocols/autonat/src/behaviour.rs | 6 +- protocols/autonat/src/behaviour/as_client.rs | 6 +- protocols/autonat/src/behaviour/as_server.rs | 4 +- protocols/perf/src/client.rs | 6 +- protocols/rendezvous/src/client.rs | 14 ++-- protocols/request-response/src/handler.rs | 70 ++++++++++--------- protocols/request-response/src/lib.rs | 72 +++++++++++--------- 8 files changed, 100 insertions(+), 82 deletions(-) diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index d6adea0ccd6..ffef85d75ae 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -8,7 +8,7 @@ use libp2p::{ identity, kad, multiaddr::Protocol, noise, - request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, + request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel}, swarm::{NetworkBehaviour, Swarm, SwarmEvent}, tcp, yamux, PeerId, }; @@ -175,7 +175,7 @@ pub(crate) struct EventLoop { pending_start_providing: HashMap>, pending_get_providers: HashMap>>, pending_request_file: - HashMap, Box>>>, + HashMap, Box>>>, } impl EventLoop { diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 5494ed336bf..f0184fd2fba 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -32,7 +32,7 @@ use instant::Instant; use libp2p_core::{multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_request_response::{ - self as request_response, ProtocolSupport, RequestId, ResponseChannel, + self as request_response, InboundRequestId, OutboundRequestId, ProtocolSupport, ResponseChannel, }; use libp2p_swarm::{ behaviour::{ @@ -187,14 +187,14 @@ pub struct Behaviour { PeerId, ( ProbeId, - RequestId, + InboundRequestId, Vec, ResponseChannel, ), >, // Ongoing outbound probes and mapped to the inner request id. - ongoing_outbound: HashMap, + ongoing_outbound: HashMap, // Connected peers with the observed address of each connection. // If the endpoint of a connection is relayed or not global (in case of Config::only_global_ips), diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 5c6194491e4..a8b5a753a80 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -29,7 +29,7 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; -use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; +use libp2p_request_response::{self as request_response, OutboundFailure, OutboundRequestId}; use libp2p_swarm::{ConnectionId, ListenAddresses, PollParameters, ToSwarm}; use rand::{seq::SliceRandom, thread_rng}; use std::{ @@ -91,7 +91,7 @@ pub(crate) struct AsClient<'a> { pub(crate) throttled_servers: &'a mut Vec<(PeerId, Instant)>, pub(crate) nat_status: &'a mut NatStatus, pub(crate) confidence: &'a mut usize, - pub(crate) ongoing_outbound: &'a mut HashMap, + pub(crate) ongoing_outbound: &'a mut HashMap, pub(crate) last_probe: &'a mut Option, pub(crate) schedule_probe: &'a mut Delay, pub(crate) listen_addresses: &'a ListenAddresses, @@ -118,7 +118,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { let probe_id = self .ongoing_outbound .remove(&request_id) - .expect("RequestId exists."); + .expect("OutboundRequestId exists."); let event = match response.result.clone() { Ok(address) => OutboundProbeEvent::Response { diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 0cbe83f1245..df6e30c318a 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -26,7 +26,7 @@ use instant::Instant; use libp2p_core::{multiaddr::Protocol, Multiaddr}; use libp2p_identity::PeerId; use libp2p_request_response::{ - self as request_response, InboundFailure, RequestId, ResponseChannel, + self as request_response, InboundFailure, InboundRequestId, ResponseChannel, }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, @@ -85,7 +85,7 @@ pub(crate) struct AsServer<'a> { PeerId, ( ProbeId, - RequestId, + InboundRequestId, Vec, ResponseChannel, ), diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs index 6e34a9072f4..670b0e9d299 100644 --- a/protocols/perf/src/client.rs +++ b/protocols/perf/src/client.rs @@ -37,10 +37,10 @@ use crate::{protocol::Response, RunDuration, RunParams}; /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub struct RunId(request_response::RequestId); +pub struct RunId(request_response::OutboundRequestId); -impl From for RunId { - fn from(value: request_response::RequestId) -> Self { +impl From for RunId { + fn from(value: request_response::OutboundRequestId) -> Self { Self(value) } } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 8459dc21c7e..ac2eba05829 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use libp2p_core::{Endpoint, Multiaddr, PeerRecord}; use libp2p_identity::{Keypair, PeerId, SigningError}; -use libp2p_request_response::{ProtocolSupport, RequestId}; +use libp2p_request_response::{OutboundRequestId, ProtocolSupport}; use libp2p_swarm::{ ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, @@ -41,8 +41,8 @@ pub struct Behaviour { keypair: Keypair, - waiting_for_register: HashMap, - waiting_for_discovery: HashMap)>, + waiting_for_register: HashMap, + waiting_for_discovery: HashMap)>, /// Hold addresses of all peers that we have discovered so far. /// @@ -337,7 +337,7 @@ impl NetworkBehaviour for Behaviour { } impl Behaviour { - fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option { + fn event_for_outbound_failure(&mut self, req_id: &OutboundRequestId) -> Option { if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) { return Some(Event::RegisterFailed { rendezvous_node, @@ -357,7 +357,11 @@ impl Behaviour { None } - fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option { + fn handle_response( + &mut self, + request_id: &OutboundRequestId, + response: Message, + ) -> Option { match response { RegisterResponse(Ok(ttl)) => { if let Some((rendezvous_node, namespace)) = diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 0d15ac8fdfe..8995590a4ad 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -24,7 +24,7 @@ pub use protocol::ProtocolSupport; use crate::codec::Codec; use crate::handler::protocol::Protocol; -use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; +use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use futures::channel::mpsc; use futures::{channel::oneshot, prelude::*}; @@ -67,27 +67,26 @@ where requested_outbound: VecDeque>, /// A channel for receiving inbound requests. inbound_receiver: mpsc::Receiver<( - RequestId, + InboundRequestId, TCodec::Request, oneshot::Sender, )>, /// The [`mpsc::Sender`] for the above receiver. Cloned for each inbound request. inbound_sender: mpsc::Sender<( - RequestId, + InboundRequestId, TCodec::Request, oneshot::Sender, )>, inbound_request_id: Arc, - worker_streams: - futures_bounded::FuturesMap<(RequestId, Direction), Result, io::Error>>, + worker_streams: futures_bounded::FuturesMap, io::Error>>, } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] -enum Direction { - Inbound, - Outbound, +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum RequestId { + Inbound(InboundRequestId), + Outbound(OutboundRequestId), } impl Handler @@ -130,7 +129,7 @@ where >, ) { let mut codec = self.codec.clone(); - let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); + let request_id = InboundRequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); let mut sender = self.inbound_sender.clone(); let recv = async move { @@ -160,7 +159,7 @@ where if self .worker_streams - .try_push((request_id, Direction::Inbound), recv.boxed()) + .try_push(RequestId::Inbound(request_id), recv.boxed()) .is_ok() { self.pending_events @@ -203,7 +202,7 @@ where if self .worker_streams - .try_push((request_id, Direction::Outbound), send.boxed()) + .try_push(RequestId::Outbound(request_id), send.boxed()) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") @@ -263,34 +262,34 @@ where TCodec: Codec, { /// A request is going to be received. - IncomingRequest { request_id: RequestId }, + IncomingRequest { request_id: InboundRequestId }, /// A request has been received. Request { - request_id: RequestId, + request_id: InboundRequestId, request: TCodec::Request, sender: oneshot::Sender, }, /// A response has been received. Response { - request_id: RequestId, + request_id: OutboundRequestId, response: TCodec::Response, }, /// A response to an inbound request has been sent. - ResponseSent(RequestId), + ResponseSent(InboundRequestId), /// A response to an inbound request was omitted as a result /// of dropping the response `sender` of an inbound `Request`. - ResponseOmission(RequestId), + ResponseOmission(InboundRequestId), /// An outbound request timed out while sending the request /// or waiting for the response. - OutboundTimeout(RequestId), + OutboundTimeout(OutboundRequestId), /// An outbound request failed to negotiate a mutually supported protocol. - OutboundUnsupportedProtocols(RequestId), + OutboundUnsupportedProtocols(OutboundRequestId), OutboundStreamFailed { - request_id: RequestId, + request_id: OutboundRequestId, error: io::Error, }, InboundStreamFailed { - request_id: RequestId, + request_id: InboundRequestId, error: io::Error, }, } @@ -348,7 +347,7 @@ impl fmt::Debug for Event { } pub struct OutboundMessage { - pub(crate) request_id: RequestId, + pub(crate) request_id: OutboundRequestId, pub(crate) request: TCodec::Request, pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, } @@ -414,15 +413,15 @@ where Poll::Ready((_, Ok(Ok(event)))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } - Poll::Ready(((id, direction), Ok(Err(e)))) => { - log::debug!("Stream for request {id} failed: {e}"); + Poll::Ready((id, Ok(Err(e)))) => { + log::debug!("Stream for request {id:?} failed: {e}"); - let event = match direction { - Direction::Inbound => Event::InboundStreamFailed { + let event = match id { + RequestId::Inbound(id) => Event::InboundStreamFailed { request_id: id, error: e, }, - Direction::Outbound => Event::OutboundStreamFailed { + RequestId::Outbound(id) => Event::OutboundStreamFailed { request_id: id, error: e, }, @@ -433,12 +432,17 @@ where // should be forwarded to the upper layer. return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } - Poll::Ready(((id, direction), Err(futures_bounded::Timeout { .. }))) => { - log::debug!("Stream for request {id} timed out"); - - if direction == Direction::Outbound { - let event = Event::OutboundTimeout(id); - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { + log::debug!("Stream for request {id:?} timed out"); + + match id { + RequestId::Inbound(_id) => { + // TODO + } + RequestId::Outbound(id) => { + let event = Event::OutboundTimeout(id); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } } } Poll::Pending => break, diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 51de0f71e95..295d3b56ff3 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -102,7 +102,7 @@ pub enum Message { /// A request message. Request { /// The ID of this request. - request_id: RequestId, + request_id: InboundRequestId, /// The request message. request: TRequest, /// The channel waiting for the response. @@ -117,7 +117,7 @@ pub enum Message { /// The ID of the request that produced this response. /// /// See [`Behaviour::send_request`]. - request_id: RequestId, + request_id: OutboundRequestId, /// The response message. response: TResponse, }, @@ -138,7 +138,7 @@ pub enum Event { /// The peer to whom the request was sent. peer: PeerId, /// The (local) ID of the failed request. - request_id: RequestId, + request_id: OutboundRequestId, /// The error that occurred. error: OutboundFailure, }, @@ -147,7 +147,7 @@ pub enum Event { /// The peer from whom the request was received. peer: PeerId, /// The ID of the failed inbound request. - request_id: RequestId, + request_id: InboundRequestId, /// The error that occurred. error: InboundFailure, }, @@ -159,7 +159,7 @@ pub enum Event { /// The peer to whom the response was sent. peer: PeerId, /// The ID of the inbound request whose response was sent. - request_id: RequestId, + request_id: InboundRequestId, }, } @@ -270,17 +270,27 @@ impl ResponseChannel { } } -/// The ID of an inbound or outbound request. +/// The ID of an inbound request. /// -/// Note: [`RequestId`]'s uniqueness is only guaranteed between two -/// inbound and likewise between two outbound requests. There is no -/// uniqueness guarantee in a set of both inbound and outbound -/// [`RequestId`]s nor in a set of inbound or outbound requests -/// originating from different [`Behaviour`]'s. +/// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between +/// two inbound requests of the same originating [`Behaviour`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct RequestId(u64); +pub struct InboundRequestId(u64); -impl fmt::Display for RequestId { +impl fmt::Display for InboundRequestId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// The ID of an outbound request. +/// +/// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between +/// two outbound requests of the same originating [`Behaviour`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct OutboundRequestId(u64); + +impl fmt::Display for OutboundRequestId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) } @@ -333,9 +343,9 @@ where /// The supported outbound protocols. outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, /// The next (local) request ID. - next_request_id: RequestId, + next_outbound_request_id: OutboundRequestId, /// The next (inbound) request ID. - next_inbound_id: Arc, + next_inbound_request_id: Arc, /// The protocol configuration. config: Config, /// The protocol codec for reading and writing requests and responses. @@ -389,8 +399,8 @@ where Behaviour { inbound_protocols, outbound_protocols, - next_request_id: RequestId(1), - next_inbound_id: Arc::new(AtomicU64::new(1)), + next_outbound_request_id: OutboundRequestId(1), + next_inbound_request_id: Arc::new(AtomicU64::new(1)), config: cfg, codec, pending_events: VecDeque::new(), @@ -412,8 +422,8 @@ where /// > address discovery, or known addresses of peers must be /// > managed via [`Behaviour::add_address`] and /// > [`Behaviour::remove_address`]. - pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { - let request_id = self.next_request_id(); + pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { + let request_id = self.next_outbound_request_id(); let request = OutboundMessage { request_id, request, @@ -485,7 +495,7 @@ where /// Checks whether an outbound request to the peer with the provided /// [`PeerId`] initiated by [`Behaviour::send_request`] is still /// pending, i.e. waiting for a response. - pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &OutboundRequestId) -> bool { // Check if request is already sent on established connection. let est_conn = self .connected @@ -508,7 +518,7 @@ where /// Checks whether an inbound request from the peer with the provided /// [`PeerId`] is still pending, i.e. waiting for a response by the local /// node through [`Behaviour::send_response`]. - pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &InboundRequestId) -> bool { self.connected .get(peer) .map(|cs| { @@ -518,10 +528,10 @@ where .unwrap_or(false) } - /// Returns the next request ID. - fn next_request_id(&mut self) -> RequestId { - let request_id = self.next_request_id; - self.next_request_id.0 += 1; + /// Returns the next outbound request ID. + fn next_outbound_request_id(&mut self) -> OutboundRequestId { + let request_id = self.next_outbound_request_id; + self.next_outbound_request_id.0 += 1; request_id } @@ -560,7 +570,7 @@ where &mut self, peer: &PeerId, connection: ConnectionId, - request: RequestId, + request: OutboundRequestId, ) -> bool { self.get_connection_mut(peer, connection) .map(|c| c.pending_outbound_responses.remove(&request)) @@ -576,7 +586,7 @@ where &mut self, peer: &PeerId, connection: ConnectionId, - request: RequestId, + request: InboundRequestId, ) -> bool { self.get_connection_mut(peer, connection) .map(|c| c.pending_inbound_responses.remove(&request)) @@ -726,7 +736,7 @@ where self.inbound_protocols.clone(), self.codec.clone(), self.config.request_timeout, - self.next_inbound_id.clone(), + self.next_inbound_request_id.clone(), self.config.max_concurrent_streams, ); @@ -769,7 +779,7 @@ where self.inbound_protocols.clone(), self.codec.clone(), self.config.request_timeout, - self.next_inbound_id.clone(), + self.next_inbound_request_id.clone(), self.config.max_concurrent_streams, ); @@ -958,10 +968,10 @@ struct Connection { /// Pending outbound responses where corresponding inbound requests have /// been received on this connection and emitted via `poll` but have not yet /// been answered. - pending_outbound_responses: HashSet, + pending_outbound_responses: HashSet, /// Pending inbound responses for previously sent requests on this /// connection. - pending_inbound_responses: HashSet, + pending_inbound_responses: HashSet, } impl Connection { From a4abd739a910a69361a070044eae022972e57fdc Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 24 Oct 2023 00:08:03 +0300 Subject: [PATCH 07/20] fix autonat tests --- protocols/autonat/tests/test_client.rs | 8 ++++---- protocols/autonat/tests/test_server.rs | 9 ++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 1911d1a6b2d..743f4cc1b51 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -61,7 +61,7 @@ async fn test_auto_probe() { match client.next_behaviour_event().await { Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { assert!(peer.is_none()); - assert_eq!(error, OutboundProbeError::NoAddresses); + assert!(matches!(error, OutboundProbeError::NoAddresses)); } other => panic!("Unexpected behaviour event: {other:?}."), } @@ -181,10 +181,10 @@ async fn test_confidence() { peer, error, } if !test_public => { - assert_eq!( + assert!(matches!( error, OutboundProbeError::Response(ResponseError::DialError) - ); + )); (peer.unwrap(), probe_id) } other => panic!("Unexpected Outbound Event: {other:?}"), @@ -261,7 +261,7 @@ async fn test_throttle_server_period() { match client.next_behaviour_event().await { Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { assert!(peer.is_none()); - assert_eq!(error, OutboundProbeError::NoServer); + assert!(matches!(error, OutboundProbeError::NoServer)); } other => panic!("Unexpected behaviour event: {other:?}."), } diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index 1bb5f624793..fa08bbf3471 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -168,7 +168,10 @@ async fn test_dial_error() { }) => { assert_eq!(probe_id, request_probe_id); assert_eq!(peer, client_id); - assert_eq!(error, InboundProbeError::Response(ResponseError::DialError)); + assert!(matches!( + error, + InboundProbeError::Response(ResponseError::DialError) + )); } other => panic!("Unexpected behaviour event: {other:?}."), } @@ -252,10 +255,10 @@ async fn test_throttle_peer_max() { }) => { assert_eq!(client_id, peer); assert_ne!(first_probe_id, probe_id); - assert_eq!( + assert!(matches!( error, InboundProbeError::Response(ResponseError::DialRefused) - ) + )); } other => panic!("Unexpected behaviour event: {other:?}."), }; From 5f945bd736ab6045dedb82b6c8363fedd0cbb36f Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 24 Oct 2023 00:24:34 +0300 Subject: [PATCH 08/20] fix docs --- protocols/request-response/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 295d3b56ff3..cc11f5541a8 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -564,7 +564,7 @@ where /// Remove pending outbound response for the given peer and connection. /// /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. + /// alive and the [`OutboundRequestId`] was previously present and is now removed. /// Returns `false` otherwise. fn remove_pending_outbound_response( &mut self, @@ -580,7 +580,7 @@ where /// Remove pending inbound response for the given peer and connection. /// /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. + /// alive and the [`InboundRequestId`] was previously present and is now removed. /// Returns `false` otherwise. fn remove_pending_inbound_response( &mut self, From 6ef5d8a68f9c4c45652ef8844485c50fd41f2a3a Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Tue, 24 Oct 2023 19:34:22 +0300 Subject: [PATCH 09/20] add Event::InboundTimeout --- protocols/request-response/src/handler.rs | 48 ++++++++++++++--------- protocols/request-response/src/lib.rs | 14 +++++++ 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 8995590a4ad..134767d3fae 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -288,6 +288,9 @@ where request_id: OutboundRequestId, error: io::Error, }, + /// An inbound request timed out while waiting for the request + /// or sending the response. + InboundTimeout(InboundRequestId), InboundStreamFailed { request_id: InboundRequestId, error: io::Error, @@ -337,6 +340,10 @@ impl fmt::Debug for Event { .field("request_id", &request_id) .field("error", &error) .finish(), + Event::InboundTimeout(request_id) => f + .debug_tuple("Event::InboundTimeout") + .field(request_id) + .finish(), Event::InboundStreamFailed { request_id, error } => f .debug_struct("Event::InboundStreamFailed") .field("request_id", &request_id) @@ -414,17 +421,21 @@ where return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } Poll::Ready((id, Ok(Err(e)))) => { - log::debug!("Stream for request {id:?} failed: {e}"); - let event = match id { - RequestId::Inbound(id) => Event::InboundStreamFailed { - request_id: id, - error: e, - }, - RequestId::Outbound(id) => Event::OutboundStreamFailed { - request_id: id, - error: e, - }, + RequestId::Inbound(id) => { + log::debug!("Stream for inbound request {id} failed: {e}"); + Event::InboundStreamFailed { + request_id: id, + error: e, + } + } + RequestId::Outbound(id) => { + log::debug!("Stream for outbound request {id} failed: {e}"); + Event::OutboundStreamFailed { + request_id: id, + error: e, + } + } }; // TODO: How should we handle errors produced after ConnectionClose event? @@ -433,17 +444,18 @@ where return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { - log::debug!("Stream for request {id:?} timed out"); - - match id { - RequestId::Inbound(_id) => { - // TODO + let event = match id { + RequestId::Inbound(id) => { + log::debug!("Stream for inbound request {id} timed out"); + Event::InboundTimeout(id) } RequestId::Outbound(id) => { - let event = Event::OutboundTimeout(id); - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + log::debug!("Stream for outbound request {id} timed out"); + Event::OutboundTimeout(id) } - } + }; + + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } Poll::Pending => break, } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index cc11f5541a8..9463fd08628 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -926,6 +926,20 @@ where error: OutboundFailure::Io(error), })) } + handler::Event::InboundTimeout(request_id) => { + let removed = self.remove_pending_inbound_response(&peer, connection, request_id); + debug_assert!( + removed, + "Expect request_id to be pending before request times out." + ); + + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Timeout, + })); + } handler::Event::InboundStreamFailed { request_id, error } => { let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!(removed, "Expect request_id to be pending upon failure"); From 0b542ac0dd6dd6314c29d7e85f60877d19893fe6 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Wed, 25 Oct 2023 12:10:44 +0300 Subject: [PATCH 10/20] do not report error for requests user didn't see --- protocols/request-response/src/handler.rs | 26 +------- protocols/request-response/src/lib.rs | 72 +++++++++++++---------- 2 files changed, 41 insertions(+), 57 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 134767d3fae..b4ea8dc0e1a 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -160,11 +160,8 @@ where if self .worker_streams .try_push(RequestId::Inbound(request_id), recv.boxed()) - .is_ok() + .is_err() { - self.pending_events - .push_back(Event::IncomingRequest { request_id }); - } else { log::warn!("Dropping inbound stream because we are at capacity") } } @@ -261,8 +258,6 @@ pub enum Event where TCodec: Codec, { - /// A request is going to be received. - IncomingRequest { request_id: InboundRequestId }, /// A request has been received. Request { request_id: InboundRequestId, @@ -300,10 +295,6 @@ where impl fmt::Debug for Event { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Event::IncomingRequest { request_id } => f - .debug_struct("Event::IncomingRequest") - .field("request_id", request_id) - .finish(), Event::Request { request_id, request: _, @@ -403,18 +394,6 @@ where cx: &mut Context<'_>, ) -> Poll, (), Self::ToBehaviour, Self::Error>> { - // Drain pending events that were produced before poll. - // E.g. `Event::IncomingRequest` produced by `on_fully_negotiated_inbound`. - // - // NOTE: This is needed because if `read_request` fails before reaching a - // `.await` point, the incoming request will never register and `debug_assert` - // in `InboundStreamFailed` will panic. - if let Some(event) = self.pending_events.pop_front() { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); - } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { - self.pending_events.shrink_to_fit(); - } - loop { match self.worker_streams.poll_unpin(cx) { Poll::Ready((_, Ok(Ok(event)))) => { @@ -438,9 +417,6 @@ where } }; - // TODO: How should we handle errors produced after ConnectionClose event? - // `ConnectionClose` will generate its own error. But only one of the two - // should be forwarded to the upper layer. return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 9463fd08628..46f2bbf8d47 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -837,29 +837,28 @@ where self.pending_events .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); } - handler::Event::IncomingRequest { request_id } => { - // This event was emmited before `Handler::pool` and it is handled before - // its task gets polled (i.e. `worker_tasks`). That means at this point no - // error should be emmited, because it will be generated by the task itself. - if let Some(connection) = self.get_connection_mut(&peer, connection) { - let inserted = connection.pending_inbound_responses.insert(request_id); - debug_assert!(inserted, "Expect id of new request to be unknown."); - } - } handler::Event::Request { request_id, request, sender, - } => { - let channel = ResponseChannel { sender }; - let message = Message::Request { - request_id, - request, - channel, - }; - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); - } + } => match self.get_connection_mut(&peer, connection) { + Some(connection) => { + let inserted = connection.pending_inbound_responses.insert(request_id); + debug_assert!(inserted, "Expect id of new request to be unknown."); + + let channel = ResponseChannel { sender }; + let message = Message::Request { + request_id, + request, + channel, + }; + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); + } + None => { + log::debug!("Connection ({connection}) closed after `Event::Request` ({request_id}) has been emitted."); + } + }, handler::Event::ResponseSent(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, request_id); debug_assert!( @@ -933,23 +932,32 @@ where "Expect request_id to be pending before request times out." ); - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { - peer, - request_id, - error: InboundFailure::Timeout, - })); + if removed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Timeout, + })); + } else { + // This happens when timeout is emitted before `read_request` finishes. + log::debug!("Inbound request timeout for an unknown request_id ({request_id})"); + } } handler::Event::InboundStreamFailed { request_id, error } => { let removed = self.remove_pending_inbound_response(&peer, connection, request_id); - debug_assert!(removed, "Expect request_id to be pending upon failure"); - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { - peer, - request_id, - error: InboundFailure::Io(error), - })) + if removed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Io(error), + })); + } else { + // This happens when `read_request` fails. + log::debug!("Inbound failure is reported for an unknown request_id ({request_id}): {error}"); + } } } } From c713910a43c0f9aeb47a011577983722c6347531 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Wed, 25 Oct 2023 14:16:52 +0300 Subject: [PATCH 11/20] finalize testcases --- protocols/request-response/Cargo.toml | 2 +- .../request-response/tests/error_reporting.rs | 503 +++++++++++++++--- 2 files changed, 421 insertions(+), 84 deletions(-) diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index a328c3141ce..5e1df1c1345 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -32,7 +32,7 @@ json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"] [dev-dependencies] -async-std = { version = "1.6.2", features = ["attributes"] } +async-std = { version = "1.6.2", features = ["attributes", "unstable"] } env_logger = "0.10.0" libp2p-noise = { workspace = true } libp2p-tcp = { workspace = true, features = ["async-io"] } diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 292e6cc4f79..cbe79e6d6ef 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -17,45 +17,66 @@ async fn report_outbound_failure_on_read_response() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); - let codec = TestCodec(Action::FailOnReadResponse); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::with_codec(codec, protocols, cfg)); + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; + // On panic `panic_check_rx` will be closed + let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + + // Server let swarm1_task = async move { + let _panic_check_tx = panic_check_tx; + let mut req_id = None; + loop { match swarm1.select_next_some().await.try_into_behaviour_event() { Ok(request_response::Event::Message { peer, - message: request_response::Message::Request { channel, .. }, + message: + request_response::Message::Request { + request_id, + request, + channel, + .. + }, }) => { - assert_eq!(&peer, &peer2_id); - swarm1.behaviour_mut().send_response(channel, ()).unwrap(); - } - Ok(request_response::Event::ResponseSent { peer, .. }) => { - assert_eq!(&peer, &peer2_id); - break; + assert_eq!(peer, peer2_id); + assert_eq!(request, Action::FailOnReadResponse); + req_id = Some(request_id); + swarm1 + .behaviour_mut() + .send_response(channel, Action::FailOnReadResponse) + .unwrap(); } - Ok(ev) => { - panic!("Peer1: Unexpected event: {ev:?}") + Ok(request_response::Event::ResponseSent { + peer, request_id, .. + }) => { + assert_eq!(peer, peer2_id); + assert_eq!(req_id, Some(request_id)); } + Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), Err(..) => {} } } }; + // Client + // // Expects OutboundFailure::Io failure with `FailOnReadResponse` error let swarm2_task = async move { - let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadResponse); loop { match swarm2.select_next_some().await.try_into_behaviour_event() { @@ -64,12 +85,14 @@ async fn report_outbound_failure_on_read_response() { request_id, error, }) => { - assert_eq!(&peer, &peer1_id); + assert_eq!(peer, peer1_id); assert_eq!(request_id, req_id); + let error = match error { OutboundFailure::Io(e) => e, e => panic!("Peer2: Unexpected error {e:?}"), }; + assert_eq!(error.kind(), io::ErrorKind::Other); assert_eq!( error.into_inner().unwrap().to_string(), @@ -77,21 +100,18 @@ async fn report_outbound_failure_on_read_response() { ); break; } - Ok(ev) => { - panic!("Peer2: Unexpected event: {ev:?}") - } + Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), Err(..) => {} } } }; - let join_handle = spawn(swarm1_task); - + spawn(swarm1_task); timeout(Duration::from_millis(100), swarm2_task) .await .expect("timed out on waiting FailOnReadResponse"); - join_handle.await; + assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); } #[async_std::test] @@ -100,29 +120,42 @@ async fn report_outbound_failure_on_write_request() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); - let codec = TestCodec(Action::FailOnWriteRequest); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::with_codec(codec, protocols, cfg)); + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); swarm1.listen().await; swarm2.connect(&mut swarm1).await; + // On panic `panic_check_rx` will be closed + let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + + // Server + // + // Expects no events because `Event::Request` is produced after `read_request`. let swarm1_task = async move { - // No need to take any actions, just consume everything. + let _panic_check_tx = panic_check_tx; + loop { - swarm1.select_next_some().await; + match swarm1.select_next_some().await.try_into_behaviour_event() { + Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), + Err(..) => {} + } } }; + // Client + // // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error. let swarm2_task = async move { - let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteRequest); loop { match swarm2.select_next_some().await.try_into_behaviour_event() { @@ -131,12 +164,14 @@ async fn report_outbound_failure_on_write_request() { request_id, error, }) => { - assert_eq!(&peer, &peer1_id); + assert_eq!(peer, peer1_id); assert_eq!(request_id, req_id); + let error = match error { OutboundFailure::Io(e) => e, e => panic!("Peer2: Unexpected error {e:?}"), }; + assert_eq!(error.kind(), io::ErrorKind::Other); assert_eq!( error.into_inner().unwrap().to_string(), @@ -144,9 +179,7 @@ async fn report_outbound_failure_on_write_request() { ); break; } - Ok(ev) => { - panic!("Peer2: Unexpected event: {ev:?}") - } + Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), Err(..) => {} } } @@ -155,7 +188,9 @@ async fn report_outbound_failure_on_write_request() { spawn(swarm1_task); timeout(Duration::from_millis(100), swarm2_task) .await - .expect("timed out on waiting FailOnReadResponse"); + .expect("timed out on waiting FailOnWriteRequest"); + + assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); } #[async_std::test] @@ -164,52 +199,67 @@ async fn report_outbound_timeout_on_read_response() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); - let codec = TestCodec(Action::TimeoutOnReadResponse); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); let mut swarm2 = Swarm::new_ephemeral(|_| { let cfg = cfg.with_request_timeout(Duration::from_millis(100)); - request_response::Behaviour::with_codec(codec, protocols, cfg) + request_response::Behaviour::::new(protocols, cfg) }); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; + // On panic `panic_check_rx` will be closed let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + // Server let swarm1_task = async move { - // Connection needs to be kept alive, so the folloing loop should not break. - // This channel is used to check if `swarm1_task` panicked or not. let _panic_check_tx = panic_check_tx; + let mut req_id = None; loop { match swarm1.select_next_some().await.try_into_behaviour_event() { Ok(request_response::Event::Message { peer, - message: request_response::Message::Request { channel, .. }, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, }) => { - assert_eq!(&peer, &peer2_id); - swarm1.behaviour_mut().send_response(channel, ()).unwrap(); - } - Ok(request_response::Event::ResponseSent { peer, .. }) => { - assert_eq!(&peer, &peer2_id); + assert_eq!(peer, peer2_id); + assert_eq!(request, Action::TimeoutOnReadResponse); + req_id = Some(request_id); + swarm1 + .behaviour_mut() + .send_response(channel, Action::TimeoutOnReadResponse) + .unwrap(); } - Ok(ev) => { - panic!("Peer1: Unexpected event: {ev:?}") + Ok(request_response::Event::ResponseSent { + peer, request_id, .. + }) => { + assert_eq!(peer, peer2_id); + assert_eq!(req_id, Some(request_id)); } + Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), Err(..) => {} } } }; + // Client + // // Expects OutboundFailure::Timeout let swarm2_task = async move { - let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnReadResponse); loop { match swarm2.select_next_some().await.try_into_behaviour_event() { @@ -218,14 +268,12 @@ async fn report_outbound_timeout_on_read_response() { request_id, error, }) => { - assert_eq!(&peer, &peer1_id); + assert_eq!(peer, peer1_id); assert_eq!(request_id, req_id); assert!(matches!(error, OutboundFailure::Timeout)); break; } - Ok(ev) => { - panic!("Peer2: Unexpected event: {ev:?}") - } + Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), Err(..) => {} } } @@ -236,7 +284,6 @@ async fn report_outbound_timeout_on_read_response() { .await .expect("timed out on waiting TimeoutOnReadResponse"); - // Make sure panic wasn't a side effect of by a panic assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); } @@ -246,26 +293,127 @@ async fn report_inbound_failure_on_read_request() { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); - let codec = TestCodec(Action::FailOnReadRequest); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::with_codec(codec.clone(), protocols.clone(), cfg.clone()) + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::with_codec(codec, protocols, cfg)); + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // On panic `panic_check_rx` will be closed + let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + + // Server + // + // Expects no events because `Event::Request` is produced after `read_request`. + let swarm1_task = async move { + let _panic_check_tx = panic_check_tx; + + loop { + match swarm1.select_next_some().await.try_into_behaviour_event() { + Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), + Err(..) => {} + } + } + }; + + // Expects io::ErrorKind::UnexpectedEof + let swarm2_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadRequest); + + loop { + match swarm2.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(peer, peer1_id); + assert_eq!(request_id, req_id); + + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Peer2: Unexpected error {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::UnexpectedEof); + break; + } + Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), + Err(..) => {} + } + } + }; + + spawn(swarm1_task); + timeout(Duration::from_millis(100), swarm2_task) + .await + .expect("timed out on waiting FailOnWriteRequest"); + + assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); +} + +#[async_std::test] +async fn report_inbound_failure_on_write_response() { + let _ = env_logger::try_init(); + + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + + let mut swarm2 = + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Expects OutboundFailure::Io failure with `FailOnReadRequest` error + // On panic `panic_check_rx` will be closed + let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + + // Server + // + // Expects OutboundFailure::Io failure with `FailOnWriteResponse` error let swarm1_task = async move { + let mut req_id = None; + loop { match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::InboundFailure { peer, error, .. }) => { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, + }) => { + assert_eq!(peer, peer2_id); + assert_eq!(request, Action::FailOnWriteResponse); + req_id = Some(request_id); + swarm1 + .behaviour_mut() + .send_response(channel, Action::FailOnWriteResponse) + .unwrap(); + } + Ok(request_response::Event::InboundFailure { + peer, + request_id, + error, + }) => { assert_eq!(peer, peer2_id); + assert_eq!(req_id, Some(request_id)); let error = match error { InboundFailure::Io(e) => e, @@ -273,7 +421,10 @@ async fn report_inbound_failure_on_read_request() { }; assert_eq!(error.kind(), io::ErrorKind::Other); - assert_eq!(error.into_inner().unwrap().to_string(), "FailOnReadRequest"); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteResponse" + ); break; } Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), @@ -282,109 +433,295 @@ async fn report_inbound_failure_on_read_request() { } }; + // Client + // + // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof let swarm2_task = async move { - let _req_id = swarm2.behaviour_mut().send_request(&peer1_id, ()); + let _panic_check_tx = panic_check_tx; + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnWriteResponse); - // No need to take any actions, just consume everything. loop { - swarm2.select_next_some().await; + match swarm2.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(peer, peer1_id); + assert_eq!(request_id, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // Connections was closed before `read_response` + } + OutboundFailure::Io(e) => { + assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); + } + e => panic!("Peer2: Unexpected error {e:?}"), + } + } + Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), + Err(..) => {} + } } }; spawn(swarm2_task); timeout(Duration::from_millis(100), swarm1_task) .await - .expect("timed out on waiting FailOnWriteRequest"); + .expect("timed out on waiting TimeoutOnWriteResponse"); + + assert!(!panic_check_rx.is_closed(), "swarm2_task panicked"); } -#[derive(Clone)] -struct TestCodec(Action); +#[async_std::test] +async fn report_inbound_timeout_on_write_response() { + let _ = env_logger::try_init(); + + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + let cfg = cfg.clone().with_request_timeout(Duration::from_millis(100)); + request_response::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + + let mut swarm2 = + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let peer2_id = *swarm2.local_peer_id(); + + swarm1.listen().await; + swarm2.connect(&mut swarm1).await; + + // On panic `panic_check_rx` will be closed + let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + + // Expects InboundFailure::Timeout + let swarm1_task = async move { + let mut req_id = None; + + loop { + match swarm1.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, + }) => { + assert_eq!(peer, peer2_id); + assert_eq!(request, Action::TimeoutOnWriteResponse); + req_id = Some(request_id); + swarm1 + .behaviour_mut() + .send_response(channel, Action::TimeoutOnWriteResponse) + .unwrap(); + } + Ok(request_response::Event::InboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(peer, peer2_id); + assert_eq!(req_id, Some(request_id)); + assert!(matches!(error, InboundFailure::Timeout)); + break; + } + Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), + Err(..) => {} + } + } + }; + + // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof + let swarm2_task = async move { + let _panic_check_tx = panic_check_tx; + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::TimeoutOnWriteResponse); + + loop { + match swarm2.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + assert_eq!(peer, peer1_id); + assert_eq!(request_id, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // Connections was closed before `read_response` + } + OutboundFailure::Io(e) => { + assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof) + } + e => panic!("Peer2: Unexpected error {e:?}"), + } + } + Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), + Err(..) => {} + } + } + }; + + spawn(swarm2_task); + timeout(Duration::from_millis(200), swarm1_task) + .await + .expect("timed out on waiting TimeoutOnWriteResponse"); + + assert!(!panic_check_rx.is_closed(), "swarm2_task panicked"); +} + +#[derive(Clone, Default)] +struct TestCodec; #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Action { FailOnReadRequest, FailOnReadResponse, + TimeoutOnReadResponse, FailOnWriteRequest, FailOnWriteResponse, - TimeoutOnReadRequest, - TimeoutOnReadResponse, + TimeoutOnWriteResponse, +} + +impl From for u8 { + fn from(value: Action) -> Self { + match value { + Action::FailOnReadRequest => 0, + Action::FailOnReadResponse => 1, + Action::TimeoutOnReadResponse => 2, + Action::FailOnWriteRequest => 3, + Action::FailOnWriteResponse => 4, + Action::TimeoutOnWriteResponse => 5, + } + } +} + +impl TryFrom for Action { + type Error = io::Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Action::FailOnReadRequest), + 1 => Ok(Action::FailOnReadResponse), + 2 => Ok(Action::TimeoutOnReadResponse), + 3 => Ok(Action::FailOnWriteRequest), + 4 => Ok(Action::FailOnWriteResponse), + 5 => Ok(Action::TimeoutOnWriteResponse), + _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), + } + } } #[async_trait] impl Codec for TestCodec { type Protocol = StreamProtocol; - type Request = (); - type Response = (); + type Request = Action; + type Response = Action; async fn read_request( &mut self, _protocol: &Self::Protocol, - _io: &mut T, + io: &mut T, ) -> io::Result where T: AsyncRead + Unpin + Send, { - match self.0 { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + + if buf.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { Action::FailOnReadRequest => { Err(io::Error::new(io::ErrorKind::Other, "FailOnReadRequest")) } - Action::TimeoutOnReadRequest => loop { - sleep(Duration::MAX).await; - }, - _ => Ok(()), + action => Ok(action), } } async fn read_response( &mut self, _protocol: &Self::Protocol, - _io: &mut T, + io: &mut T, ) -> io::Result where T: AsyncRead + Unpin + Send, { - match self.0 { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + + if buf.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + assert_eq!(buf.len(), 1); + + match buf[0].try_into()? { Action::FailOnReadResponse => { Err(io::Error::new(io::ErrorKind::Other, "FailOnReadResponse")) } Action::TimeoutOnReadResponse => loop { sleep(Duration::MAX).await; }, - _ => Ok(()), + action => Ok(action), } } async fn write_request( &mut self, _protocol: &Self::Protocol, - _io: &mut T, - _req: Self::Request, + io: &mut T, + req: Self::Request, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { - match self.0 { + match req { Action::FailOnWriteRequest => { Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteRequest")) } - _ => Ok(()), + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } } } async fn write_response( &mut self, _protocol: &Self::Protocol, - _io: &mut T, - _res: Self::Response, + io: &mut T, + res: Self::Response, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { - match self.0 { + match res { Action::FailOnWriteResponse => { Err(io::Error::new(io::ErrorKind::Other, "FailOnWriteResponse")) } - _ => Ok(()), + Action::TimeoutOnWriteResponse => loop { + sleep(Duration::MAX).await; + }, + action => { + let bytes = [action.into()]; + io.write_all(&bytes).await?; + Ok(()) + } } } } From fb0f69b778fe05b2ea8c6e7fa3438d34348d67e6 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Wed, 25 Oct 2023 14:31:13 +0300 Subject: [PATCH 12/20] fix docs and add next_inbound_request_id method` --- protocols/request-response/Cargo.toml | 2 +- protocols/request-response/src/handler.rs | 7 ++++++- protocols/request-response/src/lib.rs | 4 ++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 5e1df1c1345..a328c3141ce 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -32,7 +32,7 @@ json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"] [dev-dependencies] -async-std = { version = "1.6.2", features = ["attributes", "unstable"] } +async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10.0" libp2p-noise = { workspace = true } libp2p-tcp = { workspace = true, features = ["async-io"] } diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index b4ea8dc0e1a..437db2f383a 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -118,6 +118,11 @@ where } } + /// Returns the next inbound request ID. + fn next_inbound_request_id(&mut self) -> InboundRequestId { + InboundRequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)) + } + fn on_fully_negotiated_inbound( &mut self, FullyNegotiatedInbound { @@ -129,7 +134,7 @@ where >, ) { let mut codec = self.codec.clone(); - let request_id = InboundRequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); + let request_id = self.next_inbound_request_id(); let mut sender = self.inbound_sender.clone(); let recv = async move { diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 46f2bbf8d47..649a63167d8 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -273,7 +273,7 @@ impl ResponseChannel { /// The ID of an inbound request. /// /// Note: [`InboundRequestId`]'s uniqueness is only guaranteed between -/// two inbound requests of the same originating [`Behaviour`]. +/// inbound requests of the same originating [`Behaviour`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct InboundRequestId(u64); @@ -286,7 +286,7 @@ impl fmt::Display for InboundRequestId { /// The ID of an outbound request. /// /// Note: [`OutboundRequestId`]'s uniqueness is only guaranteed between -/// two outbound requests of the same originating [`Behaviour`]. +/// outbound requests of the same originating [`Behaviour`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct OutboundRequestId(u64); From 71af1cdb55d5cdb0b96f5d62e89c1fe43f448e64 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 09:58:29 +0300 Subject: [PATCH 13/20] reduce indentation --- protocols/request-response/src/handler.rs | 56 ++++++++++------------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 437db2f383a..fbb2e948e99 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -404,39 +404,31 @@ where Poll::Ready((_, Ok(Ok(event)))) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } - Poll::Ready((id, Ok(Err(e)))) => { - let event = match id { - RequestId::Inbound(id) => { - log::debug!("Stream for inbound request {id} failed: {e}"); - Event::InboundStreamFailed { - request_id: id, - error: e, - } - } - RequestId::Outbound(id) => { - log::debug!("Stream for outbound request {id} failed: {e}"); - Event::OutboundStreamFailed { - request_id: id, - error: e, - } - } - }; - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + Poll::Ready((RequestId::Inbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundStreamFailed { + request_id: id, + error: e, + }, + )); } - Poll::Ready((id, Err(futures_bounded::Timeout { .. }))) => { - let event = match id { - RequestId::Inbound(id) => { - log::debug!("Stream for inbound request {id} timed out"); - Event::InboundTimeout(id) - } - RequestId::Outbound(id) => { - log::debug!("Stream for outbound request {id} timed out"); - Event::OutboundTimeout(id) - } - }; - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + Poll::Ready((RequestId::Outbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Inbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundTimeout(id), + )); + } + Poll::Ready((RequestId::Outbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundTimeout(id), + )); } Poll::Pending => break, } From 7b770824bef45c1a80606669d4dd5d9b307d7762 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 09:58:57 +0300 Subject: [PATCH 14/20] remove debug_assert --- protocols/request-response/src/lib.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 649a63167d8..e181d452a67 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -927,10 +927,6 @@ where } handler::Event::InboundTimeout(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, request_id); - debug_assert!( - removed, - "Expect request_id to be pending before request times out." - ); if removed { self.pending_events From 4cc072178c5c35ae4f3780ea67f31e9b9423dcc9 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 10:03:11 +0300 Subject: [PATCH 15/20] fix clippy --- protocols/request-response/src/handler.rs | 62 +++++++++++------------ 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index fbb2e948e99..b754653907b 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -399,39 +399,37 @@ where cx: &mut Context<'_>, ) -> Poll, (), Self::ToBehaviour, Self::Error>> { - loop { - match self.worker_streams.poll_unpin(cx) { - Poll::Ready((_, Ok(Ok(event)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); - } - Poll::Ready((RequestId::Inbound(id), Ok(Err(e)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundStreamFailed { - request_id: id, - error: e, - }, - )); - } - Poll::Ready((RequestId::Outbound(id), Ok(Err(e)))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundStreamFailed { - request_id: id, - error: e, - }, - )); - } - Poll::Ready((RequestId::Inbound(id), Err(futures_bounded::Timeout { .. }))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::InboundTimeout(id), - )); - } - Poll::Ready((RequestId::Outbound(id), Err(futures_bounded::Timeout { .. }))) => { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - Event::OutboundTimeout(id), - )); - } - Poll::Pending => break, + match self.worker_streams.poll_unpin(cx) { + Poll::Ready((_, Ok(Ok(event)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); } + Poll::Ready((RequestId::Inbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Outbound(id), Ok(Err(e)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundStreamFailed { + request_id: id, + error: e, + }, + )); + } + Poll::Ready((RequestId::Inbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::InboundTimeout(id), + )); + } + Poll::Ready((RequestId::Outbound(id), Err(futures_bounded::Timeout { .. }))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::OutboundTimeout(id), + )); + } + Poll::Pending => {} } // Drain pending events that were produced by `worker_streams`. From 21a34fc719531d0536f5c95fa628c9d6c6544f3d Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 11:06:46 +0300 Subject: [PATCH 16/20] cleaner test --- Cargo.lock | 1 + protocols/request-response/Cargo.toml | 1 + .../request-response/tests/error_reporting.rs | 192 ++++++++++-------- 3 files changed, 108 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6d4198e00dd..4af6722bdbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2994,6 +2994,7 @@ dependencies = [ name = "libp2p-request-response" version = "0.26.0" dependencies = [ + "anyhow", "async-std", "async-trait", "cbor4ii", diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index a328c3141ce..5c894bcd60f 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -32,6 +32,7 @@ json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] cbor = ["dep:serde", "dep:cbor4ii", "libp2p-swarm/macros"] [dev-dependencies] +anyhow = "1.0.75" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10.0" libp2p-noise = { workspace = true } diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index cbe79e6d6ef..7321eff5529 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -1,13 +1,17 @@ +use anyhow::{bail, Result}; use async_std::channel; use async_std::future::timeout; use async_std::task::{sleep, spawn}; use async_trait::async_trait; use futures::prelude::*; +use libp2p_identity::PeerId; use libp2p_request_response as request_response; use libp2p_request_response::ProtocolSupport; use libp2p_swarm::{StreamProtocol, Swarm}; use libp2p_swarm_test::SwarmExt; -use request_response::{Codec, InboundFailure, OutboundFailure}; +use request_response::{ + Codec, InboundFailure, InboundRequestId, OutboundFailure, OutboundRequestId, ResponseChannel, +}; use std::time::Duration; use std::{io, iter}; @@ -15,103 +19,50 @@ use std::{io, iter}; async fn report_outbound_failure_on_read_response() { let _ = env_logger::try_init(); - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default(); - - let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); - let peer1_id = *swarm1.local_peer_id(); - - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); - let peer2_id = *swarm2.local_peer_id(); + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(100)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(100)); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // On panic `panic_check_rx` will be closed - let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); + let join_handle = spawn(async move { + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::FailOnReadResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::FailOnReadResponse) + .unwrap(); - // Server - let swarm1_task = async move { - let _panic_check_tx = panic_check_tx; - let mut req_id = None; + let (peer, req_id_done) = wait_response_sent(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); - loop { - match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::Message { - peer, - message: - request_response::Message::Request { - request_id, - request, - channel, - .. - }, - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(request, Action::FailOnReadResponse); - req_id = Some(request_id); - swarm1 - .behaviour_mut() - .send_response(channel, Action::FailOnReadResponse) - .unwrap(); - } - Ok(request_response::Event::ResponseSent { - peer, request_id, .. - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(req_id, Some(request_id)); - } - Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), - Err(..) => {} - } - } - }; - - // Client - // - // Expects OutboundFailure::Io failure with `FailOnReadResponse` error - let swarm2_task = async move { - let req_id = swarm2 - .behaviour_mut() - .send_request(&peer1_id, Action::FailOnReadResponse); + // Wait a bit for the other side + sleep(Duration::from_millis(10)).await; + }); - loop { - match swarm2.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer1_id); - assert_eq!(request_id, req_id); + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadResponse); - let error = match error { - OutboundFailure::Io(e) => e, - e => panic!("Peer2: Unexpected error {e:?}"), - }; + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); - assert_eq!(error.kind(), io::ErrorKind::Other); - assert_eq!( - error.into_inner().unwrap().to_string(), - "FailOnReadResponse" - ); - break; - } - Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error {e:?}"), }; - spawn(swarm1_task); - timeout(Duration::from_millis(100), swarm2_task) - .await - .expect("timed out on waiting FailOnReadResponse"); + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnReadResponse" + ); - assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); + // Panics if task panicked + join_handle.await; } #[async_std::test] @@ -579,6 +530,75 @@ async fn report_inbound_timeout_on_write_response() { assert!(!panic_check_rx.is_closed(), "swarm2_task panicked"); } +fn new_swarm_with_timeout( + timeout: Duration, +) -> (PeerId, Swarm>) { + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default().with_request_timeout(timeout); + + let swarm = + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let peed_id = *swarm.local_peer_id(); + + (peed_id, swarm) +} + +async fn wait_request( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, Action, ResponseChannel)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, + }) => { + return Ok((peer, request_id, request, channel)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_response_sent( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::ResponseSent { + peer, request_id, .. + }) => { + return Ok((peer, request_id)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_outbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + #[derive(Clone, Default)] struct TestCodec; From 0f27d04a4b04788fe940b027de3ca4c301650d48 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 11:29:59 +0300 Subject: [PATCH 17/20] even cleaner test --- .../request-response/tests/error_reporting.rs | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 7321eff5529..3c181163d2d 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -25,7 +25,7 @@ async fn report_outbound_failure_on_read_response() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - let join_handle = spawn(async move { + let swarm1_task = async move { let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); assert_eq!(peer, peer2_id); assert_eq!(action, Action::FailOnReadResponse); @@ -40,29 +40,30 @@ async fn report_outbound_failure_on_read_response() { // Wait a bit for the other side sleep(Duration::from_millis(10)).await; - }); + }; - let req_id = swarm2 - .behaviour_mut() - .send_request(&peer1_id, Action::FailOnReadResponse); + let swarm2_task = async move { + let req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnReadResponse); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); - let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); - assert_eq!(peer, peer1_id); - assert_eq!(req_id_done, req_id); + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error {e:?}"), + }; - let error = match error { - OutboundFailure::Io(e) => e, - e => panic!("Unexpected error {e:?}"), + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnReadResponse" + ); }; - assert_eq!(error.kind(), io::ErrorKind::Other); - assert_eq!( - error.into_inner().unwrap().to_string(), - "FailOnReadResponse" - ); - - // Panics if task panicked - join_handle.await; + futures::future::join(swarm1_task, swarm2_task).await; } #[async_std::test] From 4184300150daa2931633b3073bd84926b7a3177c Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 12:46:07 +0300 Subject: [PATCH 18/20] cleaner tests --- .../request-response/tests/error_reporting.rs | 664 +++++++----------- 1 file changed, 247 insertions(+), 417 deletions(-) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 3c181163d2d..993ee9d465d 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -1,7 +1,5 @@ use anyhow::{bail, Result}; -use async_std::channel; -use async_std::future::timeout; -use async_std::task::{sleep, spawn}; +use async_std::task::sleep; use async_trait::async_trait; use futures::prelude::*; use libp2p_identity::PeerId; @@ -12,6 +10,7 @@ use libp2p_swarm_test::SwarmExt; use request_response::{ Codec, InboundFailure, InboundRequestId, OutboundFailure, OutboundRequestId, ResponseChannel, }; +use std::pin::pin; use std::time::Duration; use std::{io, iter}; @@ -19,12 +18,13 @@ use std::{io, iter}; async fn report_outbound_failure_on_read_response() { let _ = env_logger::try_init(); - let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(100)); - let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(100)); + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; + // Server let swarm1_task = async move { let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); assert_eq!(peer, peer2_id); @@ -38,10 +38,13 @@ async fn report_outbound_failure_on_read_response() { assert_eq!(peer, peer2_id); assert_eq!(req_id_done, req_id); - // Wait a bit for the other side - sleep(Duration::from_millis(10)).await; + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; }; + // Client + // + // Expects OutboundFailure::Io failure with `FailOnReadResponse` error let swarm2_task = async move { let req_id = swarm2 .behaviour_mut() @@ -53,7 +56,7 @@ async fn report_outbound_failure_on_read_response() { let error = match error { OutboundFailure::Io(e) => e, - e => panic!("Unexpected error {e:?}"), + e => panic!("Unexpected error: {e:?}"), }; assert_eq!(error.kind(), io::ErrorKind::Other); @@ -63,42 +66,27 @@ async fn report_outbound_failure_on_read_response() { ); }; - futures::future::join(swarm1_task, swarm2_task).await; + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; } #[async_std::test] async fn report_outbound_failure_on_write_request() { let _ = env_logger::try_init(); - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default(); - - let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); - let peer1_id = *swarm1.local_peer_id(); - - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let (peer1_id, mut swarm1) = new_swarm(); + let (_peer2_id, mut swarm2) = new_swarm(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // On panic `panic_check_rx` will be closed - let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); - // Server // // Expects no events because `Event::Request` is produced after `read_request`. let swarm1_task = async move { - let _panic_check_tx = panic_check_tx; - - loop { - match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; }; // Client @@ -109,100 +97,54 @@ async fn report_outbound_failure_on_write_request() { .behaviour_mut() .send_request(&peer1_id, Action::FailOnWriteRequest); - loop { - match swarm2.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer1_id); - assert_eq!(request_id, req_id); - - let error = match error { - OutboundFailure::Io(e) => e, - e => panic!("Peer2: Unexpected error {e:?}"), - }; - - assert_eq!(error.kind(), io::ErrorKind::Other); - assert_eq!( - error.into_inner().unwrap().to_string(), - "FailOnWriteRequest" - ); - break; - } - Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), - Err(..) => {} - } - } - }; + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + OutboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; - spawn(swarm1_task); - timeout(Duration::from_millis(100), swarm2_task) - .await - .expect("timed out on waiting FailOnWriteRequest"); + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteRequest" + ); + }; - assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; } #[async_std::test] async fn report_outbound_timeout_on_read_response() { let _ = env_logger::try_init(); - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default(); - - let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); - let peer1_id = *swarm1.local_peer_id(); - - let mut swarm2 = Swarm::new_ephemeral(|_| { - let cfg = cfg.with_request_timeout(Duration::from_millis(100)); - request_response::Behaviour::::new(protocols, cfg) - }); - let peer2_id = *swarm2.local_peer_id(); + // `swarm1` needs to have a bigger timeout to avoid racing + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(200)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(100)); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // On panic `panic_check_rx` will be closed - let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); - // Server let swarm1_task = async move { - let _panic_check_tx = panic_check_tx; - let mut req_id = None; - - loop { - match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::Message { - peer, - message: - request_response::Message::Request { - request_id, - request, - channel, - }, - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(request, Action::TimeoutOnReadResponse); - req_id = Some(request_id); - swarm1 - .behaviour_mut() - .send_response(channel, Action::TimeoutOnReadResponse) - .unwrap(); - } - Ok(request_response::Event::ResponseSent { - peer, request_id, .. - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(req_id, Some(request_id)); - } - Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::TimeoutOnReadResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::TimeoutOnReadResponse) + .unwrap(); + + let (peer, req_id_done) = wait_response_sent(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; }; // Client @@ -213,65 +155,33 @@ async fn report_outbound_timeout_on_read_response() { .behaviour_mut() .send_request(&peer1_id, Action::TimeoutOnReadResponse); - loop { - match swarm2.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer1_id); - assert_eq!(request_id, req_id); - assert!(matches!(error, OutboundFailure::Timeout)); - break; - } - Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + assert!(matches!(error, OutboundFailure::Timeout)); }; - spawn(swarm1_task); - timeout(Duration::from_millis(200), swarm2_task) - .await - .expect("timed out on waiting TimeoutOnReadResponse"); - - assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; } #[async_std::test] async fn report_inbound_failure_on_read_request() { let _ = env_logger::try_init(); - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default(); - - let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); - let peer1_id = *swarm1.local_peer_id(); - - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let (peer1_id, mut swarm1) = new_swarm(); + let (_peer2_id, mut swarm2) = new_swarm(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // On panic `panic_check_rx` will be closed - let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); - // Server // // Expects no events because `Event::Request` is produced after `read_request`. let swarm1_task = async move { - let _panic_check_tx = panic_check_tx; - - loop { - match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; }; // Expects io::ErrorKind::UnexpectedEof @@ -280,324 +190,144 @@ async fn report_inbound_failure_on_read_request() { .behaviour_mut() .send_request(&peer1_id, Action::FailOnReadRequest); - loop { - match swarm2.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer1_id); - assert_eq!(request_id, req_id); - - let error = match error { - OutboundFailure::Io(e) => e, - e => panic!("Peer2: Unexpected error {e:?}"), - }; - - assert_eq!(error.kind(), io::ErrorKind::UnexpectedEof); - break; - } - Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), - Err(..) => {} - } - } - }; + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); - spawn(swarm1_task); - timeout(Duration::from_millis(100), swarm2_task) - .await - .expect("timed out on waiting FailOnWriteRequest"); + match error { + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + }; + }; - assert!(!panic_check_rx.is_closed(), "swarm1_task panicked"); + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; } #[async_std::test] async fn report_inbound_failure_on_write_response() { let _ = env_logger::try_init(); - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default(); - - let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); - let peer1_id = *swarm1.local_peer_id(); - - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); - let peer2_id = *swarm2.local_peer_id(); + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm(); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // On panic `panic_check_rx` will be closed - let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); - // Server // // Expects OutboundFailure::Io failure with `FailOnWriteResponse` error let swarm1_task = async move { - let mut req_id = None; - - loop { - match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::Message { - peer, - message: - request_response::Message::Request { - request_id, - request, - channel, - }, - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(request, Action::FailOnWriteResponse); - req_id = Some(request_id); - swarm1 - .behaviour_mut() - .send_response(channel, Action::FailOnWriteResponse) - .unwrap(); - } - Ok(request_response::Event::InboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(req_id, Some(request_id)); - - let error = match error { - InboundFailure::Io(e) => e, - e => panic!("Peer1: Unexpected error {e:?}"), - }; - - assert_eq!(error.kind(), io::ErrorKind::Other); - assert_eq!( - error.into_inner().unwrap().to_string(), - "FailOnWriteResponse" - ); - break; - } - Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::FailOnWriteResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::FailOnWriteResponse) + .unwrap(); + + let (peer, req_id_done, error) = wait_inbound_failure(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + + let error = match error { + InboundFailure::Io(e) => e, + e => panic!("Unexpected error: {e:?}"), + }; + + assert_eq!(error.kind(), io::ErrorKind::Other); + assert_eq!( + error.into_inner().unwrap().to_string(), + "FailOnWriteResponse" + ); }; // Client // // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof let swarm2_task = async move { - let _panic_check_tx = panic_check_tx; let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::FailOnWriteResponse); - loop { - match swarm2.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer1_id); - assert_eq!(request_id, req_id); - - match error { - OutboundFailure::ConnectionClosed => { - // Connections was closed before `read_response` - } - OutboundFailure::Io(e) => { - assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); - } - e => panic!("Peer2: Unexpected error {e:?}"), - } - } - Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), - Err(..) => {} + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); + + match error { + OutboundFailure::ConnectionClosed => { + // ConnectionClosed is allowed here because we mainly test the behavior + // of `swarm1_task`. } - } - }; + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), + }; - spawn(swarm2_task); - timeout(Duration::from_millis(100), swarm1_task) - .await - .expect("timed out on waiting TimeoutOnWriteResponse"); + // Keep alive the task, so only `swarm1_task` can finish + wait_no_events(&mut swarm2).await; + }; - assert!(!panic_check_rx.is_closed(), "swarm2_task panicked"); + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; } #[async_std::test] async fn report_inbound_timeout_on_write_response() { let _ = env_logger::try_init(); - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default(); - - let mut swarm1 = Swarm::new_ephemeral(|_| { - let cfg = cfg.clone().with_request_timeout(Duration::from_millis(100)); - request_response::Behaviour::::new(protocols.clone(), cfg.clone()) - }); - let peer1_id = *swarm1.local_peer_id(); - - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); - let peer2_id = *swarm2.local_peer_id(); + // `swarm2` needs to have a bigger timeout to avoid racing + let (peer1_id, mut swarm1) = new_swarm_with_timeout(Duration::from_millis(100)); + let (peer2_id, mut swarm2) = new_swarm_with_timeout(Duration::from_millis(200)); swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // On panic `panic_check_rx` will be closed - let (panic_check_tx, panic_check_rx) = channel::bounded::<()>(1); - + // Server + // // Expects InboundFailure::Timeout let swarm1_task = async move { - let mut req_id = None; - - loop { - match swarm1.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::Message { - peer, - message: - request_response::Message::Request { - request_id, - request, - channel, - }, - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(request, Action::TimeoutOnWriteResponse); - req_id = Some(request_id); - swarm1 - .behaviour_mut() - .send_response(channel, Action::TimeoutOnWriteResponse) - .unwrap(); - } - Ok(request_response::Event::InboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer2_id); - assert_eq!(req_id, Some(request_id)); - assert!(matches!(error, InboundFailure::Timeout)); - break; - } - Ok(ev) => panic!("Peer1: Unexpected event: {ev:?}"), - Err(..) => {} - } - } + let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(action, Action::TimeoutOnWriteResponse); + swarm1 + .behaviour_mut() + .send_response(resp_channel, Action::TimeoutOnWriteResponse) + .unwrap(); + + let (peer, req_id_done, error) = wait_inbound_failure(&mut swarm1).await.unwrap(); + assert_eq!(peer, peer2_id); + assert_eq!(req_id_done, req_id); + assert!(matches!(error, InboundFailure::Timeout)); }; // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof let swarm2_task = async move { - let _panic_check_tx = panic_check_tx; let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::TimeoutOnWriteResponse); - loop { - match swarm2.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - assert_eq!(peer, peer1_id); - assert_eq!(request_id, req_id); - - match error { - OutboundFailure::ConnectionClosed => { - // Connections was closed before `read_response` - } - OutboundFailure::Io(e) => { - assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof) - } - e => panic!("Peer2: Unexpected error {e:?}"), - } - } - Ok(ev) => panic!("Peer2: Unexpected event: {ev:?}"), - Err(..) => {} - } - } - }; - - spawn(swarm2_task); - timeout(Duration::from_millis(200), swarm1_task) - .await - .expect("timed out on waiting TimeoutOnWriteResponse"); - - assert!(!panic_check_rx.is_closed(), "swarm2_task panicked"); -} - -fn new_swarm_with_timeout( - timeout: Duration, -) -> (PeerId, Swarm>) { - let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default().with_request_timeout(timeout); - - let swarm = - Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); - let peed_id = *swarm.local_peer_id(); - - (peed_id, swarm) -} + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, req_id); -async fn wait_request( - swarm: &mut Swarm>, -) -> Result<(PeerId, InboundRequestId, Action, ResponseChannel)> { - loop { - match swarm.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::Message { - peer, - message: - request_response::Message::Request { - request_id, - request, - channel, - }, - }) => { - return Ok((peer, request_id, request, channel)); + match error { + OutboundFailure::ConnectionClosed => { + // ConnectionClosed is allowed here because we mainly test the behavior + // of `swarm1_task`. } - Ok(ev) => bail!("Unexpected event: {ev:?}"), - Err(..) => {} + OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} + e => panic!("Unexpected error: {e:?}"), } - } -} -async fn wait_response_sent( - swarm: &mut Swarm>, -) -> Result<(PeerId, InboundRequestId)> { - loop { - match swarm.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::ResponseSent { - peer, request_id, .. - }) => { - return Ok((peer, request_id)); - } - Ok(ev) => bail!("Unexpected event: {ev:?}"), - Err(..) => {} - } - } -} + // Keep alive the task, so only `swarm1_task` can finish + wait_no_events(&mut swarm2).await; + }; -async fn wait_outbound_failure( - swarm: &mut Swarm>, -) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> { - loop { - match swarm.select_next_some().await.try_into_behaviour_event() { - Ok(request_response::Event::OutboundFailure { - peer, - request_id, - error, - }) => { - return Ok((peer, request_id, error)); - } - Ok(ev) => bail!("Unexpected event: {ev:?}"), - Err(..) => {} - } - } + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; } #[derive(Clone, Default)] @@ -746,3 +476,103 @@ impl Codec for TestCodec { } } } + +fn new_swarm_with_timeout( + timeout: Duration, +) -> (PeerId, Swarm>) { + let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default().with_request_timeout(timeout); + + let swarm = + Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); + let peed_id = *swarm.local_peer_id(); + + (peed_id, swarm) +} + +fn new_swarm() -> (PeerId, Swarm>) { + new_swarm_with_timeout(Duration::from_millis(100)) +} + +async fn wait_no_events(swarm: &mut Swarm>) { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(ev) => panic!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_request( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, Action, ResponseChannel)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request_id, + request, + channel, + }, + }) => { + return Ok((peer, request_id, request, channel)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_response_sent( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::ResponseSent { + peer, request_id, .. + }) => { + return Ok((peer, request_id)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_inbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, InboundRequestId, InboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::InboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} + +async fn wait_outbound_failure( + swarm: &mut Swarm>, +) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> { + loop { + match swarm.select_next_some().await.try_into_behaviour_event() { + Ok(request_response::Event::OutboundFailure { + peer, + request_id, + error, + }) => { + return Ok((peer, request_id, error)); + } + Ok(ev) => bail!("Unexpected event: {ev:?}"), + Err(..) => {} + } + } +} From 14b28fd5d350710fbb7d154669e5024574f11934 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 12:52:48 +0300 Subject: [PATCH 19/20] fix clippy --- protocols/request-response/tests/error_reporting.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 993ee9d465d..e3d82191bd8 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -496,9 +496,8 @@ fn new_swarm() -> (PeerId, Swarm>) { async fn wait_no_events(swarm: &mut Swarm>) { loop { - match swarm.select_next_some().await.try_into_behaviour_event() { - Ok(ev) => panic!("Unexpected event: {ev:?}"), - Err(..) => {} + if let Ok(ev) = swarm.select_next_some().await.try_into_behaviour_event() { + panic!("Unexpected event: {ev:?}") } } } From af3adaeab97b9b66379a8578e115d8306ade79d7 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 26 Oct 2023 13:22:04 +0300 Subject: [PATCH 20/20] review changes --- .../request-response/tests/error_reporting.rs | 94 +++++++------------ 1 file changed, 36 insertions(+), 58 deletions(-) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index e3d82191bd8..cf651d395f5 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -24,8 +24,7 @@ async fn report_outbound_failure_on_read_response() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Server - let swarm1_task = async move { + let server_task = async move { let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); assert_eq!(peer, peer2_id); assert_eq!(action, Action::FailOnReadResponse); @@ -42,10 +41,8 @@ async fn report_outbound_failure_on_read_response() { wait_no_events(&mut swarm1).await; }; - // Client - // // Expects OutboundFailure::Io failure with `FailOnReadResponse` error - let swarm2_task = async move { + let client_task = async move { let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::FailOnReadResponse); @@ -66,9 +63,9 @@ async fn report_outbound_failure_on_read_response() { ); }; - let swarm1_task = pin!(swarm1_task); - let swarm2_task = pin!(swarm2_task); - futures::future::select(swarm1_task, swarm2_task).await; + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; } #[async_std::test] @@ -81,18 +78,12 @@ async fn report_outbound_failure_on_write_request() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Server - // // Expects no events because `Event::Request` is produced after `read_request`. - let swarm1_task = async move { - // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead - wait_no_events(&mut swarm1).await; - }; + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + let server_task = wait_no_events(&mut swarm1); - // Client - // // Expects OutboundFailure::Io failure with `FailOnWriteRequest` error. - let swarm2_task = async move { + let client_task = async move { let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::FailOnWriteRequest); @@ -113,9 +104,9 @@ async fn report_outbound_failure_on_write_request() { ); }; - let swarm1_task = pin!(swarm1_task); - let swarm2_task = pin!(swarm2_task); - futures::future::select(swarm1_task, swarm2_task).await; + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; } #[async_std::test] @@ -129,8 +120,7 @@ async fn report_outbound_timeout_on_read_response() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Server - let swarm1_task = async move { + let server_task = async move { let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); assert_eq!(peer, peer2_id); assert_eq!(action, Action::TimeoutOnReadResponse); @@ -147,10 +137,8 @@ async fn report_outbound_timeout_on_read_response() { wait_no_events(&mut swarm1).await; }; - // Client - // // Expects OutboundFailure::Timeout - let swarm2_task = async move { + let client_task = async move { let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::TimeoutOnReadResponse); @@ -161,9 +149,9 @@ async fn report_outbound_timeout_on_read_response() { assert!(matches!(error, OutboundFailure::Timeout)); }; - let swarm1_task = pin!(swarm1_task); - let swarm2_task = pin!(swarm2_task); - futures::future::select(swarm1_task, swarm2_task).await; + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; } #[async_std::test] @@ -176,16 +164,12 @@ async fn report_inbound_failure_on_read_request() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Server - // // Expects no events because `Event::Request` is produced after `read_request`. - let swarm1_task = async move { - // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead - wait_no_events(&mut swarm1).await; - }; + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. + let server_task = wait_no_events(&mut swarm1); // Expects io::ErrorKind::UnexpectedEof - let swarm2_task = async move { + let client_task = async move { let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::FailOnReadRequest); @@ -200,9 +184,9 @@ async fn report_inbound_failure_on_read_request() { }; }; - let swarm1_task = pin!(swarm1_task); - let swarm2_task = pin!(swarm2_task); - futures::future::select(swarm1_task, swarm2_task).await; + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; } #[async_std::test] @@ -215,10 +199,8 @@ async fn report_inbound_failure_on_write_response() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Server - // // Expects OutboundFailure::Io failure with `FailOnWriteResponse` error - let swarm1_task = async move { + let server_task = async move { let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); assert_eq!(peer, peer2_id); assert_eq!(action, Action::FailOnWriteResponse); @@ -243,10 +225,8 @@ async fn report_inbound_failure_on_write_response() { ); }; - // Client - // // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof - let swarm2_task = async move { + let client_task = async move { let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::FailOnWriteResponse); @@ -258,19 +238,19 @@ async fn report_inbound_failure_on_write_response() { match error { OutboundFailure::ConnectionClosed => { // ConnectionClosed is allowed here because we mainly test the behavior - // of `swarm1_task`. + // of `server_task`. } OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} e => panic!("Unexpected error: {e:?}"), }; - // Keep alive the task, so only `swarm1_task` can finish + // Keep alive the task, so only `server_task` can finish wait_no_events(&mut swarm2).await; }; - let swarm1_task = pin!(swarm1_task); - let swarm2_task = pin!(swarm2_task); - futures::future::select(swarm1_task, swarm2_task).await; + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; } #[async_std::test] @@ -284,10 +264,8 @@ async fn report_inbound_timeout_on_write_response() { swarm1.listen().await; swarm2.connect(&mut swarm1).await; - // Server - // // Expects InboundFailure::Timeout - let swarm1_task = async move { + let server_task = async move { let (peer, req_id, action, resp_channel) = wait_request(&mut swarm1).await.unwrap(); assert_eq!(peer, peer2_id); assert_eq!(action, Action::TimeoutOnWriteResponse); @@ -303,7 +281,7 @@ async fn report_inbound_timeout_on_write_response() { }; // Expects OutboundFailure::ConnectionClosed or io::ErrorKind::UnexpectedEof - let swarm2_task = async move { + let client_task = async move { let req_id = swarm2 .behaviour_mut() .send_request(&peer1_id, Action::TimeoutOnWriteResponse); @@ -315,19 +293,19 @@ async fn report_inbound_timeout_on_write_response() { match error { OutboundFailure::ConnectionClosed => { // ConnectionClosed is allowed here because we mainly test the behavior - // of `swarm1_task`. + // of `server_task`. } OutboundFailure::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof => {} e => panic!("Unexpected error: {e:?}"), } - // Keep alive the task, so only `swarm1_task` can finish + // Keep alive the task, so only `server_task` can finish wait_no_events(&mut swarm2).await; }; - let swarm1_task = pin!(swarm1_task); - let swarm2_task = pin!(swarm2_task); - futures::future::select(swarm1_task, swarm2_task).await; + let server_task = pin!(server_task); + let client_task = pin!(client_task); + futures::future::select(server_task, client_task).await; } #[derive(Clone, Default)]