Skip to content

Commit

Permalink
Add report reception timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jan 19, 2025
1 parent a71cb0a commit 7ad4eec
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 51 deletions.
6 changes: 3 additions & 3 deletions pkg/ccfb/ccfb_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ type acknowledgementList struct {
acks []acknowledgement
}

func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) map[uint32]acknowledgementList {
func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32]acknowledgementList) {
if feedback == nil {
return nil
return time.Time{}, nil
}
result := map[uint32]acknowledgementList{}
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
for _, rb := range feedback.ReportBlocks {
result[rb.MediaSSRC] = convertMetricBlock(ts, referenceTime, rb.BeginSequence, rb.MetricBlocks)
}
return result
return referenceTime, result
}

func convertMetricBlock(ts time.Time, reference time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) acknowledgementList {
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccfb/ccfb_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestConvertCCFB(t *testing.T) {
ts time.Time
feedback *rtcp.CCFeedbackReport
expect map[uint32]acknowledgementList
expectTS time.Time
}{
{},
{
Expand Down Expand Up @@ -50,11 +51,14 @@ func TestConvertCCFB(t *testing.T) {
},
},
},
expectTS: timeZero.Add(time.Second),
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := convertCCFB(tc.ts, tc.feedback)
resTS, res := convertCCFB(tc.ts, tc.feedback)

assert.InDelta(t, tc.expectTS.UnixNano(), resTS.UnixNano(), float64(time.Millisecond.Nanoseconds()))

// Can't directly check equality since arrival timestamp conversions
// may be slightly off due to ntp conversions.
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccfb/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

type PacketReportList struct {

Check failure on line 13 in pkg/ccfb/history.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported type PacketReportList should have comment or be unexported (revive)
Timestamp time.Time
Arrival time.Time
Departure time.Time
Reports []PacketReport
}

Expand Down Expand Up @@ -109,7 +110,7 @@ func (h *history) getReportForAck(al acknowledgementList) PacketReportList {
}

return PacketReportList{
Timestamp: al.ts,
Reports: reports,
Arrival: al.ts,
Reports: reports,
}
}
2 changes: 1 addition & 1 deletion pkg/ccfb/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHistory(t *testing.T) {
},
},
expectedReport: PacketReportList{
Timestamp: time.Time{}.Add(time.Second),
Arrival: time.Time{}.Add(time.Second),
Reports: []PacketReport{
{1, 1200, time.Time{}.Add(2 * time.Millisecond), true, time.Time{}.Add(3 * time.Millisecond), 0},
{2, 1200, time.Time{}.Add(3 * time.Millisecond), false, time.Time{}, 0},
Expand Down
10 changes: 6 additions & 4 deletions pkg/ccfb/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter
// BindRTCPReader implements interceptor.Interceptor.
func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
now := i.timestamp()

n, attr, err := reader.Read(b, a)
if err != nil {
return n, attr, err
}
now := i.timestamp()

buf := make([]byte, n)
copy(buf, b[:n])

Expand All @@ -111,14 +111,16 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
pkts, err := attr.GetRTCPPackets(buf)
for _, pkt := range pkts {
var reportLists map[uint32]acknowledgementList
var reportDeparture time.Time
switch fb := pkt.(type) {
case *rtcp.CCFeedbackReport:
reportLists = convertCCFB(now, fb)
reportDeparture, reportLists = convertCCFB(now, fb)
case *rtcp.TransportLayerCC:
reportLists = convertTWCC(now, fb)
reportDeparture, reportLists = convertTWCC(now, fb)

Check warning on line 119 in pkg/ccfb/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/ccfb/interceptor.go#L109-L119

Added lines #L109 - L119 were not covered by tests
}
for ssrc, reportList := range reportLists {
prl := i.ssrcToHistory[ssrc].getReportForAck(reportList)
prl.Departure = reportDeparture
if l, ok := pktReportLists[ssrc]; !ok {
pktReportLists[ssrc] = &prl
} else {
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccfb/twcc_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"github.com/pion/rtcp"
)

func convertTWCC(ts time.Time, feedback *rtcp.TransportLayerCC) map[uint32]acknowledgementList {
func convertTWCC(ts time.Time, feedback *rtcp.TransportLayerCC) (time.Time, map[uint32]acknowledgementList) {
log.Printf("got twcc report: %v", feedback)

Check failure on line 11 in pkg/ccfb/twcc_receiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

use of `log.Printf` forbidden by pattern `^log.(Panic|Fatal|Print)(f|ln)?$` (forbidigo)
if feedback == nil {
return nil
return time.Time{}, nil
}
var acks []acknowledgement

nextTimestamp := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)
reportDeparture := nextTimestamp
recvDeltaIndex := 0

offset := 0
Expand Down Expand Up @@ -85,7 +86,7 @@ func convertTWCC(ts time.Time, feedback *rtcp.TransportLayerCC) map[uint32]ackno
}
}

return map[uint32]acknowledgementList{
return reportDeparture, map[uint32]acknowledgementList{
0: {
ts: ts,
acks: acks,
Expand Down
64 changes: 28 additions & 36 deletions pkg/ccfb/twcc_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestConvertTWCC(t *testing.T) {
ts time.Time
feedback *rtcp.TransportLayerCC
expect map[uint32]acknowledgementList
expectTS time.Time
}{
{},
{
Expand All @@ -24,26 +25,27 @@ func TestConvertTWCC(t *testing.T) {
MediaSSRC: 2,
BaseSequenceNumber: 178,
PacketStatusCount: 0,
ReferenceTime: 0,
ReferenceTime: 3,
FbPktCount: 0,
PacketChunks: []rtcp.PacketStatusChunk{},
RecvDeltas: []*rtcp.RecvDelta{},
},
expect: map[uint32]acknowledgementList{
2: {
0: {
ts: timeZero.Add(2 * time.Second),
acks: []acknowledgement{},
acks: nil,
},
},
expectTS: time.Time{}.Add(3 * 64 * time.Millisecond),
},
{
ts: timeZero.Add(2 * time.Second),
feedback: &rtcp.TransportLayerCC{
SenderSSRC: 1,
MediaSSRC: 2,
BaseSequenceNumber: 178,
PacketStatusCount: 3,
ReferenceTime: 0,
PacketStatusCount: 18,
ReferenceTime: 3,
FbPktCount: 0,
PacketChunks: []rtcp.PacketStatusChunk{
&rtcp.RunLengthChunk{
Expand Down Expand Up @@ -77,38 +79,38 @@ func TestConvertTWCC(t *testing.T) {
},
},
RecvDeltas: []*rtcp.RecvDelta{
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedLargeDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedLargeDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedLargeDelta, Delta: 1000},
{Type: rtcp.TypeTCCPacketReceivedLargeDelta, Delta: 1000},
},
},
expect: map[uint32]acknowledgementList{
2: {
0: {
ts: timeZero.Add(2 * time.Second),
acks: []acknowledgement{
// first run length chunk
{seqNr: 178, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 179, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 180, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 178, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 1*time.Millisecond), ecn: 0},
{seqNr: 179, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 2*time.Millisecond), ecn: 0},
{seqNr: 180, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 3*time.Millisecond), ecn: 0},

// first status vector chunk
{seqNr: 181, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 182, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 183, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 181, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 4*time.Millisecond), ecn: 0},
{seqNr: 182, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 5*time.Millisecond), ecn: 0},
{seqNr: 183, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 6*time.Millisecond), ecn: 0},
{seqNr: 184, arrived: false, arrival: time.Time{}, ecn: 0},
{seqNr: 185, arrived: false, arrival: time.Time{}, ecn: 0},
{seqNr: 186, arrived: false, arrival: time.Time{}, ecn: 0},
{seqNr: 187, arrived: false, arrival: time.Time{}, ecn: 0},
{seqNr: 188, arrived: false, arrival: time.Time{}, ecn: 0},

// second status vector chunk
{seqNr: 189, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 190, arrived: true, arrival: time.Time{}, ecn: 0},
{seqNr: 189, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 7*time.Millisecond), ecn: 0},
{seqNr: 190, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 8*time.Millisecond), ecn: 0},
{seqNr: 191, arrived: false, arrival: time.Time{}, ecn: 0},
{seqNr: 192, arrived: false, arrival: time.Time{}, ecn: 0},
{seqNr: 193, arrived: false, arrival: time.Time{}, ecn: 0},
Expand All @@ -117,24 +119,14 @@ func TestConvertTWCC(t *testing.T) {
},
},
},
expectTS: time.Time{}.Add(3 * 64 * time.Millisecond),
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := convertTWCC(tc.ts, tc.feedback)

// Can't directly check equality since arrival timestamp conversions
// may be slightly off due to ntp conversions.
assert.Equal(t, len(tc.expect), len(res))
for i, ee := range tc.expect {
assert.Equal(t, ee.ts, res[i].ts)
for j, ack := range ee.acks {
assert.Equal(t, ack.seqNr, res[i].acks[j].seqNr)
assert.Equal(t, ack.arrived, res[i].acks[j].arrived)
assert.Equal(t, ack.ecn, res[i].acks[j].ecn)
assert.InDelta(t, ack.arrival.UnixNano(), res[i].acks[j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
}
}
resTS, res := convertTWCC(tc.ts, tc.feedback)
assert.Equal(t, tc.expect, res)
assert.Equal(t, tc.expectTS, resTS)
})
}

Expand Down

0 comments on commit 7ad4eec

Please sign in to comment.