diff --git a/Cargo.lock b/Cargo.lock index b3b830e0ce20..c398c67f7a5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3487,10 +3487,8 @@ name = "talpid-openvpn" version = "0.0.0" dependencies = [ "async-trait", - "duct", "err-derive", "futures", - "is-terminal", "log", "once_cell", "os_pipe", diff --git a/Cargo.toml b/Cargo.toml index ffa8d8784a33..f68e491d9fb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ windows-sys = "0.48.0" chrono = { version = "0.4.26", default-features = false} clap = { version = "4.2.7", features = ["cargo", "derive"] } +once_cell = "1.13" [profile.release] diff --git a/android/translations-converter/Cargo.toml b/android/translations-converter/Cargo.toml index d0868d70e118..1c3b89e1d8f9 100644 --- a/android/translations-converter/Cargo.toml +++ b/android/translations-converter/Cargo.toml @@ -11,7 +11,7 @@ publish.workspace = true [dependencies] err-derive = { workspace = true } htmlize = { version = "1.0.2", features = ["unescape"] } -once_cell = "1.13" +once_cell = { workspace = true } regex = "1" serde = { version = "1", features = ["derive"] } quick-xml = { version = "0.27.1", features = ["serialize"] } diff --git a/mullvad-api/Cargo.toml b/mullvad-api/Cargo.toml index 83d8bf0723e9..08db7de3062c 100644 --- a/mullvad-api/Cargo.toml +++ b/mullvad-api/Cargo.toml @@ -26,7 +26,7 @@ tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread", "ne tokio-rustls = "0.24.1" tokio-socks = "0.5.1" rustls-pemfile = "1.0.3" -once_cell = "1.13" +once_cell = { workspace = true } mullvad-fs = { path = "../mullvad-fs" } mullvad-types = { path = "../mullvad-types" } diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index aaa893e170b3..cc89500abdcf 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -17,7 +17,7 @@ chrono = { workspace = true } err-derive = { workspace = true } fern = { version = "0.6", features = ["colored"] } futures = "0.3" -once_cell = "1.13" +once_cell = { workspace = true } libc = "0.2" log = { workspace = true } regex = "1.0" diff --git a/mullvad-management-interface/Cargo.toml b/mullvad-management-interface/Cargo.toml index 5a36da5a2952..dc70a47905d3 100644 --- a/mullvad-management-interface/Cargo.toml +++ b/mullvad-management-interface/Cargo.toml @@ -25,7 +25,7 @@ log = { workspace = true } [target.'cfg(unix)'.dependencies] nix = "0.23" -once_cell = "1.13" +once_cell = { workspace = true } [build-dependencies] tonic-build = { workspace = true, default-features = false, features = ["transport", "prost"] } diff --git a/mullvad-paths/Cargo.toml b/mullvad-paths/Cargo.toml index 3b4e01792026..78ddeefa9acc 100644 --- a/mullvad-paths/Cargo.toml +++ b/mullvad-paths/Cargo.toml @@ -16,7 +16,7 @@ log = { workspace = true } [target.'cfg(windows)'.dependencies] widestring = "1.0" -once_cell = "1.13" +once_cell = { workspace = true } [target.'cfg(windows)'.dependencies.windows-sys] workspace = true diff --git a/mullvad-problem-report/Cargo.toml b/mullvad-problem-report/Cargo.toml index 5bddff56408f..9001b4a18180 100644 --- a/mullvad-problem-report/Cargo.toml +++ b/mullvad-problem-report/Cargo.toml @@ -11,7 +11,7 @@ publish.workspace = true [dependencies] dirs = "5.0.1" err-derive = { workspace = true } -once_cell = "1.13" +once_cell = { workspace = true } log = { workspace = true } regex = "1.0" uuid = { version = "1.4.1", features = ["v4"] } diff --git a/mullvad-relay-selector/Cargo.toml b/mullvad-relay-selector/Cargo.toml index 6574b866a4a2..aa954ba610cb 100644 --- a/mullvad-relay-selector/Cargo.toml +++ b/mullvad-relay-selector/Cargo.toml @@ -25,4 +25,4 @@ mullvad-api = { path = "../mullvad-api" } mullvad-types = { path = "../mullvad-types" } [dev-dependencies] -once_cell = "1.13" +once_cell = { workspace = true } diff --git a/mullvad-setup/Cargo.toml b/mullvad-setup/Cargo.toml index df509a5e80f9..22fc63b54fd1 100644 --- a/mullvad-setup/Cargo.toml +++ b/mullvad-setup/Cargo.toml @@ -16,7 +16,7 @@ path = "src/main.rs" clap = { workspace = true } env_logger = { workspace = true } err-derive = { workspace = true } -once_cell = "1.13" +once_cell = { workspace = true } mullvad-management-interface = { path = "../mullvad-management-interface" } diff --git a/mullvad-types/Cargo.toml b/mullvad-types/Cargo.toml index db300ef00cd2..603f9df38883 100644 --- a/mullvad-types/Cargo.toml +++ b/mullvad-types/Cargo.toml @@ -12,7 +12,7 @@ publish.workspace = true chrono = { workspace = true, features = ["clock", "serde"] } err-derive = { workspace = true } ipnetwork = "0.16" -once_cell = "1.13" +once_cell = { workspace = true } log = { workspace = true } regex = "1" serde = { version = "1.0", features = ["derive"] } diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index b8c730a1bc35..e39b07f24462 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -9,11 +9,10 @@ edition.workspace = true publish.workspace = true [dependencies] -duct = "0.13" err-derive = { workspace = true } futures = "0.3.15" ipnetwork = "0.16" -once_cell = "1.13" +once_cell = { workspace = true } libc = "0.2" log = { workspace = true } parking_lot = "0.12.0" @@ -42,6 +41,7 @@ nftnl = { version = "0.6.2", features = ["nftnl-1-1-0"] } mnl = { version = "0.2.2", features = ["mnl-1-0-4"] } which = { version = "4.0", default-features = false } talpid-dbus = { path = "../talpid-dbus" } +duct = "0.13" [target.'cfg(target_os = "macos")'.dependencies] @@ -51,6 +51,7 @@ trust-dns-server = { version = "0.23.0", features = ["resolver"] } trust-dns-proto = "0.23.0" subslice = "0.2" async-trait = "0.1" +duct = "0.13" [target.'cfg(windows)'.dependencies] diff --git a/talpid-dbus/Cargo.toml b/talpid-dbus/Cargo.toml index aa37b3264037..60ab88f97312 100644 --- a/talpid-dbus/Cargo.toml +++ b/talpid-dbus/Cargo.toml @@ -10,7 +10,7 @@ publish.workspace = true [target.'cfg(target_os = "linux")'.dependencies] dbus = "0.9" err-derive = { workspace = true } -once_cell = "1.13" +once_cell = { workspace = true } log = { workspace = true } libc = "0.2" tokio = { workspace = true, features = ["rt"] } diff --git a/talpid-openvpn/Cargo.toml b/talpid-openvpn/Cargo.toml index 259bfa9b66fe..8106c4816d1f 100644 --- a/talpid-openvpn/Cargo.toml +++ b/talpid-openvpn/Cargo.toml @@ -11,11 +11,9 @@ publish.workspace = true [dependencies] async-trait = "0.1" -duct = "0.13" err-derive = { workspace = true } futures = "0.3.15" -is-terminal = "0.4.2" -once_cell = "1.13" +once_cell = { workspace = true } log = { workspace = true } os_pipe = "1.1.4" parking_lot = "0.12.0" diff --git a/talpid-openvpn/src/lib.rs b/talpid-openvpn/src/lib.rs index 6d2989160edb..e6080044cb05 100644 --- a/talpid-openvpn/src/lib.rs +++ b/talpid-openvpn/src/lib.rs @@ -7,10 +7,7 @@ use crate::proxy::{ProxyMonitor, ProxyResourceData}; use futures::channel::oneshot; #[cfg(windows)] use once_cell::sync::Lazy; -use process::{ - openvpn::{OpenVpnCommand, OpenVpnProcHandle}, - stoppable_process::StoppableProcess, -}; +use process::openvpn::{OpenVpnCommand, OpenVpnProcHandle}; #[cfg(target_os = "linux")] use std::collections::{HashMap, HashSet}; #[cfg(windows)] @@ -22,16 +19,15 @@ use std::{ process::ExitStatus, sync::{ atomic::{AtomicBool, Ordering}, - mpsc, Arc, Mutex, + mpsc, Arc, }, - thread, time::Duration, }; #[cfg(target_os = "linux")] use talpid_routing::{self, RequiredRoute}; use talpid_tunnel::TunnelEvent; use talpid_types::{net::openvpn, ErrorExt}; -use tokio::task; +use tokio::{sync::Mutex, task}; #[cfg(windows)] use widestring::U16CString; @@ -170,7 +166,7 @@ pub struct OpenVpnMonitor { >, abort_spawn: futures::future::AbortHandle, - child: Arc>>>, + child: Arc>>, proxy_monitor: Option>, closed: Arc, /// Keep the `TempFile` for the user-pass file in the struct, so it's removed on drop. @@ -437,16 +433,12 @@ impl OpenVpnMonitor { let close_handle = monitor.close_handle(); tokio::spawn(async move { if tunnel_close_rx.await.is_ok() { - tokio::task::spawn_blocking(move || { - if let Err(error) = close_handle.close() { - log::error!( - "{}", - error.display_chain_with_msg("Failed to close the tunnel") - ); - } - }) - .await - .expect("close handle panic"); + if let Err(error) = close_handle.close().await { + log::error!( + "{}", + error.display_chain_with_msg("Failed to close the tunnel") + ); + } } }); @@ -491,17 +483,18 @@ impl OpenVpnMonitor { } let handle = self.runtime.clone(); - - thread::spawn(move || { - tx_tunnel.send(Stopped::Tunnel(self.wait_tunnel())).unwrap(); + handle.spawn(async move { + tx_tunnel + .send(Stopped::Tunnel(self.wait_tunnel().await)) + .unwrap(); let _ = proxy_close_handle.close(); }); - thread::spawn(move || { + handle.spawn(async move { tx_proxy - .send(Stopped::Proxy(handle.block_on(proxy_monitor.wait()))) + .send(Stopped::Proxy(proxy_monitor.wait().await)) .unwrap(); - let _ = tunnel_close_handle.close(); + tunnel_close_handle.close().await }); let result = rx.recv().expect("wait got no result"); @@ -516,13 +509,19 @@ impl OpenVpnMonitor { } } else { // No proxy active, wait only for the tunnel. - self.wait_tunnel() + let handle = self.runtime.clone(); + let (tx_tunnel, rx) = mpsc::channel(); + handle.spawn(async move { + let x = self.wait_tunnel(); + tx_tunnel.send(x.await).unwrap(); + }); + rx.recv().expect("wait_tunnel got no result") } } /// Supplement `inner_wait_tunnel()` with logging and error handling. - fn wait_tunnel(self) -> Result<()> { - let result = self.inner_wait_tunnel(); + async fn wait_tunnel(self) -> Result<()> { + let result = self.inner_wait_tunnel().await; match result { WaitResult::Preparation(result) => match result { Err(error) => { @@ -559,13 +558,15 @@ impl OpenVpnMonitor { /// Waits for both the child process and the event dispatcher in parallel. After both have /// returned this returns the earliest result. - fn inner_wait_tunnel(mut self) -> WaitResult { + async fn inner_wait_tunnel(mut self) -> WaitResult { let child = match self - .runtime - .block_on(self.spawn_task.take().unwrap()) + .spawn_task + .take() + .unwrap() + .await .expect("spawn task panicked") { - Ok(Ok(child)) => Arc::new(child), + Ok(Ok(child)) => child, Ok(Err(error)) => { self.closed.swap(true, Ordering::SeqCst); return WaitResult::Preparation(Err(error)); @@ -574,41 +575,33 @@ impl OpenVpnMonitor { }; if self.closed.load(Ordering::SeqCst) { - let _ = child.kill(); + let _ = child.kill().await; return WaitResult::Preparation(Ok(())); } { - self.child.lock().unwrap().replace(child.clone()); + self.child.lock().await.replace(child); } - let closed_handle = self.closed.clone(); - let child_close_handle = self.close_handle(); - - let (child_tx, rx) = mpsc::channel(); - let dispatcher_tx = child_tx.clone(); - let event_server_abort_tx = self.event_server_abort_tx.clone(); - thread::spawn(move || { - let result = child.wait(); - let closed = closed_handle.load(Ordering::SeqCst); - child_tx.send(WaitResult::Child(result, closed)).unwrap(); + let kill_child = async move { + let result = self.child.lock().await.as_ref().unwrap().wait().await; + let closed = self.closed.load(Ordering::SeqCst); + let result = WaitResult::Child(result, closed); event_server_abort_tx.trigger(); - }); - - let server_join_handle = self - .server_join_handle - .take() - .expect("No event server quit handle"); - self.runtime.spawn(async move { + result + }; + let kill_event_dispatcher = async move { + let server_join_handle = self + .server_join_handle + .take() + .expect("No event server quit handle"); let _ = server_join_handle.await; - dispatcher_tx.send(WaitResult::EventDispatcher).unwrap(); - let _ = child_close_handle.close(); - }); + WaitResult::EventDispatcher + }; - let result = rx.recv().expect("inner_wait_tunnel no result"); - let _ = rx.recv().expect("inner_wait_tunnel no second result"); + let (result, _) = tokio::join!(kill_child, kill_event_dispatcher); result } @@ -726,18 +719,18 @@ impl OpenVpnMonitor { /// A handle to an `OpenVpnMonitor` for closing it. #[derive(Debug, Clone)] pub struct OpenVpnCloseHandle { - child: Arc>>>, + child: Arc>>, abort_spawn: futures::future::AbortHandle, closed: Arc, } impl OpenVpnCloseHandle { /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return. - pub fn close(self) -> io::Result<()> { + pub async fn close(self) -> io::Result<()> { if !self.closed.swap(true, Ordering::SeqCst) { self.abort_spawn.abort(); - if let Some(child) = self.child.lock().unwrap().as_ref() { - child.kill() + if let Some(child) = self.child.lock().await.as_ref() { + child.kill().await } else { Ok(()) } @@ -775,12 +768,13 @@ pub trait OpenVpnBuilder { } /// Trait for types acting as handles to subprocesses for `OpenVpnMonitor` +#[async_trait::async_trait] pub trait ProcessHandle: Send + Sync + 'static { /// Block until the subprocess exits or there is an error in the wait syscall. - fn wait(&self) -> io::Result; + async fn wait(&self) -> io::Result; /// Kill the subprocess. - fn kill(&self) -> io::Result<()>; + async fn kill(&self) -> io::Result<()>; } impl OpenVpnBuilder for OpenVpnCommand { @@ -799,7 +793,7 @@ impl OpenVpnBuilder for OpenVpnCommand { } fn start(&self) -> io::Result { - OpenVpnProcHandle::new(self.build()) + OpenVpnProcHandle::new(&mut self.build()) } #[cfg(target_os = "linux")] @@ -809,13 +803,14 @@ impl OpenVpnBuilder for OpenVpnCommand { } } +#[async_trait::async_trait] impl ProcessHandle for OpenVpnProcHandle { - fn wait(&self) -> io::Result { - self.inner.wait().map(|output| output.status) + async fn wait(&self) -> io::Result { + self.wait().await } - fn kill(&self) -> io::Result<()> { - self.nice_kill(OPENVPN_DIE_TIMEOUT) + async fn kill(&self) -> io::Result<()> { + self.nice_kill(OPENVPN_DIE_TIMEOUT).await } } @@ -1219,32 +1214,25 @@ mod tests { #[derive(Debug, Copy, Clone)] struct TestProcessHandle(i32); + #[async_trait::async_trait] impl ProcessHandle for TestProcessHandle { #[cfg(unix)] - fn wait(&self) -> io::Result { + async fn wait(&self) -> io::Result { use std::os::unix::process::ExitStatusExt; Ok(ExitStatus::from_raw(self.0)) } #[cfg(windows)] - fn wait(&self) -> io::Result { + async fn wait(&self) -> io::Result { use std::os::windows::process::ExitStatusExt; Ok(ExitStatus::from_raw(self.0 as u32)) } - fn kill(&self) -> io::Result<()> { + async fn kill(&self) -> io::Result<()> { Ok(()) } } - fn new_runtime() -> Result { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .map_err(Error::RuntimeError) - } - fn create_init_args_plugin_log( plugin_path: PathBuf, log_path: Option, @@ -1269,131 +1257,111 @@ mod tests { create_init_args_plugin_log("".into(), None) } - #[test] - fn sets_plugin() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn sets_plugin() { let builder = TestOpenVpnBuilder::default(); - let runtime = new_runtime().unwrap(); let openvpn_init_args = create_init_args_plugin_log("./my_test_plugin".into(), None); - let _ = runtime.block_on(async { - OpenVpnMonitor::new_internal( - builder.clone(), - openvpn_init_args, - TestOpenvpnEventProxy {}, - #[cfg(windows)] - Box::new(TestWintunContext {}), - ) - }); + let _ = OpenVpnMonitor::new_internal( + builder.clone(), + openvpn_init_args, + TestOpenvpnEventProxy {}, + #[cfg(windows)] + Box::new(TestWintunContext {}), + ); assert_eq!( Some(PathBuf::from("./my_test_plugin")), *builder.plugin.lock() ); } - #[test] - fn sets_log() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn sets_log() { let builder = TestOpenVpnBuilder::default(); - let runtime = new_runtime().unwrap(); let openvpn_init_args = create_init_args_plugin_log("".into(), Some(PathBuf::from("./my_test_log_file"))); - let _ = runtime.block_on(async { - OpenVpnMonitor::new_internal( - builder.clone(), - openvpn_init_args, - TestOpenvpnEventProxy {}, - #[cfg(windows)] - Box::new(TestWintunContext {}), - ) - }); + let _ = OpenVpnMonitor::new_internal( + builder.clone(), + openvpn_init_args, + TestOpenvpnEventProxy {}, + #[cfg(windows)] + Box::new(TestWintunContext {}), + ); assert_eq!( Some(PathBuf::from("./my_test_log_file")), *builder.log.lock() ); } - #[test] - fn exit_successfully() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn exit_successfully() { let builder = TestOpenVpnBuilder { process_handle: Some(TestProcessHandle(0)), ..Default::default() }; - let runtime = new_runtime().unwrap(); let openvpn_init_args = create_init_args(); - let testee = runtime - .block_on(async { - OpenVpnMonitor::new_internal( - builder, - openvpn_init_args, - TestOpenvpnEventProxy {}, - #[cfg(windows)] - Box::new(TestWintunContext {}), - ) - }) - .unwrap(); + let testee = OpenVpnMonitor::new_internal( + builder, + openvpn_init_args, + TestOpenvpnEventProxy {}, + #[cfg(windows)] + Box::new(TestWintunContext {}), + ) + .unwrap(); assert!(testee.wait().is_ok()); } - #[test] - fn exit_error() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn exit_error() { let builder = TestOpenVpnBuilder { process_handle: Some(TestProcessHandle(1)), ..Default::default() }; - let runtime = new_runtime().unwrap(); let openvpn_init_args = create_init_args(); - let testee = runtime - .block_on(async move { - OpenVpnMonitor::new_internal( - builder, - openvpn_init_args, - TestOpenvpnEventProxy {}, - #[cfg(windows)] - Box::new(TestWintunContext {}), - ) - }) - .unwrap(); + let testee = OpenVpnMonitor::new_internal( + builder, + openvpn_init_args, + TestOpenvpnEventProxy {}, + #[cfg(windows)] + Box::new(TestWintunContext {}), + ) + .unwrap(); assert!(testee.wait().is_err()); } - #[test] - fn wait_closed() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn wait_closed() { let builder = TestOpenVpnBuilder { process_handle: Some(TestProcessHandle(1)), ..Default::default() }; - let runtime = new_runtime().unwrap(); let openvpn_init_args = create_init_args(); - let testee = runtime - .block_on(async { - OpenVpnMonitor::new_internal( - builder, - openvpn_init_args, - TestOpenvpnEventProxy {}, - #[cfg(windows)] - Box::new(TestWintunContext {}), - ) - }) - .unwrap(); + let testee = OpenVpnMonitor::new_internal( + builder, + openvpn_init_args, + TestOpenvpnEventProxy {}, + #[cfg(windows)] + Box::new(TestWintunContext {}), + ) + .unwrap(); - testee.close_handle().close().unwrap(); - assert!(testee.wait().is_ok()); + testee.close_handle().close().await.unwrap(); + let result = testee.wait(); + println!("[testee.wait(): {:?}]", result); + assert!(result.is_ok()); } - #[test] - fn failed_process_start() { + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn failed_process_start() { let builder = TestOpenVpnBuilder::default(); - let runtime = new_runtime().unwrap(); let openvpn_init_args = create_init_args(); - let result = runtime - .block_on(async { - OpenVpnMonitor::new_internal( - builder, - openvpn_init_args, - TestOpenvpnEventProxy {}, - #[cfg(windows)] - Box::new(TestWintunContext {}), - ) - }) - .unwrap(); + let result = OpenVpnMonitor::new_internal( + builder, + openvpn_init_args, + TestOpenvpnEventProxy {}, + #[cfg(windows)] + Box::new(TestWintunContext {}), + ) + .unwrap(); match result.wait() { Err(Error::StartProcessError) => (), _ => panic!("Wrong error"), diff --git a/talpid-openvpn/src/process/mod.rs b/talpid-openvpn/src/process/mod.rs index 54cdeb904213..f496df31ae7f 100644 --- a/talpid-openvpn/src/process/mod.rs +++ b/talpid-openvpn/src/process/mod.rs @@ -1,6 +1,3 @@ /// A module for all OpenVPN related process management. #[cfg(not(target_os = "android"))] pub mod openvpn; - -/// A trait for stopping subprocesses gracefully. -pub mod stoppable_process; diff --git a/talpid-openvpn/src/process/openvpn.rs b/talpid-openvpn/src/process/openvpn.rs index 722468e3ca1c..44e00d0eda0c 100644 --- a/talpid-openvpn/src/process/openvpn.rs +++ b/talpid-openvpn/src/process/openvpn.rs @@ -1,6 +1,3 @@ -use duct; - -use super::stoppable_process::StoppableProcess; use os_pipe::{pipe, PipeWriter}; use parking_lot::Mutex; use shell_escape; @@ -190,9 +187,11 @@ impl OpenVpnCommand { } /// Build a runnable expression from the current state of the command. - pub fn build(&self) -> duct::Expression { + pub fn build(&self) -> tokio::process::Command { log::debug!("Building expression: {}", &self); - duct::cmd(&self.openvpn_bin, self.get_arguments()).unchecked() + let mut handle = tokio::process::Command::new(&self.openvpn_bin); + handle.args(self.get_arguments()); + handle } /// Returns all arguments that the subprocess would be spawned with. @@ -365,8 +364,11 @@ impl fmt::Display for OpenVpnCommand { /// Handle to a running OpenVPN process. pub struct OpenVpnProcHandle { - /// Duct handle - pub inner: duct::Handle, + /// Handle to the child process running OpenVPN. + /// + /// This handle is acquired by calling [`OpenVpnCommand::build`] (or + /// [`tokio::process::Command::spawn`]). + pub inner: std::sync::Arc>, /// Pipe handle to stdin of the OpenVPN process. Our custom fork of OpenVPN /// has been changed so that it exits cleanly when stdin is closed. This is a hack /// solution to cleanly shut OpenVPN down without using the @@ -377,62 +379,85 @@ pub struct OpenVpnProcHandle { impl OpenVpnProcHandle { /// Configures the expression to run OpenVPN in a way compatible with this handle /// and spawns it. Returns the handle. - pub fn new(mut cmd: duct::Expression) -> io::Result { - use is_terminal::IsTerminal; + pub fn new(mut cmd: &mut tokio::process::Command) -> io::Result { + use std::io::IsTerminal; if !std::io::stdout().is_terminal() { - cmd = cmd.stdout_null(); + cmd = cmd.stdout(std::process::Stdio::null()) } if !std::io::stderr().is_terminal() { - cmd = cmd.stderr_null(); + cmd = cmd.stderr(std::process::Stdio::null()) } let (reader, writer) = pipe()?; - let proc_handle = cmd.stdin_file(reader).start()?; + let proc_handle = cmd.stdin(reader).spawn()?; Ok(Self { - inner: proc_handle, + inner: std::sync::Arc::new(tokio::sync::Mutex::new(proc_handle)), stdin: Mutex::new(Some(writer)), }) } -} -impl StoppableProcess for OpenVpnProcHandle { - fn stop(&self) { + /// Attempts to stop the OpenVPN process gracefully in the given time + /// period, otherwise kills the process. + pub async fn nice_kill(&self, timeout: std::time::Duration) -> io::Result<()> { + log::debug!("Trying to stop child process gracefully"); + self.stop().await; + + // Wait for the process to die for a maximum of `timeout`. + let wait_result = tokio::time::timeout(timeout, self.wait()).await; + match wait_result { + Ok(_) => log::debug!("Child process terminated gracefully"), + Err(_) => { + log::warn!( + "Child process did not terminate gracefully within timeout, forcing termination" + ); + self.kill().await?; + } + } + Ok(()) + } + + /// Waits for the child to exit completely, returning the status that it + /// exited with. See [tokio::process::Child::wait] for in-depth + /// documentation. + async fn wait(&self) -> io::Result { + self.inner.lock().await.wait().await + } + + /// Kill the OpenVPN process and drop its stdin handle. + async fn stop(&self) { // Dropping our stdin handle so that it is closed once. Closing the handle should // gracefully stop our OpenVPN child process. if self.stdin.lock().take().is_none() { log::warn!("Tried to close OpenVPN stdin handle twice, this is a bug"); } + self.clean_up().await } - fn kill(&self) -> io::Result<()> { + async fn kill(&self) -> io::Result<()> { log::warn!("Killing OpenVPN process"); - self.inner.kill()?; + self.inner.lock().await.kill().await?; log::debug!("OpenVPN forcefully killed"); Ok(()) } - fn has_stopped(&self) -> io::Result { - match self.inner.try_wait() { - Ok(None) => Ok(false), - Ok(Some(_)) => Ok(true), - Err(e) => Err(e), - } + async fn has_stopped(&self) -> io::Result { + let exit_status = self.inner.lock().await.try_wait()?; + Ok(exit_status.is_some()) } -} -impl Drop for OpenVpnProcHandle { - fn drop(&mut self) { - let result = match self.has_stopped() { - Ok(false) => self.kill(), + /// Try to kill the OpenVPN process. + async fn clean_up(&self) { + let result = match self.has_stopped().await { + Ok(false) => self.kill().await, Err(e) => { log::error!( "{}", e.display_chain_with_msg("Failed to check if OpenVPN is running") ); - self.kill() + self.kill().await } _ => Ok(()), }; diff --git a/talpid-openvpn/src/process/stoppable_process.rs b/talpid-openvpn/src/process/stoppable_process.rs deleted file mode 100644 index 3681c6cfa878..000000000000 --- a/talpid-openvpn/src/process/stoppable_process.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::{ - io, thread, - time::{Duration, Instant}, -}; - -static POLL_INTERVAL_MS: Duration = Duration::from_millis(50); - -/// A best effort attempt at stopping a subprocess whilst also ensuring that the subprocess is -/// killed eventually. -pub trait StoppableProcess -where - Self: Sized, -{ - /// Gracefully stops a process. - fn stop(&self); - - /// Kills a process unconditionally. Implementations should strive to never fail. - fn kill(&self) -> io::Result<()>; - - /// Check if process is stopped. This method must not block. - fn has_stopped(&self) -> io::Result; - - /// Attempts to stop a process gracefully in the given time period, otherwise kills the - /// process. - fn nice_kill(&self, timeout: Duration) -> io::Result<()> { - log::debug!("Trying to stop child process gracefully"); - self.stop(); - if wait_timeout(self, timeout)? { - log::debug!("Child process terminated gracefully"); - } else { - log::warn!( - "Child process did not terminate gracefully within timeout, forcing termination" - ); - self.kill()?; - } - Ok(()) - } -} -/// Wait for a process to die for a maximum of `timeout`. Returns true if the process died within -/// the timeout. -fn wait_timeout(process: &T, timeout: Duration) -> io::Result -where - T: StoppableProcess + Sized, -{ - let timer = Instant::now(); - while timer.elapsed() < timeout { - if process.has_stopped()? { - return Ok(true); - } - thread::sleep(POLL_INTERVAL_MS); - } - Ok(false) -} diff --git a/talpid-routing/Cargo.toml b/talpid-routing/Cargo.toml index 50bd4c1477a2..56fb12b13c31 100644 --- a/talpid-routing/Cargo.toml +++ b/talpid-routing/Cargo.toml @@ -21,7 +21,7 @@ talpid-types = { path = "../talpid-types" } [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2" -once_cell = "1.13" +once_cell = { workspace = true } rtnetlink = "0.11" netlink-packet-route = "0.13" netlink-sys = "0.8.3" diff --git a/talpid-wireguard/Cargo.toml b/talpid-wireguard/Cargo.toml index 04ea3fc00bc2..f8e854a7c56f 100644 --- a/talpid-wireguard/Cargo.toml +++ b/talpid-wireguard/Cargo.toml @@ -14,7 +14,7 @@ err-derive = { workspace = true } futures = "0.3.15" hex = "0.4" ipnetwork = "0.16" -once_cell = "1.13" +once_cell = { workspace = true } libc = "0.2" log = { workspace = true } parking_lot = "0.12.0"