From 1b39aaf480eeae4799143e7c3a53d9f23a31aa4a Mon Sep 17 00:00:00 2001 From: dgarus Date: Tue, 1 Aug 2023 15:56:08 +0300 Subject: [PATCH] fix review comment --- protocols/relay/src/behaviour/handler.rs | 82 ++++++++++-------------- 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 08cc9d69d83..09579dbc038 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -388,9 +388,7 @@ pub struct Handler { circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, stop_requested_streams: VecDeque, - outbound_stop_futs: FuturesUnordered< - BoxFuture<'static, Result<(Event, Option>), FatalUpgradeError>>, - >, + outbound_stop_futs: FuturesUnordered>>, } impl Handler { @@ -416,7 +414,7 @@ impl Handler { &self, io: Stream, stop_command: StopCommand, - ) -> BoxFuture<'static, Result<(Event, Option>), FatalUpgradeError>> { + ) -> BoxFuture<'static, Result> { let msg = proto::StopMessage { type_pb: proto::StopMessageType::CONNECT, peer: Some(proto::Peer { @@ -436,6 +434,9 @@ impl Handler { status: None, }; + let (tx, rx) = oneshot::channel(); + self.alive_lend_out_substreams.push(rx); + let mut substream = Framed::new(io, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { substream.send(msg).await?; @@ -461,34 +462,26 @@ impl Handler { Some(proto_status) => match proto_status { Status::OK => {} Status::RESOURCE_LIMIT_EXCEEDED => { - return Ok(( - Event::OutboundConnectNegotiationFailed { - circuit_id: stop_command.circuit_id, - src_peer_id: stop_command.src_peer_id, - src_connection_id: stop_command.src_connection_id, - inbound_circuit_req: stop_command.inbound_circuit_req, - status: proto_status, - error: StreamUpgradeError::Apply( - CircuitFailedReason::ResourceLimitExceeded, - ), - }, - None, - )) + return Ok(Event::OutboundConnectNegotiationFailed { + circuit_id: stop_command.circuit_id, + src_peer_id: stop_command.src_peer_id, + src_connection_id: stop_command.src_connection_id, + inbound_circuit_req: stop_command.inbound_circuit_req, + status: proto_status, + error: StreamUpgradeError::Apply( + CircuitFailedReason::ResourceLimitExceeded, + ), + }) } Status::PERMISSION_DENIED => { - return Ok(( - Event::OutboundConnectNegotiationFailed { - circuit_id: stop_command.circuit_id, - src_peer_id: stop_command.src_peer_id, - src_connection_id: stop_command.src_connection_id, - inbound_circuit_req: stop_command.inbound_circuit_req, - status: proto_status, - error: StreamUpgradeError::Apply( - CircuitFailedReason::PermissionDenied, - ), - }, - None, - )) + return Ok(Event::OutboundConnectNegotiationFailed { + circuit_id: stop_command.circuit_id, + src_peer_id: stop_command.src_peer_id, + src_connection_id: stop_command.src_connection_id, + inbound_circuit_req: stop_command.inbound_circuit_req, + status: proto_status, + error: StreamUpgradeError::Apply(CircuitFailedReason::PermissionDenied), + }) } s => return Err(FatalUpgradeError::UnexpectedStatus(s)), }, @@ -508,19 +501,15 @@ impl Handler { "Expect a flushed Framed to have an empty write buffer." ); - let (tx, rx) = oneshot::channel(); - Ok(( - Event::OutboundConnectNegotiated { - circuit_id: stop_command.circuit_id, - src_peer_id: stop_command.src_peer_id, - src_connection_id: stop_command.src_connection_id, - inbound_circuit_req: stop_command.inbound_circuit_req, - dst_handler_notifier: tx, - dst_stream: io, - dst_pending_data: read_buffer.freeze(), - }, - Some(rx), - )) + Ok(Event::OutboundConnectNegotiated { + circuit_id: stop_command.circuit_id, + src_peer_id: stop_command.src_peer_id, + src_connection_id: stop_command.src_connection_id, + inbound_circuit_req: stop_command.inbound_circuit_req, + dst_handler_notifier: tx, + dst_stream: io, + dst_pending_data: read_buffer.freeze(), + }) } .boxed() } @@ -744,12 +733,7 @@ impl ConnectionHandler for Handler { // Send stop commands if let Poll::Ready(Some(result)) = self.outbound_stop_futs.poll_next_unpin(cx) { return match result { - Ok((event, receiver)) => { - if let Some(rx) = receiver { - self.alive_lend_out_substreams.push(rx); - } - Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) - } + Ok(event) => Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)), Err(e) => Poll::Ready(ConnectionHandlerEvent::Close(StreamUpgradeError::Apply( Either::Right(e), ))),