Skip to content

Commit

Permalink
fix: retire loss packet
Browse files Browse the repository at this point in the history
  • Loading branch information
metah3m committed Feb 13, 2025
1 parent 4bc833b commit c6b8a56
Showing 1 changed file with 33 additions and 71 deletions.
104 changes: 33 additions & 71 deletions qcongestion/src/congestion.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cmp::Ordering,
collections::{HashSet, VecDeque},
collections::VecDeque,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
time::{Duration, Instant},
Expand Down Expand Up @@ -242,6 +242,7 @@ impl CongestionController {
);
}

// A.8. Setting the Loss Detection Timer
fn set_loss_timer(&mut self) {
let (earliest_loss_time, _) = self.get_loss_time_and_space();
if let Some(earliest_loss_time) = earliest_loss_time {
Expand Down Expand Up @@ -497,7 +498,15 @@ impl super::CongestionControl for ArcCC {
notify.notify_waiters();
}
if count % 100 == 0 {
tracing::trace!(send_quote = guard.send_quota(now));
tracing::trace!(
"{} cc loop count {} requires_ack {} {:?} pacing rate {:?} send_quota {}",
guard.handshake.role(),
count,
guard.requires_ack(),
guard.rcvd_records[Epoch::Data].requires_ack(guard.max_ack_delay),
guard.algorithm.pacing_rate(),
guard.send_quota(now),
);
}
count += 1;
}
Expand Down Expand Up @@ -548,10 +557,7 @@ impl super::CongestionControl for ArcCC {
guard.on_ack_rcvd(space, ack_frame, now);
}

fn on_pkt_rcvd(&self, epoch: Epoch, pn: u64, is_ack_eliciting: bool) {
if !is_ack_eliciting {
return;
}
fn on_pkt_rcvd(&self, epoch: Epoch, pn: u64, _is_ack_eliciting: bool) {
let mut guard = self.0.lock().unwrap();
guard.rcvd_records[epoch].on_pkt_rcvd(pn);
let now = Instant::now();
Expand All @@ -566,10 +572,11 @@ impl super::CongestionControl for ArcCC {
/// The [`RcvdRecords`] struct is used to maintain records of received packets for each epoch.
/// It tracks acknowledged packets and determines when an ACK frame should be sent.
/// It also retires packets that have been acknowledged by an ACK frame that has already sent and which has been confirmed by the peer.
#[derive(Debug)]
struct RcvdRecords {
epoch: Epoch,
ack_immedietly: bool,
last_ack_sent: Option<(u64, HashSet<u64>)>,
last_ack_sent: Option<(u64, u64)>,
rcvd_queue: VecDeque<(u64, Instant)>,
}

Expand Down Expand Up @@ -617,17 +624,10 @@ impl RcvdRecords {
return largest_pn;
}

// All ack-eliciting 0-RTT and 1-RTT packets MUST acknowledge within its advertised max_ack_delay
let empty_set = HashSet::new();
let pending_ack = self
.last_ack_sent
.as_ref()
.map(|(_, set)| set)
.unwrap_or(&empty_set);

let largest_ack_sent = self.last_ack_sent.map(|x| x.1).unwrap_or(0);
let now = Instant::now();
for (pn, rec_time) in self.rcvd_queue.iter() {
if now - *rec_time >= max_delay && !pending_ack.contains(pn) {
if now - *rec_time >= max_delay && pn > &largest_ack_sent {
return largest_pn;
}
}
Expand All @@ -637,26 +637,23 @@ impl RcvdRecords {
/// Called when an ACK is sent.
/// Updates the last ACK sent information and resets the `need_ack` flag.
fn on_ack_sent(&mut self, pn: u64, largest_acked: u64) {
let pending_retire = self
.rcvd_queue
.iter()
.filter(|&(pn, _)| *pn <= largest_acked)
.map(|&(pn, _)| pn);
self.last_ack_sent = Some((pn, pending_retire.collect()));
self.last_ack_sent = Some((pn, largest_acked));
self.ack_immedietly = false;
}

/// Processes an acknowledged (ACK) packet.
/// If the ACKed packet number matches the last sent ACK number, retires all acknowledged packets.
fn ack(&mut self, ack: u64, trackers: &[Arc<dyn TrackPackets>; 3]) {
let pending_retire = match self.last_ack_sent {
Some((pn, ref list)) if ack == pn => list.clone(),
let largest_acked = match self.last_ack_sent {
Some((pn, ref largest_acked)) if ack == pn => largest_acked,
_ => return,
};

trackers[self.epoch].retire(&mut pending_retire.iter().cloned());
self.rcvd_queue
.retain(|&(pn, _)| !pending_retire.contains(&pn));
let begin = self.rcvd_queue.front().map(|&(pn, _)| pn).unwrap_or(0);
let mut retire = begin..=*largest_acked;
tracing::trace!("retire to {:?}", retire);
trackers[self.epoch].retire(&mut retire);
self.rcvd_queue.retain(|&(pn, _)| pn > *largest_acked);
}
}

Expand Down Expand Up @@ -983,45 +980,29 @@ mod tests {
vec![0, 1, 3, 5, 7, 9]
);

assert_eq!(
ack_reocrd.last_ack_sent,
Some((
2,
HashSet::from_iter(vec![0, 1, 3, 5, 7].into_iter().map(|x| x as u64))
))
);

// pn 3 ack 0,1,3,5,7,9
ack_reocrd.on_ack_sent(3, 9);
assert_eq!(
ack_reocrd.last_ack_sent,
Some((
3,
HashSet::from_iter(vec![0, 1, 3, 5, 7, 9].into_iter().map(|x| x as u64))
))
);

// recv pn 2 ack, ingore
// recv pn 2 ack, retire 0,1,3,5,7
ack_reocrd.ack(2, &[Arc::new(Mock), Arc::new(Mock), Arc::new(Mock)]);
assert_eq!(
ack_reocrd
.rcvd_queue
.iter()
.map(|&(pn, _)| pn)
.collect::<Vec<_>>(),
vec![0, 1, 3, 5, 7, 9]
vec![9]
);

ack_reocrd.on_ack_sent(3, 9);
ack_reocrd.on_pkt_rcvd(8);
ack_reocrd.on_pkt_rcvd(11);
assert_eq!(
ack_reocrd
.rcvd_queue
.iter()
.map(|&(pn, _)| pn)
.collect::<Vec<_>>(),
vec![0, 1, 3, 5, 7, 9, 11]
vec![8, 9, 11]
);
// recv pn 3 ack, ret
// recv pn 3 ack, retire 8,9

ack_reocrd.ack(3, &[Arc::new(Mock), Arc::new(Mock), Arc::new(Mock)]);
assert_eq!(
Expand Down Expand Up @@ -1065,33 +1046,14 @@ mod tests {
vec![5, 6, 7, 8, 9, 10]
);

// 4 属于迟到的包,可能被对面判定为丢包
ack_reocrd.on_pkt_rcvd(4);

// ack 2
// ack 2,对面可能判定 4 为丢包,我放也应该 retir 4
ack_reocrd.ack(2, &[Arc::new(Mock), Arc::new(Mock), Arc::new(Mock)]);
assert_eq!(ack_reocrd.requires_ack(max_ack_delay).unwrap().0, 4);
assert_eq!(
ack_reocrd
.rcvd_queue
.iter()
.map(|&(pn, _)| pn)
.collect::<Vec<_>>(),
vec![4]
);
ack_reocrd.on_ack_sent(3, 4);
assert_eq!(ack_reocrd.requires_ack(max_ack_delay), None);

// ack 3
ack_reocrd.ack(3, &[Arc::new(Mock), Arc::new(Mock), Arc::new(Mock)]);
assert_eq!(ack_reocrd.requires_ack(max_ack_delay), None);
assert_eq!(
ack_reocrd
.rcvd_queue
.iter()
.map(|&(pn, _)| pn)
.collect::<Vec<_>>(),
vec![]
);
assert!(ack_reocrd.rcvd_queue.is_empty());
}
struct Mock;
impl TrackPackets for Mock {
Expand Down

0 comments on commit c6b8a56

Please sign in to comment.