Skip to content

Commit

Permalink
Use JoinHandle to manage the listening task
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Mar 5, 2024
1 parent 20310ee commit 4774c67
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 29 deletions.
18 changes: 8 additions & 10 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use std::sync::{
};
use std::time::Duration;
use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use zenoh_core::{zasynclock, zasyncread, zasyncwrite};
use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
Expand Down Expand Up @@ -220,22 +221,20 @@ impl fmt::Debug for LinkUnicastSerial {
struct ListenerUnicastSerial {
endpoint: EndPoint,
token: CancellationToken,
tracker: TaskTracker,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastSerial {
fn new(endpoint: EndPoint, token: CancellationToken, tracker: TaskTracker) -> Self {
fn new(endpoint: EndPoint, token: CancellationToken, handle: JoinHandle<ZResult<()>>) -> Self {
Self {
endpoint,
token,
tracker,
handle,
}
}

async fn stop(&self) {
self.token.cancel();
self.tracker.close();
self.tracker.wait().await;
}
}

Expand Down Expand Up @@ -320,18 +319,17 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial {
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();

let tracker = TaskTracker::new();
let task = async move {
// Wait for the accept loop to terminate
let res =
accept_read_task(link, c_token, c_manager, c_path.clone(), is_connected).await;
zasyncwrite!(c_listeners).remove(&c_path);
res
};
tracker.spawn_on(task, &zenoh_runtime::ZRuntime::Acceptor);
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task);

let locator = endpoint.to_locator();
let listener = ListenerUnicastSerial::new(endpoint, token, tracker);
let listener = ListenerUnicastSerial::new(endpoint, token, handle);
// Update the list of active listeners on the manager
listeners.insert(path, listener);

Expand All @@ -353,7 +351,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial {

// Send the stop signal
listener.stop().await;
Ok(())
listener.handle.await?
}

async fn get_listeners(&self) -> Vec<EndPoint> {
Expand Down
17 changes: 8 additions & 9 deletions io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::RwLock as AsyncRwLock;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use zenoh_core::{zasyncread, zasyncwrite};
use zenoh_link_commons::{
Expand Down Expand Up @@ -170,29 +171,27 @@ impl fmt::Debug for LinkUnicastUnixSocketStream {
struct ListenerUnixSocketStream {
endpoint: EndPoint,
token: CancellationToken,
tracker: TaskTracker,
handle: JoinHandle<ZResult<()>>,
lock_fd: RawFd,
}

impl ListenerUnixSocketStream {
fn new(
endpoint: EndPoint,
token: CancellationToken,
tracker: TaskTracker,
handle: JoinHandle<ZResult<()>>,
lock_fd: RawFd,
) -> ListenerUnixSocketStream {
ListenerUnixSocketStream {
endpoint,
token,
tracker,
handle,
lock_fd,
}
}

async fn stop(&self) {
self.token.cancel();
self.tracker.close();
self.tracker.wait().await;
}
}

Expand Down Expand Up @@ -386,17 +385,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {
let c_listeners = self.listeners.clone();
let c_path = local_path_str.to_owned();

let tracker = TaskTracker::new();
let task = async move {
// Wait for the accept loop to terminate
let res = accept_task(socket, c_token, c_manager).await;
zasyncwrite!(c_listeners).remove(&c_path);
res
};
tracker.spawn_on(task, &zenoh_runtime::ZRuntime::Acceptor);
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task);

let locator = endpoint.to_locator();
let listener = ListenerUnixSocketStream::new(endpoint, token, tracker, lock_fd);
let listener = ListenerUnixSocketStream::new(endpoint, token, handle, lock_fd);
listeners.insert(local_path_str.to_owned(), listener);

Ok(locator)
Expand All @@ -417,6 +415,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {

// Send the stop signal
listener.stop().await;
listener.handle.await??;

//Release the lock
let _ = nix::fcntl::flock(listener.lock_fd, nix::fcntl::FlockArg::UnlockNonblock);
Expand Down
18 changes: 8 additions & 10 deletions io/zenoh-links/zenoh-link-ws/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use tokio::task::JoinHandle;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tokio_util::sync::CancellationToken;
use zenoh_core::{zasynclock, zasyncread, zasyncwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
Expand Down Expand Up @@ -255,26 +256,24 @@ impl fmt::Debug for LinkUnicastWs {
struct ListenerUnicastWs {
endpoint: EndPoint,
token: CancellationToken,
tracker: TaskTracker,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastWs {
fn new(
endpoint: EndPoint,
token: CancellationToken,
tracker: TaskTracker,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastWs {
ListenerUnicastWs {
endpoint,
token,
tracker,
handle,
}
}

async fn stop(&self) {
self.token.cancel();
self.tracker.close();
self.tracker.wait().await;
}
}

Expand Down Expand Up @@ -363,17 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs {
let c_listeners = self.listeners.clone();
let c_addr = local_addr;

let tracker = TaskTracker::new();
let task = async move {
// Wait for the accept loop to terminate
let res = accept_task(socket, c_token, c_manager).await;
zasyncwrite!(c_listeners).remove(&c_addr);
res
};
tracker.spawn_on(task, &zenoh_runtime::ZRuntime::Acceptor);
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task);

let locator = endpoint.to_locator();
let listener = ListenerUnicastWs::new(endpoint, token, tracker);
let listener = ListenerUnicastWs::new(endpoint, token, handle);
// Update the list of active listeners on the manager
zasyncwrite!(self.listeners).insert(local_addr, listener);

Expand All @@ -395,7 +393,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs {

// Send the stop signal
listener.stop().await;
Ok(())
listener.handle.await?
}

async fn get_listeners(&self) -> Vec<EndPoint> {
Expand Down

0 comments on commit 4774c67

Please sign in to comment.