diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 2857d23655c..256bb463b5a 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -605,7 +605,6 @@ impl NetworkBehaviour for Behaviour { src_peer_id, src_connection_id, inbound_circuit_req, - dst_handler_notifier, dst_stream, dst_pending_data, } => { @@ -616,7 +615,6 @@ impl NetworkBehaviour for Behaviour { circuit_id, dst_peer_id: event_source, inbound_circuit_req, - dst_handler_notifier, dst_stream, dst_pending_data, }), diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 13619cb45c6..60997a107e6 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -24,7 +24,6 @@ use crate::protocol::{inbound_hop, outbound_stop}; use crate::{proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME}; use bytes::Bytes; use either::Either; -use futures::channel::oneshot::{self, Canceled}; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; use futures::io::AsyncWriteExt; use futures::stream::{FuturesUnordered, StreamExt}; @@ -79,7 +78,6 @@ pub enum In { circuit_id: CircuitId, dst_peer_id: PeerId, inbound_circuit_req: inbound_hop::CircuitReq, - dst_handler_notifier: oneshot::Sender<()>, dst_stream: Stream, dst_pending_data: Bytes, }, @@ -126,7 +124,6 @@ impl fmt::Debug for In { circuit_id, inbound_circuit_req: _, dst_peer_id, - dst_handler_notifier: _, dst_stream: _, dst_pending_data: _, } => f @@ -195,7 +192,6 @@ pub enum Event { src_peer_id: PeerId, src_connection_id: ConnectionId, inbound_circuit_req: inbound_hop::CircuitReq, - dst_handler_notifier: oneshot::Sender<()>, dst_stream: Stream, dst_pending_data: Bytes, }, @@ -292,7 +288,6 @@ impl fmt::Debug for Event { src_peer_id, src_connection_id, inbound_circuit_req: _, - dst_handler_notifier: _, dst_stream: _, dst_pending_data: _, } => f @@ -372,11 +367,6 @@ pub struct Handler { PeerId, Result<(), inbound_hop::UpgradeError>, )>, - /// Tracks substreams lend out to other [`Handler`]s. - /// - /// Contains a [`futures::future::Future`] for each lend out substream that - /// resolves once the substream is dropped. - alive_lend_out_substreams: FuturesUnordered>, /// Futures relaying data for circuit between two peers. circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, @@ -411,7 +401,6 @@ impl Handler { reservation_request_future: Default::default(), circuit_accept_futures: Default::default(), circuit_deny_futures: Default::default(), - alive_lend_out_substreams: Default::default(), circuits: Default::default(), active_reservation: Default::default(), pending_connect_requests: Default::default(), @@ -442,12 +431,9 @@ impl Handler { .pop_front() .expect("opened a stream without a pending stop command"); - let (tx, rx) = oneshot::channel(); - self.alive_lend_out_substreams.push(rx); - if self .workers - .try_push(outbound_stop::connect(stream, stop_command, tx).map(Either::Right)) + .try_push(outbound_stop::connect(stream, stop_command).map(Either::Right)) .is_err() { log::warn!("Dropping outbound stream because we are at capacity") @@ -587,7 +573,6 @@ impl ConnectionHandler for Handler { circuit_id, dst_peer_id, inbound_circuit_req, - dst_handler_notifier, dst_stream, dst_pending_data, } => { @@ -600,7 +585,6 @@ impl ConnectionHandler for Handler { src_stream, src_pending_data, dst_peer_id, - dst_handler_notifier, dst_stream, dst_pending_data, }) @@ -693,7 +677,6 @@ impl ConnectionHandler for Handler { src_peer_id: circuit.src_peer_id, src_connection_id: circuit.src_connection_id, inbound_circuit_req: circuit.inbound_circuit_req, - dst_handler_notifier: circuit.dst_handler_notifier, dst_stream: circuit.dst_stream, dst_pending_data: circuit.dst_pending_data, }, @@ -761,7 +744,6 @@ impl ConnectionHandler for Handler { mut src_stream, src_pending_data, dst_peer_id, - dst_handler_notifier, mut dst_stream, dst_pending_data, } = parts; @@ -785,8 +767,6 @@ impl ConnectionHandler for Handler { ) .await?; - // Inform destination handler that the stream to the destination is dropped. - drop(dst_handler_notifier); Ok(()) } .map(move |r| (circuit_id, dst_peer_id, r)) @@ -870,11 +850,6 @@ impl ConnectionHandler for Handler { None => {} } - // Check lend out substreams. - while let Poll::Ready(Some(Err(Canceled))) = - self.alive_lend_out_substreams.poll_next_unpin(cx) - {} - // Check keep alive status. if self.active_reservation.is_none() { if self.idle_at.is_none() { @@ -925,7 +900,6 @@ struct CircuitParts { src_stream: Stream, src_pending_data: Bytes, dst_peer_id: PeerId, - dst_handler_notifier: oneshot::Sender<()>, dst_stream: Stream, dst_pending_data: Bytes, } diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index df668ec1798..b15b3d68ae1 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -29,7 +29,6 @@ use crate::protocol::{self, inbound_stop, outbound_hop}; use bytes::Bytes; use either::Either; use futures::channel::mpsc::Receiver; -use futures::channel::oneshot; use futures::future::{BoxFuture, FutureExt}; use futures::io::{AsyncRead, AsyncWrite}; use futures::ready; @@ -386,22 +385,13 @@ pub(crate) enum ConnectionState { Operational { read_buffer: Bytes, substream: Stream, - /// "Drop notifier" pattern to signal to the transport that the connection has been dropped. - /// - /// This is flagged as "dead-code" by the compiler because we never read from it here. - /// However, it is actual use is to trigger the `Canceled` error in the `Transport` when this `Sender` is dropped. - #[allow(dead_code)] - drop_notifier: oneshot::Sender, }, } impl Unpin for ConnectionState {} impl ConnectionState { - pub(crate) fn new_inbound( - circuit: inbound_stop::Circuit, - drop_notifier: oneshot::Sender, - ) -> Self { + pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self { ConnectionState::InboundAccepting { accept: async { let (substream, read_buffer) = circuit @@ -411,22 +401,16 @@ impl ConnectionState { Ok(ConnectionState::Operational { read_buffer, substream, - drop_notifier, }) } .boxed(), } } - pub(crate) fn new_outbound( - substream: Stream, - read_buffer: Bytes, - drop_notifier: oneshot::Sender, - ) -> Self { + pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self { ConnectionState::Operational { substream, read_buffer, - drop_notifier, } } } diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index fb7428e3133..66bbc5896b1 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -145,12 +145,6 @@ pub struct Handler { reservation: Reservation, - /// Tracks substreams lent out to the transport. - /// - /// Contains a [`futures::future::Future`] for each lend out substream that - /// resolves once the substream is dropped. - alive_lend_out_substreams: FuturesUnordered>, - open_circuit_futs: futures_bounded::FuturesSet>, @@ -177,7 +171,6 @@ impl Handler { MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), reservation: Reservation::None, - alive_lend_out_substreams: Default::default(), open_circuit_futs: futures_bounded::FuturesSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, @@ -413,9 +406,7 @@ impl ConnectionHandler for Handler { let src_peer_id = circuit.src_peer_id(); let limit = circuit.limit(); - let (tx, rx) = oneshot::channel(); - self.alive_lend_out_substreams.push(rx); - let connection = super::ConnectionState::new_inbound(circuit, tx); + let connection = super::ConnectionState::new_inbound(circuit); pending_msgs.push_back( transport::ToListenerMsg::IncomingRelayedConnection { @@ -471,15 +462,6 @@ impl ConnectionHandler for Handler { // Send errors to transport. while let Poll::Ready(Some(())) = self.send_error_futs.poll_next_unpin(cx) {} - // Check status of lend out substreams. - loop { - match self.alive_lend_out_substreams.poll_next_unpin(cx) { - Poll::Ready(Some(Err(oneshot::Canceled))) => {} - Poll::Ready(Some(Ok(v))) => void::unreachable(v), - Poll::Ready(None) | Poll::Pending => break, - } - } - Poll::Pending } @@ -526,9 +508,6 @@ impl ConnectionHandler for Handler { } } outbound_hop::OutboundStreamInfo::CircuitConnection(cmd) => { - let (tx, rx) = oneshot::channel(); - self.alive_lend_out_substreams.push(rx); - if self .outbound_circuits .try_push( @@ -536,7 +515,6 @@ impl ConnectionHandler for Handler { stream, self.remote_peer_id, cmd, - tx, ) .map_ok(Either::Right), ) diff --git a/protocols/relay/src/protocol/outbound_hop.rs b/protocols/relay/src/protocol/outbound_hop.rs index d43dd63fe32..6a222db55c1 100644 --- a/protocols/relay/src/protocol/outbound_hop.rs +++ b/protocols/relay/src/protocol/outbound_hop.rs @@ -26,7 +26,6 @@ use futures::prelude::*; use futures_timer::Delay; use log::debug; use thiserror::Error; -use void::Void; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; @@ -183,7 +182,6 @@ pub(crate) async fn handle_connection_message_response( protocol: Stream, remote_peer_id: PeerId, con_command: Command, - tx: oneshot::Sender, ) -> Result, CircuitFailedReason>, FatalUpgradeError> { let msg = proto::HopMessage { type_pb: proto::HopMessageType::CONNECT, @@ -259,7 +257,7 @@ pub(crate) async fn handle_connection_message_response( ); match con_command.send_back.send(Ok(priv_client::Connection { - state: priv_client::ConnectionState::new_outbound(io, read_buffer.freeze(), tx), + state: priv_client::ConnectionState::new_outbound(io, read_buffer.freeze()), })) { Ok(()) => Ok(Ok(Some(Circuit { limit }))), Err(_) => { diff --git a/protocols/relay/src/protocol/outbound_stop.rs b/protocols/relay/src/protocol/outbound_stop.rs index e4502957995..6f715f14f14 100644 --- a/protocols/relay/src/protocol/outbound_stop.rs +++ b/protocols/relay/src/protocol/outbound_stop.rs @@ -22,7 +22,6 @@ use std::time::Duration; use asynchronous_codec::{Framed, FramedParts}; use bytes::Bytes; -use futures::channel::oneshot::{self}; use futures::prelude::*; use thiserror::Error; @@ -77,7 +76,6 @@ pub enum FatalUpgradeError { pub(crate) async fn connect( io: Stream, stop_command: PendingConnect, - tx: oneshot::Sender<()>, ) -> Result, FatalUpgradeError> { let msg = proto::StopMessage { type_pb: proto::StopMessageType::CONNECT, @@ -164,7 +162,6 @@ pub(crate) async fn connect( src_peer_id: stop_command.src_peer_id, src_connection_id: stop_command.src_connection_id, inbound_circuit_req: stop_command.inbound_circuit_req, - dst_handler_notifier: tx, dst_stream: io, dst_pending_data: read_buffer.freeze(), })) @@ -175,7 +172,6 @@ pub(crate) struct Circuit { pub(crate) src_peer_id: PeerId, pub(crate) src_connection_id: ConnectionId, pub(crate) inbound_circuit_req: inbound_hop::CircuitReq, - pub(crate) dst_handler_notifier: oneshot::Sender<()>, pub(crate) dst_stream: Stream, pub(crate) dst_pending_data: Bytes, }