Skip to content

Commit

Permalink
Add SocketPool to Pinger
Browse files Browse the repository at this point in the history
In order to add SocketPool, Pinger had to be broken out of telio-utils
because of circular dependencies. LinkDetection had to be also updated
to use the new Pinger.
  • Loading branch information
tomasz-grz committed Jan 23, 2025
1 parent cf173ca commit 724ca62
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 51 deletions.
2 changes: 2 additions & 0 deletions .unreleased/LLT-5886_protected_pinger
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Use the SocketPool component in order to bind the Pinger raw socket to the tunnel interface on macOS.
LinkDetection component was also updated to use the new Pinger.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ telio-task.workspace = true
telio-traversal.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true
once_cell.workspace = true
nat-detect.workspace = true
smart-default.workspace = true
Expand Down Expand Up @@ -199,6 +200,7 @@ telio-utils = { version = "0.1.0", path = "./crates/telio-utils" }
telio-wg = { version = "0.1.0", path = "./crates/telio-wg" }
telio-pq = { version = "0.1.0", path = "./crates/telio-pq" }
telio-pmtu = { version = "0.1.0", path = "./crates/telio-pmtu" }
telio-pinger = { version = "0.1.0", path = "./crates/telio-pinger" }

[profile.release]
opt-level = "s"
Expand Down
4 changes: 3 additions & 1 deletion crates/telio-nurse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ telio-proto.workspace = true
telio-task.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true
telio-sockets.workspace = true
once_cell.workspace = true

