Skip to content

Commit

Permalink
Refactor session_keeper to use telio_pinger
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasz-grz committed Feb 7, 2025
1 parent ac1d09c commit 5d003c1
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 139 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/telio-pinger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ impl Pinger {
}
Client::new(&config_builder.build())
}

pub fn get_client(self, proto: ICMP) -> Option<Arc<Client>> {
match proto {
ICMP::V4 => Some(self.client_v4),
ICMP::V6 => self.client_v6,
}
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/telio-traversal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ telio-sockets.workspace = true
telio-task.workspace = true
telio-utils.workspace = true
telio-wg.workspace = true
telio-pinger.workspace = true

[dev-dependencies]
env_logger.workspace = true
Expand Down
166 changes: 27 additions & 139 deletions crates/telio-traversal/src/session_keeper.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use async_trait::async_trait;
use socket2::Type;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::Duration;
use surge_ping::{
Client as PingerClient, Config as PingerConfig, ConfigBuilder, PingIdentifier, PingSequence,
SurgeError, ICMP,
};
use surge_ping::SurgeError;
use telio_crypto::PublicKey;
use telio_pinger::Pinger;
use telio_sockets::SocketPool;
use telio_task::{task_exec, BoxAction, Runtime, Task};
use telio_utils::{
dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions,
};
const PING_PAYLOAD_SIZE: usize = 56;

/// Possible [SessionKeeper] errors.
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -62,120 +57,27 @@ pub struct SessionKeeper {
impl SessionKeeper {
pub fn start(sock_pool: Arc<SocketPool>, batch_all: bool) -> Result<Self> {
telio_log_debug!("Starting with batch_all({})", batch_all);
let (client_v4, client_v6) = (
PingerClient::new(&Self::make_builder(ICMP::V4).build())
.map_err(|e| Error::PingerCreationError(ICMP::V4, e))?,
PingerClient::new(&Self::make_builder(ICMP::V6).build())
.map_err(|e| Error::PingerCreationError(ICMP::V6, e))?,
);

sock_pool.make_internal(client_v4.get_socket().get_native_sock())?;
sock_pool.make_internal(client_v6.get_socket().get_native_sock())?;
let pinger = Pinger::new(1, true, sock_pool)?;

Ok(Self {
batch_all,
task: Task::start(State {
pingers: Pingers {
pinger_client_v4: client_v4,
pinger_client_v6: client_v6,
},
pinger,
actions: RepeatedActions::default(),
sock_pool,
}),
})
}

fn make_builder(proto: ICMP) -> ConfigBuilder {
let mut config_builder = PingerConfig::builder().kind(proto);
if cfg!(any(target_os = "macos", target_os = "tvos",)) {
config_builder = config_builder.sock_type_hint(Type::RAW);
}
// Raw sockets are not allowed on iOS, instead use
// socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
//
// TODO: LLT-5991: Refactor writing ICMP packets directly into the tunnel
// TODO: Check if raw sockets are supported on tvos
if cfg!(target_os = "ios") {
config_builder = config_builder.sock_type_hint(Type::DGRAM);
}
if cfg!(not(target_os = "android")) {
match proto {
ICMP::V4 => {
config_builder = config_builder.bind((Ipv4Addr::UNSPECIFIED, 0).into());
}
ICMP::V6 => {
config_builder = config_builder.bind((Ipv6Addr::UNSPECIFIED, 0).into());
}
}
}
config_builder
}

pub async fn stop(self) {
let _ = self.task.stop().await.resume_unwind();
}

#[cfg(test)]
async fn get_pinger_client(&self, proto: ICMP) -> Result<PingerClient> {
task_exec!(&self.task, async move |s| {
Ok(match proto {
ICMP::V4 => s.pingers.pinger_client_v4.clone(),
ICMP::V6 => s.pingers.pinger_client_v6.clone(),
})
})
.await
.map_err(Error::Task)
}
}

async fn ping(pingers: &Pingers, targets: (&PublicKey, &DualTarget)) -> Result<()> {
let (primary, secondary) = targets.1.get_targets()?;
let public_key = targets.0;

let primary_client = match primary {
IpAddr::V4(_) => &pingers.pinger_client_v4,
IpAddr::V6(_) => &pingers.pinger_client_v6,
};

let mut ping_id = PingIdentifier(rand::random());
telio_log_debug!(
"Pinging primary target {:?} on {:?}, {:#06x}",
public_key,
primary,
ping_id.0
);

if let Err(e) = primary_client
.pinger(primary, ping_id)
.await
.send_ping(PingSequence(0), &[0; PING_PAYLOAD_SIZE])
.await
{
telio_log_warn!("Primary target failed: {}", e.to_string());

if let Some(second) = secondary {
ping_id = PingIdentifier(rand::random());
telio_log_debug!(
"Pinging secondary target {:?} on {:?}, {:#06x}",
public_key,
second,
ping_id.0
);

let secondary_client = match second {
IpAddr::V4(_) => &pingers.pinger_client_v4,
IpAddr::V6(_) => &pingers.pinger_client_v6,
};

secondary_client
.pinger(second, ping_id)
.await
.send_ping(PingSequence(0), &[0; PING_PAYLOAD_SIZE])
.await?;
}
async fn get_pinger(&self) -> Result<Pinger> {
task_exec!(&self.task, async move |s| Ok(s.pinger.clone()))
.await
.map_err(Error::Task)
}

Ok(())
}

#[async_trait]
Expand Down Expand Up @@ -210,29 +112,17 @@ impl SessionKeeperTrait for SessionKeeper {
interval,
Arc::new(move |c| {
Box::pin(async move {
// This is a temporary solution for iOS due to NECP re-binding the socket to
// the main interface after every write, refer to LLT-5886
//
// TODO: LLT-5991: Refactor writing ICMP packets directly into the tunnel
if cfg!(target_os = "ios") {
if let Err(e) = c.sock_pool.make_internal(
c.pingers.pinger_client_v4.get_socket().get_native_sock(),
) {
telio_log_warn!("Failed to make socket internal, error: {:?}", e);
let result = c.pinger.perform(dual_target).await;
let failed = match (result.v4, result.v6) {
(None, None) => true,
(Some(v4res), None) => v4res.successful_pings == 0,
(None, Some(v6res)) => v6res.successful_pings == 0,
(Some(v4res), Some(v6res)) => {
(v4res.successful_pings == 0) && (v6res.successful_pings == 0)
}
if let Err(e) = c.sock_pool.make_internal(
c.pingers.pinger_client_v6.get_socket().get_native_sock(),
) {
telio_log_warn!("Failed to make socket internal, error: {:?}", e);
}
}

if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
telio_log_warn!(
"Failed to ping, peer with key: {:?}, error: {:?}",
public_key,
e
);
};
if failed {
telio_log_warn!("Failed to ping node: {:?}", public_key);
}
Ok(())
})
Expand Down Expand Up @@ -268,15 +158,9 @@ impl SessionKeeperTrait for SessionKeeper {
}
}

struct Pingers {
pinger_client_v4: PingerClient,
pinger_client_v6: PingerClient,
}

struct State {
pingers: Pingers,
pinger: Pinger,
actions: RepeatedActions<PublicKey, Self, Result<()>>,
sock_pool: Arc<SocketPool>,
}

#[async_trait]
Expand Down Expand Up @@ -314,6 +198,7 @@ impl Runtime for State {
mod tests {
use super::*;
use std::net::{Ipv4Addr, Ipv6Addr};
use surge_ping::ICMP;
use telio_crypto::PublicKey;
use telio_sockets::NativeProtector;
use telio_test::assert_elapsed;
Expand Down Expand Up @@ -346,14 +231,17 @@ mod tests {
.await
.unwrap();

let sock = sess_keep
.get_pinger_client(ICMP::V4)
let pinger_client = sess_keep
.get_pinger()
.await
.unwrap()
.get_socket();
.get_client(ICMP::V4)
.unwrap();

let sock = pinger_client.get_socket();

// Drop the pinger client explicitly, for it to stop listening on ICMP socket
drop(sess_keep.get_pinger_client(ICMP::V4).await.unwrap());
drop(pinger_client);

// Runtime
tokio::spawn(async move {
Expand Down

0 comments on commit 5d003c1

Please sign in to comment.