diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 0dbded9209..c7d401b0d1 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -14,7 +14,7 @@ use std::{ ops::Add, sync::{ - atomic::{AtomicBool, AtomicU32, Ordering}, + atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering}, Arc, Mutex, MutexGuard, }, time::{Duration, Instant}, @@ -688,28 +688,62 @@ impl TransmissionPipeline { }); } - let active = Arc::new(AtomicBool::new(true)); + let active = Arc::new(TransmissionPipelineStatus { + disabled: AtomicBool::new(false), + congested: AtomicU8::new(0), + }); let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), - active: active.clone(), + status: active.clone(), wait_before_drop: config.wait_before_drop, wait_before_close: config.wait_before_close, }; let consumer = TransmissionPipelineConsumer { stage_out: stage_out.into_boxed_slice(), n_out_r, - active, + status: active, }; (producer, consumer) } } +struct TransmissionPipelineStatus { + // The whole pipeline is enabled or disabled + disabled: AtomicBool, + // Bitflags to indicate the given priority queue is congested + congested: AtomicU8, +} + +impl TransmissionPipelineStatus { + fn set_disabled(&self, status: bool) { + self.disabled.store(status, Ordering::Relaxed); + } + + fn is_disabled(&self) -> bool { + self.disabled.load(Ordering::Relaxed) + } + + fn set_congested(&self, priority: Priority, status: bool) { + let prioflag = 1 << priority as u8; + if status { + self.congested.fetch_or(prioflag, Ordering::Relaxed); + } else { + self.congested.fetch_and(!prioflag, Ordering::Relaxed); + } + } + + fn is_congested(&self, priority: Priority) -> bool { + let prioflag = 1 << priority as u8; + self.congested.load(Ordering::Relaxed) & prioflag != 0 + } +} + #[derive(Clone)] pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, - active: Arc, + status: Arc, wait_before_drop: (Duration, Duration), wait_before_close: Duration, } @@ -724,8 +758,13 @@ impl TransmissionPipelineProducer { } else { (0, Priority::DEFAULT) }; + // If message is droppable, compute a deadline after which the sample could be dropped let (wait_time, max_wait_time) = if msg.is_droppable() { + // Checked if we are blocked on the priority queue and we drop directly the message + if self.status.is_congested(priority) { + return false; + } (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { (self.wait_before_close, None) @@ -733,7 +772,11 @@ impl TransmissionPipelineProducer { let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - queue.push_network_message(&mut msg, priority, &mut deadline) + let sent = queue.push_network_message(&mut msg, priority, &mut deadline); + if !sent { + self.status.set_congested(priority, true); + } + sent } #[inline] @@ -750,7 +793,7 @@ impl TransmissionPipelineProducer { } pub(crate) fn disable(&self) { - self.active.store(false, Ordering::Relaxed); + self.status.set_disabled(true); // Acquire all the locks, in_guard first, out_guard later // Use the same locking order as in drain to avoid deadlocks @@ -768,17 +811,18 @@ pub(crate) struct TransmissionPipelineConsumer { // A single Mutex for all the priority queues stage_out: Box<[StageOut]>, n_out_r: Waiter, - active: Arc, + status: Arc, } impl TransmissionPipelineConsumer { - pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { - while self.active.load(Ordering::Relaxed) { + pub(crate) async fn pull(&mut self) -> Option<(WBatch, Priority)> { + while !self.status.is_disabled() { let mut backoff = MicroSeconds::MAX; // Calculate the backoff maximum for (prio, queue) in self.stage_out.iter_mut().enumerate() { match queue.try_pull() { Pull::Some(batch) => { + let prio = Priority::try_from(prio as u8).unwrap(); return Some((batch, prio)); } Pull::Backoff(deadline) => { @@ -818,8 +862,9 @@ impl TransmissionPipelineConsumer { None } - pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { - self.stage_out[priority].refill(batch); + pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) { + self.stage_out[priority as usize].refill(batch); + self.status.set_congested(priority, false); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index c4c23290ee..61c6f36ece 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -431,10 +431,10 @@ async fn tx_task( link.send_batch(&mut batch).await?; // Keep track of next SNs if let Some(sn) = batch.codec.latest_sn.reliable { - last_sns[priority].reliable = sn; + last_sns[priority as usize].reliable = sn; } if let Some(sn) = batch.codec.latest_sn.best_effort { - last_sns[priority].best_effort = sn; + last_sns[priority as usize].best_effort = sn; } #[cfg(feature = "stats")] {