[dev-dependencies]
telio-sockets.workspace = true
telio-sockets = { workspace = true, features = ["mockall"] }
telio-wg = { workspace = true, features = ["mockall"] }
tokio = { workspace = true, features = ["net", "sync", "test-util"] }
telio-nurse = { workspace = true, features = ["mockall"] }
Expand Down
19 changes: 18 additions & 1 deletion crates/telio-nurse/src/nurse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use telio_crypto::{PublicKey, SecretKey};
use telio_lana::*;
use telio_model::event::Event;
use telio_sockets::SocketPool;
use telio_task::{
io::{chan, mc_chan, Chan, McChan},
task_exec, ExecError, Runtime, RuntimeExt, Task, WaitResponse,
Expand Down Expand Up @@ -60,9 +61,20 @@ impl Nurse {
io: NurseIo<'_>,
aggregator: Arc<ConnectivityDataAggregator>,
ipv6_enabled: bool,
socket_pool: Arc<SocketPool>,
) -> Self {
Self {
task: Task::start(State::new(public_key, config, io, aggregator, ipv6_enabled).await),
task: Task::start(
State::new(
public_key,
config,
io,
aggregator,
ipv6_enabled,
socket_pool,
)
.await,
),
}
}

Expand Down Expand Up @@ -120,6 +132,9 @@ impl State {
/// * `public_key` - Used for heartbeat requests.
/// * `config` - Contains configuration for heartbeats and QoS.
/// * `io` - Nurse io channels.
/// * `aggregator` - ConnectivityDataAggregator.
/// * `ipv6_enabled` - IPv6 support.
/// * `socket_pool` - SocketPool used to protect the sockets.
///
/// # Returns
///
Expand All @@ -130,6 +145,7 @@ impl State {
io: NurseIo<'_>,
aggregator: Arc<ConnectivityDataAggregator>,
ipv6_enabled: bool,
socket_pool: Arc<SocketPool>,
) -> Self {
let meshnet_id = Self::meshnet_id();
telio_log_debug!("Meshnet ID: {meshnet_id}");
Expand Down Expand Up @@ -181,6 +197,7 @@ impl State {
config_update_channel: config_update_channel.subscribe(),
},
ipv6_enabled,
socket_pool,
)))
} else {
None
Expand Down
24 changes: 18 additions & 6 deletions crates/telio-nurse/src/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use telio_model::features::RttType;
use telio_task::{io::mc_chan, Runtime, RuntimeExt, WaitResponse};
use telio_wg::uapi::{AnalyticsEvent, PeerState};

use telio_utils::{
interval, telio_log_debug, telio_log_trace, DualPingResults, DualTarget, IpStack, Pinger,
};
use telio_pinger::{DualPingResults, Pinger};
use telio_sockets::SocketPool;
use telio_utils::{interval, telio_log_debug, telio_log_trace, DualTarget, IpStack};

use crate::{config::QoSConfig, data::MeshConfigUpdateEvent};

Expand Down Expand Up @@ -240,15 +240,22 @@ impl Analytics {
///
/// * `config` - Config for QoS component.
/// * `io` - Channel(s) for communicating with WireGuard.
/// * `ipv6_enabled` - IPv6 support.
/// * `socket_pool` - SocketPool used to protect the sockets.
///
/// # Returns
///
/// A new `Analytics` instance with the given configuration but with no nodes.
pub fn new(config: QoSConfig, io: Io, ipv6_enabled: bool) -> Self {
pub fn new(
config: QoSConfig,
io: Io,
ipv6_enabled: bool,
socket_pool: Arc<SocketPool>,
) -> Self {
let (ping_channel_tx, ping_channel_rx) = mpsc::channel(1);

let ping_backend = if config.rtt_types.contains(&RttType::Ping) {
Arc::new(Pinger::new(config.rtt_tries, ipv6_enabled).ok())
Arc::new(Pinger::new(config.rtt_tries, ipv6_enabled, socket_pool).ok())
} else {
Arc::new(None)
};
Expand Down Expand Up @@ -600,6 +607,7 @@ mod tests {
collections::HashSet,
net::{Ipv4Addr, Ipv6Addr},
};
use telio_sockets::protector::MockProtector;
use telio_task::{
io::{mc_chan::Tx, McChan},
task_exec, Task,
Expand Down Expand Up @@ -932,8 +940,12 @@ mod tests {
buckets: 5,
};

let mut protect = MockProtector::default();
protect.expect_make_internal().returning(|_| Ok(()));
let socket_pool = Arc::new(SocketPool::new(protect));

(
Analytics::new(config, io, true),
Analytics::new(config, io, true, socket_pool),
manual_trigger_channel.tx,
wg_channel.tx,
)
Expand Down
19 changes: 19 additions & 0 deletions crates/telio-pinger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "telio-pinger"
version = "0.1.0"
edition = "2021"
license = "GPL-3.0-only"
repository = "https://github.com/NordSecurity/libtelio"
publish = false

[dependencies]
socket2.workspace = true
telio-utils.workspace = true
telio-sockets.workspace = true
surge-ping.workspace = true
rand.workspace = true
tracing.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["net", "sync", "test-util"] }
telio-sockets = { workspace = true, features = ["mockall"] }
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use std::{convert::TryInto, net::IpAddr};
use surge_ping::{Client, Config as PingerConfig, PingIdentifier, PingSequence, ICMP};

use crate::{telio_log_debug, telio_log_error, DualTarget};
use telio_sockets::SocketPool;
use telio_utils::{telio_log_debug, telio_log_error, telio_log_trace, DualTarget};

/// Information needed to check the reachability of endpoints.
///
Expand Down Expand Up @@ -42,20 +43,33 @@ pub struct DualPingResults {
impl Pinger {
const PING_TIMEOUT: Duration = Duration::from_secs(5);

/// Create new instance of `Ping`.
/// Create new instance of `Ping` with a socket pool.
///
/// # Arguments
///
/// * `no_of_tries` - How many pings should be sent.
pub fn new(no_of_tries: u32, ipv6: bool) -> std::io::Result<Self> {
/// * `ipv6` - Enable IPv6 support.
/// * `socket_pool` - Optional SocketPool used to protect the sockets.
pub fn new(
no_of_tries: u32,
ipv6: bool,
socket_pool: Arc<SocketPool>,
) -> std::io::Result<Self> {
let client_v6 = if ipv6 {
Some(Arc::new(Self::build_client(ICMP::V6)?))
let client_v6 = Arc::new(Self::build_client(ICMP::V6)?);
telio_log_trace!("Making pinger IPv6 socket internal");
socket_pool.make_internal(client_v6.get_socket().get_native_sock())?;
Some(client_v6)
} else {
None
};

let client_v4 = Arc::new(Self::build_client(ICMP::V4)?);
telio_log_trace!("Making pinger IPv4 socket internal");
socket_pool.make_internal(client_v4.get_socket().get_native_sock())?;

Ok(Self {
client_v4: Arc::new(Self::build_client(ICMP::V4)?),
client_v4,
client_v6,
no_of_tries,
})
Expand Down Expand Up @@ -181,3 +195,42 @@ impl Pinger {
Client::new(&config_builder.build())
}
}

#[cfg(test)]
mod tests {
use super::*;
use telio_sockets::protector::MockProtector;

// Basic constructor test
#[tokio::test]
async fn test_pinger_new_v6_sock_pool() {
let mut protect = MockProtector::default();
protect.expect_make_internal().returning(|_| Ok(()));

let pinger = Pinger::new(1, true, Arc::new(SocketPool::new(protect)))
.expect("Failed to create Pinger");
assert!(pinger.client_v4.get_socket().get_native_sock() > 0);
assert!(pinger.client_v6.is_some());
assert_eq!(pinger.no_of_tries, 1);
}

// Basic ping test
#[tokio::test]
async fn test_ping_localhost() {
let mut protect = MockProtector::default();
protect.expect_make_internal().returning(|_| Ok(()));

let pinger = Pinger::new(2, false, Arc::new(SocketPool::new(protect)))
.expect("Failed to create Pinger");

let target =
DualTarget::new(("127.0.0.1".parse().ok(), None)).expect("Failed to create target");

let result = pinger.perform(target).await;
assert!(
result.v4.unwrap().successful_pings > 0,
"Expected at least one successful ping to 127.0.0.1"
);
assert!(result.v6.is_none());
}
}
1 change: 1 addition & 0 deletions crates/telio-sockets/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ parking_lot.workspace = true
socket2.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
mockall = { workspace = true, optional = true }

telio-utils.workspace = true
once_cell.workspace = true
Expand Down
30 changes: 30 additions & 0 deletions crates/telio-sockets/src/protector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
//! Provides cross-platform abstractions for managing socket-level protection
//!
//! Operations like binding to an external or internal interface, applying firewall marks,
//! watching for default interface and route changes, etc.
//!
//! Some methods are no_op on specific platforms:
//! - [`Protector::set_fwmark`] on macOS and Windows since they don't have iptables.
//! - [`Protector::set_tunnel_interface`] on Linux since firewall marks are used to route packets.
//! - [`Protector::make_internal`] on Linux and Windows since the sockets by default are bound to the tunnel interface.
use std::{io, panic::RefUnwindSafe, sync::Arc};

use crate::native::NativeSocket;
Expand All @@ -18,22 +28,39 @@ pub mod platform;
#[path = "protector/unsupported.rs"]
pub mod platform;

/// Re-export the implementation for the current platform.
pub use platform::NativeProtector;

/// Alias for a closure that accepts a native socket.
///
/// Used as a callback in the [`make_external_protector`] function.
pub type Protect = Arc<dyn Fn(NativeSocket) + Send + Sync + RefUnwindSafe + 'static>;

/// A trait describing common operations on a socket.
///
/// Used to manage binding, automatic re-binding and routing rules on various platforms.
/// Some of the methods are no-op on specific platforms.
#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
pub trait Protector: Send + Sync {
/// Configure the provided socket to send packets externally (outside of the tunnel).
fn make_external(&self, socket: NativeSocket) -> io::Result<()>;

/// Configure the provided socket to send packets internally (inside of the tunnel).
fn make_internal(&self, socket: NativeSocket) -> io::Result<()>;

/// Clean up any references associated with the given socket.
fn clean(&self, socket: NativeSocket);

/// Update the firewall mark for this socket used in iptables and routing rules.
fn set_fwmark(&self, fwmark: u32);

/// Update the tunnel interface identifier to be applied when making the socket internal.
fn set_tunnel_interface(&self, interface: u64);
}

/// A blanket implementation of `Arc<Protector>`.
///
/// Used to call [`Protector`] methods directly without having to dereference.
impl<T: Protector + ?Sized> Protector for Arc<T> {
fn make_external(&self, socket: NativeSocket) -> io::Result<()> {
self.as_ref().make_external(socket)
Expand All @@ -56,6 +83,9 @@ impl<T: Protector + ?Sized> Protector for Arc<T> {
}
}

/// Construct a [`Protector`] instance that applies a closure.
///
/// The closure is called only during [`Protector::make_external`], all other methods are no-op.
pub fn make_external_protector(protect: Protect) -> Arc<(dyn Protector + 'static)> {
struct ProtectorMakeExternalCb(Protect);
impl Protector for ProtectorMakeExternalCb {
Expand Down
Loading

0 comments on commit 724ca62

Please sign in to comment.