Skip to content

Commit

Permalink
Improve error handling
Browse files Browse the repository at this point in the history
Propagate errors in the handlers. The errors are now caugh in the
`disconnect` function but we should notify the user sooner.

This is an improvement though, as previously the user hasn't been
notified at all.
  • Loading branch information
lukipuki committed Jan 21, 2024
1 parent 97f246e commit 1d53d77
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
29 changes: 18 additions & 11 deletions src/connections/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn spawn_read_handler<R>(
cancellation_token: CancellationToken,
read_stream: R,
read_output_tx: UnboundedSender<IncomingStreamData>,
) -> JoinHandle<()>
) -> JoinHandle<Result<(), Error>>
where
R: AsyncReadExt + Send + Unpin + 'static,
{
Expand All @@ -27,9 +27,11 @@ where
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Read handler cancelled");
Ok(())
}
e = handle => {
error!("Read handler unexpectedly terminated: {:#?}", e);
e
}
}
})
Expand Down Expand Up @@ -82,7 +84,7 @@ pub fn spawn_write_handler<W>(
cancellation_token: CancellationToken,
write_stream: W,
write_input_rx: tokio::sync::mpsc::UnboundedReceiver<EncodedToRadioPacketWithHeader>,
) -> JoinHandle<()>
) -> JoinHandle<Result<(), Error>>
where
W: AsyncWriteExt + Send + Unpin + 'static,
{
Expand All @@ -91,10 +93,14 @@ where
spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Write handler cancelled");
debug!("Write handler cancelled");
Ok(())
}
_ = handle => {
error!("Write handler unexpectedly terminated");
write_result = handle => {
if let Err(e) = &write_result {
error!("Write handler unexpectedly terminated {e:?}");
}
write_result
}
}
})
Expand Down Expand Up @@ -132,16 +138,19 @@ pub fn spawn_processing_handler(
cancellation_token: CancellationToken,
read_output_rx: UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<(), Error>> {
let handle = start_processing_handler(read_output_rx, decoded_packet_tx);

spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Message processing handler cancelled");
debug!("Message processing handler cancelled");
Ok(())
}
_ = handle => {
error!("Message processing handler unexpectedly terminated");
error!("Message processing handler unexpectedly terminated");
Err(Error::InvalidaDataSize{data_length: 12})
// processing_result
}
}
})
Expand All @@ -150,7 +159,7 @@ pub fn spawn_processing_handler(
async fn start_processing_handler(
mut read_output_rx: tokio::sync::mpsc::UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) -> Result<(), Error> {
) {
trace!("Started message processing handler");

let mut buffer = StreamBuffer::new(decoded_packet_tx);
Expand All @@ -161,6 +170,4 @@ async fn start_processing_handler(
}

trace!("Processing read_output_rx channel closed");

Ok(())
}
19 changes: 12 additions & 7 deletions src/connections/stream_api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures_util::future::join3;
use log::trace;
use prost::Message;
use std::{fmt::Display, marker::PhantomData};
Expand Down Expand Up @@ -70,9 +71,9 @@ pub struct StreamApi;
pub struct ConnectedStreamApi<State = state::Configured> {
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,

read_handle: JoinHandle<()>,
write_handle: JoinHandle<()>,
processing_handle: JoinHandle<()>,
read_handle: JoinHandle<Result<(), Error>>,
write_handle: JoinHandle<Result<(), Error>>,
processing_handle: JoinHandle<Result<(), Error>>,

cancellation_token: CancellationToken,

Expand Down Expand Up @@ -586,11 +587,15 @@ impl ConnectedStreamApi<state::Configured> {

// Close worker threads

self.read_handle.await?;
self.write_handle.await?;
self.processing_handle.await?;
let (read_result, write_result, processing_result) =
join3(self.read_handle, self.write_handle, self.processing_handle).await;

trace!("TCP handlers fully disconnected");
// Note: we only return the first error.
read_result??;
write_result??;
processing_result??;

trace!("Handlers fully disconnected");

Ok(StreamApi)
}
Expand Down

0 comments on commit 1d53d77

Please sign in to comment.