Skip to content

Commit

Permalink
fix(request-response): cleanup connected on dial/listen failures
Browse files Browse the repository at this point in the history
Its possible for dial and listen failures to be sent even after handle_established_*_connection
methods are called. Therefore we need to clean up any state about those failed connections.

Prior to this change the state would get out of sync and cause a debug_assert_eq panic.
  • Loading branch information
nathanielc committed Nov 1, 2023
1 parent 823d0b2 commit fbbea4e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 8 deletions.
43 changes: 39 additions & 4 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm},
dial_opts::DialOpts,
ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionHandler, ConnectionId, ListenFailure, NetworkBehaviour,
NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -677,7 +677,24 @@ where
}
}

fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) {
// Removes the connection for a peer if it exists.
fn remove_connection_for_peer(&mut self, peer: PeerId, connection_id: ConnectionId) {
if let Some(connections) = self.connected.get_mut(&peer) {
connections
.iter()
.position(|c| c.id == connection_id)
.map(|p: usize| connections.remove(p));
}
}

fn on_dial_failure(
&mut self,
DialFailure {
peer_id,
connection_id,
..
}: DialFailure,
) {
if let Some(peer) = peer_id {
// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
Expand All @@ -695,6 +712,24 @@ where
}));
}
}

// Its possible that this dial failure is for an existing connection.
// If so we need to remove the connection.
self.remove_connection_for_peer(peer, connection_id);
}
}
fn on_listen_failure(
&mut self,
ListenFailure {
peer_id,
connection_id,
..
}: ListenFailure,
) {
if let Some(peer) = peer_id {
// Its possible that this listen failure is for an existing connection.
// If so we need to remove the connection.
self.remove_connection_for_peer(peer, connection_id);
}
}

Expand Down Expand Up @@ -804,7 +839,7 @@ where
}
FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::ListenFailure(_) => {}
FromSwarm::ListenFailure(listen_failure) => self.on_listen_failure(listen_failure),
FromSwarm::NewListener(_) => {}
FromSwarm::NewListenAddr(_) => {}
FromSwarm::ExpiredListenAddr(_) => {}
Expand Down
8 changes: 5 additions & 3 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,15 +346,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result<Toke
local_addr,
send_back_addr,
connection_id,
error
error,
peer_id,
}));
},
None => quote! {
self.#enum_n.on_swarm_event(#from_swarm::ListenFailure(#listen_failure {
local_addr,
send_back_addr,
connection_id,
error
error,
peer_id,
}));
},
});
Expand Down Expand Up @@ -839,7 +841,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result<Toke
#dial_failure { peer_id, connection_id, error })
=> { #(#on_dial_failure_stmts)* }
#from_swarm::ListenFailure(
#listen_failure { local_addr, send_back_addr, connection_id, error })
#listen_failure { local_addr, send_back_addr, connection_id, error, peer_id })
=> { #(#on_listen_failure_stmts)* }
#from_swarm::NewListener(
#new_listener { listener_id })
Expand Down
9 changes: 8 additions & 1 deletion swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ pub trait NetworkBehaviour: 'static {
/// At this point, we have verified their [`PeerId`] and we know, which particular [`Multiaddr`] succeeded in the dial.
/// In order to actually use this connection, this function must return a [`ConnectionHandler`].
/// Returning an error will immediately close the connection.
///
/// Note when any composed behaviour returns an error the connection will be closed and a
/// [`FromSwarm::ListenFailure`] event will be emitted.
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
Expand Down Expand Up @@ -182,6 +185,9 @@ pub trait NetworkBehaviour: 'static {
/// At this point, we have verified their [`PeerId`] and we know, which particular [`Multiaddr`] succeeded in the dial.
/// In order to actually use this connection, this function must return a [`ConnectionHandler`].
/// Returning an error will immediately close the connection.
///
/// Note when any composed behaviour returns an error the connection will be closed and a
/// [`FromSwarm::DialFailure`] event will be emitted.
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
Expand Down Expand Up @@ -266,7 +272,7 @@ pub enum ToSwarm<TOutEvent, TInEvent> {
/// The emphasis on a **new** candidate is important.
/// Protocols MUST take care to only emit a candidate once per "source".
/// For example, the observed address of a TCP connection does not change throughout its lifetime.
/// Thus, only one candidate should be emitted per connection.
/// Thus, only one candidate should be emitted per connection.
///
/// This makes the report frequency of an address a meaningful data-point for consumers of this event.
/// This address will be shared with all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrCandidate`].
Expand Down Expand Up @@ -488,6 +494,7 @@ pub struct ListenFailure<'a> {
pub send_back_addr: &'a Multiaddr,
pub error: &'a ListenError,
pub connection_id: ConnectionId,
pub peer_id: Option<PeerId>,
}

/// [`FromSwarm`] variant that informs the behaviour that a new listener was created.
Expand Down
3 changes: 3 additions & 0 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,7 @@ where
send_back_addr: &send_back_addr,
error: &listen_error,
connection_id: id,
peer_id: Some(peer_id),
},
));

Expand Down Expand Up @@ -847,6 +848,7 @@ where
send_back_addr: &send_back_addr,
error: &error,
connection_id: id,
peer_id: None,
}));
self.pending_swarm_events
.push_back(SwarmEvent::IncomingConnectionError {
Expand Down Expand Up @@ -950,6 +952,7 @@ where
send_back_addr: &send_back_addr,
error: &listen_error,
connection_id,
peer_id: None,
}));

self.pending_swarm_events
Expand Down

0 comments on commit fbbea4e

Please sign in to comment.