Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: report protocol mismatch error #1577

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions sn_cli/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ async fn main() -> Result<()> {

// get the broadcaster as we want to have our own progress bar.
let broadcaster = ClientEventsBroadcaster::default();
let progress_bar_handler = spawn_connection_progress_bar(broadcaster.subscribe());
let (progress_bar, progress_bar_handler) =
spawn_connection_progress_bar(broadcaster.subscribe());

let result = Client::new(
secret_key,
Expand All @@ -121,11 +122,15 @@ async fn main() -> Result<()> {
Some(broadcaster),
)
.await;

// await on the progress bar to complete before handling the client result. If client errors out, we would
// want to make the progress bar clean up gracefully.
let client = match result {
Ok(client) => client,
Err(err) => {
// clean up progress bar
progress_bar.finish_with_message("Could not connect to the network");
return Err(err.into());
}
};
progress_bar_handler.await?;
let client = result?;

// default to verifying storage
let should_verify_store = !opt.no_verify;
Expand Down Expand Up @@ -153,17 +158,18 @@ async fn main() -> Result<()> {

/// Helper to subscribe to the client events broadcaster and spin up a progress bar that terminates when the
/// client successfully connects to the network or if it errors out.
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> {
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> (ProgressBar, JoinHandle<()>) {
// Network connection progress bar
let progress_bar = ProgressBar::new_spinner();
let progress_bar_clone = progress_bar.clone();
progress_bar.enable_steady_tick(Duration::from_millis(120));
progress_bar.set_message("Connecting to The SAFE Network...");
let new_style = progress_bar.style().tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈🔗");
progress_bar.set_style(new_style);

progress_bar.set_message("Connecting to The SAFE Network...");

tokio::spawn(async move {
let handle = tokio::spawn(async move {
let mut peers_connected = 0;
loop {
match rx.recv().await {
Expand All @@ -190,7 +196,8 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()>
_ => {}
}
}
})
});
(progress_bar_clone, handle)
}

fn get_client_secret_key(root_dir: &PathBuf) -> Result<SecretKey> {
Expand Down
30 changes: 26 additions & 4 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ impl Client {
// loop to connect to the network
let mut is_connected = false;
let connection_timeout = connection_timeout.unwrap_or(CONNECTION_TIMEOUT);
let mut unsupported_protocol_tracker: Option<(String, String)> = None;

debug!("Client connection timeout: {connection_timeout:?}");
let mut connection_timeout_interval = interval(connection_timeout);
// first tick completes immediately
connection_timeout_interval.tick().await;
Expand All @@ -199,16 +201,26 @@ impl Client {
tokio::select! {
_ = connection_timeout_interval.tick() => {
if !is_connected {
if let Some((our_protocol, their_protocols)) = unsupported_protocol_tracker {
error!("Timeout: Client could not connect to the network as it does not support the protocol");
break Err(Error::UnsupportedProtocol(our_protocol, their_protocols));
}
error!("Timeout: Client failed to connect to the network within {connection_timeout:?}");
return Err(Error::ConnectionTimeout(connection_timeout));
break Err(Error::ConnectionTimeout(connection_timeout));
}
}
event = client_events_rx.recv() => {
match event {
// we do not error out directly as we might still connect if the other initial peers are from
// the correct network.
Ok(ClientEvent::PeerWithUnsupportedProtocol { our_protocol, their_protocol }) => {
warn!(%our_protocol, %their_protocol, "Client tried to connect to a peer with an unsupported protocol. Tracking the latest one");
unsupported_protocol_tracker = Some((our_protocol, their_protocol));
}
Ok(ClientEvent::ConnectedToNetwork) => {
is_connected = true;
info!("Client connected to the Network {is_connected:?}.");
break;
break Ok(());
}
Ok(ClientEvent::InactiveClient(timeout)) => {
if is_connected {
Expand All @@ -220,12 +232,12 @@ impl Client {
Err(err) => {
error!("Unexpected error during client startup {err:?}");
println!("Unexpected error during client startup {err:?}");
return Err(err.into());
break Err(err.into());
}
_ => {}
}
}}
}
}?;

Ok(client)
}
Expand All @@ -251,6 +263,16 @@ impl Client {
debug!("{peers_added}/{CLOSE_GROUP_SIZE} initial peers found.",);
}
}
NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol,
their_protocol,
} => {
self.events_broadcaster
.broadcast(ClientEvent::PeerWithUnsupportedProtocol {
our_protocol,
their_protocol,
});
}
_other => {}
}

Expand Down
3 changes: 3 additions & 0 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum Error {
#[error("Could not find register after batch sync: {0:?}")]
RegisterNotFoundAfterUpload(XorName),

#[error("Could not connect due to incompatible network protocols. Our protocol: {0} Network protocol: {1}")]
UnsupportedProtocol(String, String),

// ------ Upload Errors --------
#[error("Overflow occurred while adding values")]
NumericOverflow,
Expand Down
5 changes: 5 additions & 0 deletions sn_client/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub enum ClientEvent {
/// A peer has been added to the Routing table.
/// Also contains the max number of peers to connect to before we receive ClientEvent::ConnectedToNetwork
PeerAdded { max_peers_to_connect: usize },
/// We've encountered a Peer with an unsupported protocol.
PeerWithUnsupportedProtocol {
our_protocol: String,
their_protocol: String,
},
/// The client has been connected to the network
ConnectedToNetwork,
/// No network activity has been received for a given duration
Expand Down
32 changes: 18 additions & 14 deletions sn_faucet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,37 +60,40 @@ async fn main() -> Result<()> {

let secret_key = bls::SecretKey::random();
let broadcaster = ClientEventsBroadcaster::default();
let handle = spawn_connection_progress_bar(broadcaster.subscribe());
let (progress_bar, handle) = spawn_connection_progress_bar(broadcaster.subscribe());
let result = Client::new(secret_key, bootstrap_peers, None, Some(broadcaster)).await;

// await on the progress bar to complete before handling the client result. If client errors out, we would
// want to make the progress bar clean up gracefully.
handle.await?;
match result {
Ok(client) => {
if let Err(err) = faucet_cmds(opt.cmd.clone(), &client).await {
error!("Failed to run faucet cmd {:?} with err {err:?}", opt.cmd)
}
let client = match result {
Ok(client) => client,
Err(err) => {
// clean up progress bar
progress_bar.finish_with_message("Could not connect to the network");
error!("Failed to get Client with err {err:?}");
return Err(err.into());
}
Err(err) => error!("Failed to get Client with err {err:?}"),
};
handle.await?;

if let Err(err) = faucet_cmds(opt.cmd.clone(), &client).await {
error!("Failed to run faucet cmd {:?} with err {err:?}", opt.cmd)
}

Ok(())
}

/// Helper to subscribe to the client events broadcaster and spin up a progress bar that terminates when the
/// client successfully connects to the network or if it errors out.
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()> {
fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> (ProgressBar, JoinHandle<()>) {
// Network connection progress bar
let progress_bar = ProgressBar::new_spinner();
let progress_bar_clone = progress_bar.clone();
progress_bar.enable_steady_tick(Duration::from_millis(120));
progress_bar.set_message("Connecting to The SAFE Network...");
let new_style = progress_bar.style().tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈🔗");
progress_bar.set_style(new_style);

progress_bar.set_message("Connecting to The SAFE Network...");

tokio::spawn(async move {
let handle = tokio::spawn(async move {
let mut peers_connected = 0;
loop {
match rx.recv().await {
Expand All @@ -117,7 +120,8 @@ fn spawn_connection_progress_bar(mut rx: ClientEventsReceiver) -> JoinHandle<()>
_ => {}
}
}
})
});
(progress_bar_clone, handle)
}

#[derive(Parser)]
Expand Down
1 change: 1 addition & 0 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl NetworkBuilder {

// 1mb packet size
let _ = kad_cfg
.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
.set_max_packet_size(MAX_PACKET_SIZE)
// Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes.
.disjoint_query_paths(true)
Expand Down
53 changes: 38 additions & 15 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use libp2p::{
Multiaddr, PeerId, TransportError,
};
use rand::{rngs::OsRng, Rng};
use sn_protocol::version::IDENTIFY_NODE_VERSION_STR;
use sn_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR};
use sn_protocol::{
get_port_from_multiaddr,
messages::{CmdResponse, Query, Request, Response},
Expand Down Expand Up @@ -107,8 +107,13 @@ pub enum NetworkEvent {
},
/// Peer has been added to the Routing Table. And the number of connected peers.
PeerAdded(PeerId, usize),
// Peer has been removed from the Routing Table. And the number of connected peers.
/// Peer has been removed from the Routing Table. And the number of connected peers.
PeerRemoved(PeerId, usize),
/// The peer does not support our protocol
PeerWithUnsupportedProtocol {
our_protocol: String,
their_protocol: String,
},
/// The records bearing these keys are to be fetched from the holder or the network
KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
/// Started listening on a new address
Expand All @@ -120,13 +125,9 @@ pub enum NetworkEvent {
/// List of peer nodes that failed to fetch replication copy from.
FailedToFetchHolders(BTreeSet<PeerId>),
/// A peer in RT that supposed to be verified.
BadNodeVerification {
peer_id: PeerId,
},
BadNodeVerification { peer_id: PeerId },
/// Quotes to be verified
QuoteVerification {
quotes: Vec<(PeerId, PaymentQuote)>,
},
QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
/// Carry out chunk proof check against the specified record and peer
ChunkProofVerification {
peer_id: PeerId,
Expand All @@ -153,6 +154,12 @@ impl Debug for NetworkEvent {
"NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})"
)
}
NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol,
their_protocol,
} => {
write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})")
}
NetworkEvent::KeysToFetchForReplication(list) => {
let keys_len = list.len();
write!(f, "NetworkEvent::KeysForReplication({keys_len:?})")
Expand Down Expand Up @@ -219,10 +226,20 @@ impl SwarmDriver {
libp2p::identify::Event::Received { peer_id, info } => {
trace!(%peer_id, ?info, "identify: received info");

if info.protocol_version != IDENTIFY_PROTOCOL_STR.to_string() {
warn!(?info.protocol_version, "identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {:?}", IDENTIFY_PROTOCOL_STR.as_str());

self.send_event(NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol: IDENTIFY_PROTOCOL_STR.to_string(),
their_protocol: info.protocol_version,
});

return Ok(());
}

let has_dialed = self.dialed_peers.contains(&peer_id);
let peer_is_agent = info
.agent_version
.starts_with(&IDENTIFY_NODE_VERSION_STR.to_string());
Comment on lines -223 to -225
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should match the whole string here. Also, since this check determines whether the peer is a node, I renamed the variable accordingly.

let peer_is_node =
info.agent_version == IDENTIFY_NODE_VERSION_STR.to_string();

// If we're not in local mode, only add globally reachable addresses.
// Strip the `/p2p/...` part of the multiaddresses.
Expand All @@ -243,8 +260,8 @@ impl SwarmDriver {

// When received an identify from un-dialed peer, try to dial it
// The dial shall trigger the same identify to be sent again and confirm
// peer is external accessable, hence safe to be added into RT.
if !self.local && peer_is_agent && !has_dialed {
// peer is external accessible, hence safe to be added into RT.
if !self.local && peer_is_node && !has_dialed {
// Only need to dial back for not fulfilled kbucket
let (kbucket_full, ilog2) = if let Some(kbucket) =
self.swarm.behaviour_mut().kademlia.kbucket(peer_id)
Expand Down Expand Up @@ -275,7 +292,7 @@ impl SwarmDriver {
};

if !kbucket_full {
info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {:?}, dail back to confirm external accesable", ilog2);
info!(%peer_id, ?addrs, "received identify info from undialed peer for not full kbucket {ilog2:?}, dial back to confirm external accessible");
self.dialed_peers
.push(peer_id)
.map_err(|_| NetworkError::CircularVecPopFrontError)?;
Expand All @@ -297,7 +314,7 @@ impl SwarmDriver {
}

// If we are not local, we care only for peers that we dialed and thus are reachable.
if self.local || has_dialed && peer_is_agent {
if self.local || has_dialed && peer_is_node {
// To reduce the bad_node check resource usage,
// during the connection establish process, only check cached black_list
// The periodical check, which involves network queries shall filter
Expand Down Expand Up @@ -1029,6 +1046,12 @@ impl SwarmDriver {
event_string = "kad_event::UnroutablePeer";
trace!(peer_id = %peer, "kad::Event: UnroutablePeer");
}
kad::Event::RoutablePeer { peer, .. } => {
// We get this when we don't add a peer via the identify step.
// And we don't want to add these as they were rejected by identify for some reason.
event_string = "kad_event::RoutablePeer";
trace!(peer_id = %peer, "kad::Event: RoutablePeer");
}
other => {
event_string = "kad_event::Other";
trace!("kad::Event ignored: {other:?}");
Expand Down
3 changes: 3 additions & 0 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ impl Node {
Self::try_interval_replication(net);
});
}
NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
event_header = "PeerWithUnsupportedProtocol";
}
NetworkEvent::NewListenAddr(_) => {
event_header = "NewListenAddr";
if !cfg!(feature = "local-discovery") {
Expand Down
Loading