
Improve congestion control #1627

Merged: 7 commits, Dec 4, 2024
69 changes: 57 additions & 12 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -14,7 +14,7 @@
 use std::{
     ops::Add,
     sync::{
-        atomic::{AtomicBool, AtomicU32, Ordering},
+        atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
         Arc, Mutex, MutexGuard,
     },
     time::{Duration, Instant},
@@ -688,28 +688,62 @@ impl TransmissionPipeline {
             });
         }
 
-        let active = Arc::new(AtomicBool::new(true));
+        let active = Arc::new(TransmissionPipelineStatus {
+            disabled: AtomicBool::new(false),
+            congested: AtomicU8::new(0),
+        });
         let producer = TransmissionPipelineProducer {
             stage_in: stage_in.into_boxed_slice().into(),
-            active: active.clone(),
+            status: active.clone(),
             wait_before_drop: config.wait_before_drop,
             wait_before_close: config.wait_before_close,
         };
         let consumer = TransmissionPipelineConsumer {
             stage_out: stage_out.into_boxed_slice(),
             n_out_r,
-            active,
+            status: active,
         };
 
         (producer, consumer)
     }
 }
 
+struct TransmissionPipelineStatus {
+    // The whole pipeline is enabled or disabled
+    disabled: AtomicBool,
+    // Bitflags to indicate the given priority queue is congested
+    congested: AtomicU8,
Review comment (Contributor), on the `congested: AtomicU8` field:
Discussion point: in order to reduce contention around this atomic, I'd recommend switching to a set of separate AtomicBools with 64-byte padding (perhaps indirectly, by putting them into StageIn, etc.). The per-operation cost in a multi-(thread, core, priority) setup should be smaller.

Reply (Member, PR author):
I thought about that, but increasing the number of atomics also increases the amount of data to be kept in cache. I assume that having one atomic per priority would put significantly more burden on moving data across cache levels, thus reducing performance in any case. I don't have an experimental comparison, but I'm not expecting much contention in general.
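For illustration only, here is a minimal sketch of the layout the reviewer suggests: one cache-line-padded AtomicBool per priority instead of a shared AtomicU8. The `CacheLinePadded` and `PaddedCongestionFlags` names are hypothetical and not part of this PR. It also makes the footprint cost the author raises concrete: eight flags at 64 bytes each occupy 512 bytes of cache, versus a single shared byte.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical 64-byte-aligned wrapper: each flag occupies its own cache
// line, so writers on different priorities never falsely share a line.
#[repr(align(64))]
struct CacheLinePadded<T>(T);

// One padded flag per priority level (8 levels assumed, as in zenoh).
struct PaddedCongestionFlags {
    flags: [CacheLinePadded<AtomicBool>; 8],
}

impl PaddedCongestionFlags {
    fn new() -> Self {
        Self {
            flags: std::array::from_fn(|_| CacheLinePadded(AtomicBool::new(false))),
        }
    }

    fn set_congested(&self, priority: usize, status: bool) {
        // A plain store suffices: no other flag shares this cache line.
        self.flags[priority].0.store(status, Ordering::Relaxed);
    }

    fn is_congested(&self, priority: usize) -> bool {
        self.flags[priority].0.load(Ordering::Relaxed)
    }
}

fn main() {
    let flags = PaddedCongestionFlags::new();
    flags.set_congested(2, true);
    assert!(flags.is_congested(2));
    assert!(!flags.is_congested(5));
}
```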

+}
+
+impl TransmissionPipelineStatus {
+    fn set_disabled(&self, status: bool) {
+        self.disabled.store(status, Ordering::Relaxed);
+    }
+
+    fn is_disabled(&self) -> bool {
+        self.disabled.load(Ordering::Relaxed)
+    }
+
+    fn set_congested(&self, priority: Priority, status: bool) {
+        let prioflag = 1 << priority as u8;
+        if status {
+            self.congested.fetch_or(prioflag, Ordering::Relaxed);
+        } else {
+            self.congested.fetch_and(!prioflag, Ordering::Relaxed);
+        }
+    }
+
+    fn is_congested(&self, priority: Priority) -> bool {
+        let prioflag = 1 << priority as u8;
+        self.congested.load(Ordering::Relaxed) & prioflag != 0
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct TransmissionPipelineProducer {
     // Each priority queue has its own Mutex
     stage_in: Arc<[Mutex<StageIn>]>,
-    active: Arc<AtomicBool>,
+    status: Arc<TransmissionPipelineStatus>,
     wait_before_drop: (Duration, Duration),
     wait_before_close: Duration,
 }
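As a standalone illustration (not code from this PR), the sketch below exercises the bitmask technique used by `TransmissionPipelineStatus`: `fetch_or` sets one priority's bit, `fetch_and` with the negated mask clears it, and a relaxed load plus a mask tests it, so flags for different priorities never disturb each other.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

fn main() {
    let congested = AtomicU8::new(0);

    // Set bit 2 (priority 2 congested) without touching other bits.
    congested.fetch_or(1 << 2, Ordering::Relaxed);
    assert!(congested.load(Ordering::Relaxed) & (1 << 2) != 0);

    // Bit 5 (another priority) is unaffected.
    assert!(congested.load(Ordering::Relaxed) & (1 << 5) == 0);

    // Clear bit 2 by AND-ing with the negated mask.
    congested.fetch_and(!(1 << 2), Ordering::Relaxed);
    assert_eq!(congested.load(Ordering::Relaxed), 0);
}
```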
@@ -724,16 +758,25 @@ impl TransmissionPipelineProducer {
         } else {
             (0, Priority::DEFAULT)
         };
+
         // If message is droppable, compute a deadline after which the sample could be dropped
         let (wait_time, max_wait_time) = if msg.is_droppable() {
+            // Check whether the priority queue is congested and, if so, drop the message right away
+            if self.status.is_congested(priority) {
+                return false;
+            }
            (self.wait_before_drop.0, Some(self.wait_before_drop.1))
         } else {
             (self.wait_before_close, None)
         };
         let mut deadline = Deadline::new(wait_time, max_wait_time);
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        queue.push_network_message(&mut msg, priority, &mut deadline)
+        let sent = queue.push_network_message(&mut msg, priority, &mut deadline);
+        if !sent {
+            self.status.set_congested(priority, true);
+        }
+        sent
     }
 
     #[inline]
@@ -750,7 +793,7 @@ impl TransmissionPipelineProducer {
     }
 
     pub(crate) fn disable(&self) {
-        self.active.store(false, Ordering::Relaxed);
+        self.status.set_disabled(true);
 
         // Acquire all the locks, in_guard first, out_guard later
         // Use the same locking order as in drain to avoid deadlocks
Expand All @@ -768,17 +811,18 @@ pub(crate) struct TransmissionPipelineConsumer {
// A single Mutex for all the priority queues
stage_out: Box<[StageOut]>,
n_out_r: Waiter,
active: Arc<AtomicBool>,
status: Arc<TransmissionPipelineStatus>,
}

impl TransmissionPipelineConsumer {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
while self.active.load(Ordering::Relaxed) {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, Priority)> {
while !self.status.is_disabled() {
let mut backoff = MicroSeconds::MAX;
// Calculate the backoff maximum
for (prio, queue) in self.stage_out.iter_mut().enumerate() {
match queue.try_pull() {
Pull::Some(batch) => {
let prio = Priority::try_from(prio as u8).unwrap();
return Some((batch, prio));
}
Pull::Backoff(deadline) => {
@@ -818,8 +862,9 @@ impl TransmissionPipelineConsumer {
         None
     }
 
-    pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) {
-        self.stage_out[priority].refill(batch);
+    pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) {
+        self.stage_out[priority as usize].refill(batch);
+        self.status.set_congested(priority, false);
     }
 
     pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> {
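Putting the pieces together, here is a minimal, self-contained model of the congestion handshake these changes implement (names simplified, queue behavior stubbed out; illustrative only): a failed push raises the priority's congestion bit, later droppable messages for that priority are rejected before taking the queue lock, and the consumer's refill lowers the bit once a batch is returned.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

struct Status {
    congested: AtomicU8,
}

impl Status {
    fn set_congested(&self, prio: u8, on: bool) {
        let bit = 1u8 << prio;
        if on {
            self.congested.fetch_or(bit, Ordering::Relaxed);
        } else {
            self.congested.fetch_and(!bit, Ordering::Relaxed);
        }
    }

    fn is_congested(&self, prio: u8) -> bool {
        self.congested.load(Ordering::Relaxed) & (1u8 << prio) != 0
    }
}

// Producer side: droppable messages skip the queue entirely while the
// priority is congested; a failed push raises the flag.
fn push(status: &Status, prio: u8, droppable: bool, queue_full: bool) -> bool {
    if droppable && status.is_congested(prio) {
        return false; // dropped without touching the queue lock
    }
    let sent = !queue_full; // stand-in for StageIn::push_network_message
    if !sent {
        status.set_congested(prio, true);
    }
    sent
}

// Consumer side: returning a batch frees capacity, so congestion clears.
fn refill(status: &Status, prio: u8) {
    status.set_congested(prio, false);
}

fn main() {
    let status = Status { congested: AtomicU8::new(0) };
    assert!(!push(&status, 1, true, true)); // queue full: push fails, flag raised
    assert!(!push(&status, 1, true, false)); // still flagged: dropped early
    refill(&status, 1); // consumer returns a batch, congestion clears
    assert!(push(&status, 1, true, false)); // accepted again
}
```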
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/multicast/link.rs
@@ -431,10 +431,10 @@ async fn tx_task(
                 link.send_batch(&mut batch).await?;
                 // Keep track of next SNs
                 if let Some(sn) = batch.codec.latest_sn.reliable {
-                    last_sns[priority].reliable = sn;
+                    last_sns[priority as usize].reliable = sn;
                 }
                 if let Some(sn) = batch.codec.latest_sn.best_effort {
-                    last_sns[priority].best_effort = sn;
+                    last_sns[priority as usize].best_effort = sn;
                 }
                 #[cfg(feature = "stats")]
                 {
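Since pull and refill now traffic in `Priority` rather than `usize`, call sites like this one index arrays with an explicit cast. A short sketch of the pattern, assuming a repr(u8) enum shaped like zenoh's `Priority` (variants and discriminants abbreviated here):

```rust
// Assumed shape; zenoh's Priority has eight levels (Control = 0 .. Background = 7).
#[repr(u8)]
#[derive(Clone, Copy)]
enum Priority {
    RealTime = 1,
    Data = 5,
}

fn main() {
    let mut last_sns = [0u64; 8];
    let priority = Priority::Data;
    // The cast turns the enum's u8 discriminant into an array index.
    last_sns[priority as usize] = 42;
    assert_eq!(last_sns[5], 42);
}
```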