diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 1dea3965d4..0ed6755b42 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -22,7 +22,8 @@ use std::sync::{ }; use std::time::Duration; use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, @@ -220,22 +221,20 @@ impl fmt::Debug for LinkUnicastSerial { struct ListenerUnicastSerial { endpoint: EndPoint, token: CancellationToken, - tracker: TaskTracker, + handle: JoinHandle>, } impl ListenerUnicastSerial { - fn new(endpoint: EndPoint, token: CancellationToken, tracker: TaskTracker) -> Self { + fn new(endpoint: EndPoint, token: CancellationToken, handle: JoinHandle>) -> Self { Self { endpoint, token, - tracker, + handle, } } async fn stop(&self) { self.token.cancel(); - self.tracker.close(); - self.tracker.wait().await; } } @@ -320,7 +319,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let c_manager = self.manager.clone(); let c_listeners = self.listeners.clone(); - let tracker = TaskTracker::new(); let task = async move { // Wait for the accept loop to terminate let res = @@ -328,10 +326,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { zasyncwrite!(c_listeners).remove(&c_path); res }; - tracker.spawn_on(task, &zenoh_runtime::ZRuntime::Acceptor); + let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); let locator = endpoint.to_locator(); - let listener = ListenerUnicastSerial::new(endpoint, token, tracker); + let listener = ListenerUnicastSerial::new(endpoint, token, handle); // Update the list of active listeners on the manager listeners.insert(path, listener); @@ -353,7 +351,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { // Send the stop signal listener.stop().await; - Ok(()) + listener.handle.await? } async fn get_listeners(&self) -> Vec { diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 0304f6073f..53441ab89c 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -24,7 +24,8 @@ use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::RwLock as AsyncRwLock; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use uuid::Uuid; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ @@ -170,7 +171,7 @@ impl fmt::Debug for LinkUnicastUnixSocketStream { struct ListenerUnixSocketStream { endpoint: EndPoint, token: CancellationToken, - tracker: TaskTracker, + handle: JoinHandle>, lock_fd: RawFd, } @@ -178,21 +179,19 @@ impl ListenerUnixSocketStream { fn new( endpoint: EndPoint, token: CancellationToken, - tracker: TaskTracker, + handle: JoinHandle>, lock_fd: RawFd, ) -> ListenerUnixSocketStream { ListenerUnixSocketStream { endpoint, token, - tracker, + handle, lock_fd, } } async fn stop(&self) { self.token.cancel(); - self.tracker.close(); - self.tracker.wait().await; } } @@ -386,17 +385,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { let c_listeners = self.listeners.clone(); let c_path = local_path_str.to_owned(); - let tracker = TaskTracker::new(); let task = async move { // Wait for the accept loop to terminate let res = accept_task(socket, c_token, c_manager).await; zasyncwrite!(c_listeners).remove(&c_path); res }; - tracker.spawn_on(task, &zenoh_runtime::ZRuntime::Acceptor); + let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); let locator = endpoint.to_locator(); - let listener = ListenerUnixSocketStream::new(endpoint, token, tracker, lock_fd); + let listener = ListenerUnixSocketStream::new(endpoint, token, handle, lock_fd); listeners.insert(local_path_str.to_owned(), listener); Ok(locator) @@ -417,6 +415,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { // Send the stop signal listener.stop().await; + listener.handle.await??; //Release the lock let _ = nix::fcntl::flock(listener.lock_fd, nix::fcntl::FlockArg::UnlockNonblock); diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 7b2493c1e2..6a0cf64e6e 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -24,10 +24,11 @@ use std::sync::Arc; use std::time::Duration; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; +use tokio::task::JoinHandle; use tokio_tungstenite::accept_async; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, @@ -255,26 +256,24 @@ impl fmt::Debug for LinkUnicastWs { struct ListenerUnicastWs { endpoint: EndPoint, token: CancellationToken, - tracker: TaskTracker, + handle: JoinHandle>, } impl ListenerUnicastWs { fn new( endpoint: EndPoint, token: CancellationToken, - tracker: TaskTracker, + handle: JoinHandle>, ) -> ListenerUnicastWs { ListenerUnicastWs { endpoint, token, - tracker, + handle, } } async fn stop(&self) { self.token.cancel(); - self.tracker.close(); - self.tracker.wait().await; } } @@ -363,17 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { let c_listeners = self.listeners.clone(); let c_addr = local_addr; - let tracker = TaskTracker::new(); let task = async move { // Wait for the accept loop to terminate let res = accept_task(socket, c_token, c_manager).await; zasyncwrite!(c_listeners).remove(&c_addr); res }; - tracker.spawn_on(task, &zenoh_runtime::ZRuntime::Acceptor); + let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); let locator = endpoint.to_locator(); - let listener = ListenerUnicastWs::new(endpoint, token, tracker); + let listener = ListenerUnicastWs::new(endpoint, token, handle); // Update the list of active listeners on the manager zasyncwrite!(self.listeners).insert(local_addr, listener); @@ -395,7 +393,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { // Send the stop signal listener.stop().await; - Ok(()) + listener.handle.await? } async fn get_listeners(&self) -> Vec {