-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
module github.com/pion/interceptor | ||
|
||
go 1.20 | ||
go 1.21 | ||
|
||
require ( | ||
github.com/pion/logging v0.2.2 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package bwe | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/pion/rtcp" | ||
) | ||
|
||
type acknowledgment struct { | ||
seqNr int64 | ||
size uint16 | ||
departure time.Time | ||
arrived bool | ||
arrival time.Time | ||
ecn rtcp.ECN | ||
} | ||
|
||
func (a acknowledgment) String() string { | ||
return fmt.Sprintf("seq=%v, departure=%v, arrival=%v", a.seqNr, a.departure, a.arrival) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package bwe | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
type arrivalGroup []acknowledgment | ||
|
||
type arrivalGroupAccumulator struct { | ||
next arrivalGroup | ||
burstInterval time.Duration | ||
maxBurstDuration time.Duration | ||
} | ||
|
||
func newArrivalGroupAccumulator() *arrivalGroupAccumulator { | ||
return &arrivalGroupAccumulator{ | ||
next: make([]acknowledgment, 0), | ||
burstInterval: 5 * time.Millisecond, | ||
maxBurstDuration: 100 * time.Millisecond, | ||
} | ||
} | ||
|
||
func (a *arrivalGroupAccumulator) onPacketAcked(ack acknowledgment) arrivalGroup { | ||
if len(a.next) == 0 { | ||
a.next = append(a.next, ack) | ||
return nil | ||
} | ||
|
||
if ack.departure.Sub(a.next[0].departure) < a.burstInterval { | ||
a.next = append(a.next, ack) | ||
return nil | ||
} | ||
|
||
sendTimeDelta := ack.departure.Sub(a.next[0].departure) | ||
arrivalTimeDeltaLast := ack.arrival.Sub(a.next[len(a.next)-1].arrival) | ||
arrivalTimeDeltaFirst := ack.arrival.Sub(a.next[0].arrival) | ||
propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta | ||
|
||
if propagationDelta < 0 && arrivalTimeDeltaLast <= a.burstInterval && arrivalTimeDeltaFirst < a.maxBurstDuration { | ||
a.next = append(a.next, ack) | ||
return nil | ||
} | ||
|
||
group := make(arrivalGroup, len(a.next)) | ||
copy(group, a.next) | ||
a.next = arrivalGroup{ack} | ||
return group | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
package bwe | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestArrivalGroupAccumulator(t *testing.T) { | ||
triggerNewGroupElement := acknowledgment{ | ||
departure: time.Time{}.Add(time.Second), | ||
arrival: time.Time{}.Add(time.Second), | ||
} | ||
cases := []struct { | ||
name string | ||
log []acknowledgment | ||
exp []arrivalGroup | ||
}{ | ||
{ | ||
name: "emptyCreatesNoGroups", | ||
log: []acknowledgment{}, | ||
exp: []arrivalGroup{}, | ||
}, | ||
{ | ||
name: "createsSingleElementGroup", | ||
log: []acknowledgment{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(time.Millisecond), | ||
}, | ||
triggerNewGroupElement, | ||
}, | ||
exp: []arrivalGroup{{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(time.Millisecond), | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "createsTwoElementGroup", | ||
log: []acknowledgment{ | ||
{ | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(20 * time.Millisecond), | ||
}, | ||
triggerNewGroupElement, | ||
}, | ||
exp: []arrivalGroup{{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(20 * time.Millisecond), | ||
}, | ||
}}, | ||
}, | ||
{ | ||
name: "createsTwoArrivalGroups1", | ||
log: []acknowledgment{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(20 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(9 * time.Millisecond), | ||
arrival: time.Time{}.Add(24 * time.Millisecond), | ||
}, | ||
triggerNewGroupElement, | ||
}, | ||
exp: []arrivalGroup{ | ||
{ | ||
{ | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(20 * time.Millisecond), | ||
}, | ||
}, | ||
{ | ||
{ | ||
departure: time.Time{}.Add(9 * time.Millisecond), | ||
arrival: time.Time{}.Add(24 * time.Millisecond), | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "createsTwoArrivalGroups2", | ||
log: []acknowledgment{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(20 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(9 * time.Millisecond), | ||
arrival: time.Time{}.Add(30 * time.Millisecond), | ||
}, | ||
triggerNewGroupElement, | ||
}, | ||
exp: []arrivalGroup{ | ||
{ | ||
{ | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(20 * time.Millisecond), | ||
}, | ||
}, | ||
{ | ||
{ | ||
departure: time.Time{}.Add(9 * time.Millisecond), | ||
arrival: time.Time{}.Add(30 * time.Millisecond), | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "ignoresOutOfOrderPackets", | ||
log: []acknowledgment{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(6 * time.Millisecond), | ||
arrival: time.Time{}.Add(34 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(8 * time.Millisecond), | ||
arrival: time.Time{}.Add(30 * time.Millisecond), | ||
}, | ||
triggerNewGroupElement, | ||
}, | ||
exp: []arrivalGroup{ | ||
{ | ||
{ | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(15 * time.Millisecond), | ||
}, | ||
}, | ||
{ | ||
{ | ||
departure: time.Time{}.Add(6 * time.Millisecond), | ||
arrival: time.Time{}.Add(34 * time.Millisecond), | ||
}, | ||
{ | ||
departure: time.Time{}.Add(8 * time.Millisecond), | ||
arrival: time.Time{}.Add(30 * time.Millisecond), | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "newGroupBecauseOfInterDepartureTime", | ||
log: []acknowledgment{ | ||
{ | ||
seqNr: 0, | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(4 * time.Millisecond), | ||
}, | ||
{ | ||
seqNr: 1, | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(4 * time.Millisecond), | ||
}, | ||
{ | ||
seqNr: 2, | ||
departure: time.Time{}.Add(6 * time.Millisecond), | ||
arrival: time.Time{}.Add(10 * time.Millisecond), | ||
}, | ||
{ | ||
seqNr: 3, | ||
departure: time.Time{}.Add(9 * time.Millisecond), | ||
arrival: time.Time{}.Add(10 * time.Millisecond), | ||
}, | ||
triggerNewGroupElement, | ||
}, | ||
exp: []arrivalGroup{ | ||
{ | ||
{ | ||
seqNr: 0, | ||
departure: time.Time{}, | ||
arrival: time.Time{}.Add(4 * time.Millisecond), | ||
}, | ||
{ | ||
seqNr: 1, | ||
departure: time.Time{}.Add(3 * time.Millisecond), | ||
arrival: time.Time{}.Add(4 * time.Millisecond), | ||
}, | ||
}, | ||
{ | ||
{ | ||
seqNr: 2, | ||
departure: time.Time{}.Add(6 * time.Millisecond), | ||
arrival: time.Time{}.Add(10 * time.Millisecond), | ||
}, | ||
{ | ||
seqNr: 3, | ||
departure: time.Time{}.Add(9 * time.Millisecond), | ||
arrival: time.Time{}.Add(10 * time.Millisecond), | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
tc := tc | ||
t.Run(tc.name, func(t *testing.T) { | ||
aga := newArrivalGroupAccumulator() | ||
received := []arrivalGroup{} | ||
for _, ack := range tc.log { | ||
next := aga.onPacketAcked(ack) | ||
if next != nil { | ||
received = append(received, next) | ||
} | ||
} | ||
assert.Equal(t, tc.exp, received) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package bwe | ||
|
||
import ( | ||
"log" | ||
"time" | ||
) | ||
|
||
type DelayRateController struct { | ||
aga *arrivalGroupAccumulator | ||
last arrivalGroup | ||
kf *kalman | ||
od *overuseDetector | ||
rc *rateController | ||
latest usage | ||
} | ||
|
||
func NewDelayRateController(initialRate int) *DelayRateController { | ||
return &DelayRateController{ | ||
aga: newArrivalGroupAccumulator(), | ||
last: []acknowledgment{}, | ||
kf: newKalman(), | ||
od: newOveruseDetector(true), | ||
rc: newRateController(initialRate), | ||
} | ||
} | ||
|
||
func (c *DelayRateController) OnPacketAcked(ack acknowledgment) { | ||
next := c.aga.onPacketAcked(ack) | ||
if next == nil { | ||
return | ||
} | ||
if len(next) == 0 { | ||
// ignore empty groups, should never occur | ||
return | ||
} | ||
if len(c.last) == 0 { | ||
c.last = next | ||
return | ||
} | ||
interArrivalTime := next[len(next)-1].arrival.Sub(c.last[len(c.last)-1].arrival) | ||
interDepartureTime := next[0].departure.Sub(c.last[0].departure) | ||
interGroupDelay := interArrivalTime - interDepartureTime | ||
estimate := c.kf.updateEstimate(interGroupDelay) | ||
c.latest = c.od.update(ack.arrival, estimate) | ||
Check failure on line 44 in pkg/bwe/delay_rate_controller.go
|
||
c.last = next | ||
log.Printf( | ||
"interArrivalTime=%v, interDepartureTime=%v, interGroupDelay=%v, estimate=%v, threshold=%v", | ||
interArrivalTime.Nanoseconds(), | ||
interDepartureTime.Nanoseconds(), | ||
interGroupDelay.Nanoseconds(), | ||
estimate.Nanoseconds(), | ||
c.od.delayThreshold.Nanoseconds(), | ||
Check failure on line 52 in pkg/bwe/delay_rate_controller.go
|
||
) | ||
} | ||
|
||
func (c *DelayRateController) Update(ts time.Time, lastDeliveryRate int, rtt time.Duration) int { | ||
return c.rc.update(ts, c.latest, lastDeliveryRate, rtt) | ||
} |