From 5244e8b99809fd7afc75c901a07e00c252b4ed94 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Sun, 19 Jan 2025 17:17:48 +0100 Subject: [PATCH] Add report reception timestamps --- pkg/ccfb/ccfb_receiver.go | 6 +++--- pkg/ccfb/history.go | 7 ++++--- pkg/ccfb/history_test.go | 2 +- pkg/ccfb/interceptor.go | 10 ++++++---- pkg/ccfb/twcc_receiver.go | 7 ++++--- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/ccfb/ccfb_receiver.go b/pkg/ccfb/ccfb_receiver.go index 811c636d..a90a9e9b 100644 --- a/pkg/ccfb/ccfb_receiver.go +++ b/pkg/ccfb/ccfb_receiver.go @@ -19,16 +19,16 @@ type acknowledgementList struct { acks []acknowledgement } -func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) map[uint32]acknowledgementList { +func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32]acknowledgementList) { if feedback == nil { - return nil + return time.Time{}, nil } result := map[uint32]acknowledgementList{} referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts) for _, rb := range feedback.ReportBlocks { result[rb.MediaSSRC] = convertMetricBlock(ts, referenceTime, rb.BeginSequence, rb.MetricBlocks) } - return result + return referenceTime, result } func convertMetricBlock(ts time.Time, reference time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) acknowledgementList { diff --git a/pkg/ccfb/history.go b/pkg/ccfb/history.go index bec0f769..6e8aba3c 100644 --- a/pkg/ccfb/history.go +++ b/pkg/ccfb/history.go @@ -11,7 +11,8 @@ import ( ) type PacketReportList struct { - Timestamp time.Time + Arrival time.Time + Departure time.Time Reports []PacketReport } @@ -109,7 +110,7 @@ func (h *history) getReportForAck(al acknowledgementList) PacketReportList { } return PacketReportList{ - Timestamp: al.ts, - Reports: reports, + Arrival: al.ts, + Reports: reports, } } diff --git a/pkg/ccfb/history_test.go b/pkg/ccfb/history_test.go index 4a56fd11..88ca99e4 100644 --- a/pkg/ccfb/history_test.go +++ b/pkg/ccfb/history_test.go @@ -72,7 +72,7 @@ func TestHistory(t *testing.T) { }, }, expectedReport: PacketReportList{ - Timestamp: time.Time{}.Add(time.Second), + Arrival: time.Time{}.Add(time.Second), Reports: []PacketReport{ {1, 1200, time.Time{}.Add(2 * time.Millisecond), true, time.Time{}.Add(3 * time.Millisecond), 0}, {2, 1200, time.Time{}.Add(3 * time.Millisecond), false, time.Time{}, 0}, diff --git a/pkg/ccfb/interceptor.go b/pkg/ccfb/interceptor.go index e8b4afef..08de7429 100644 --- a/pkg/ccfb/interceptor.go +++ b/pkg/ccfb/interceptor.go @@ -93,12 +93,12 @@ func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter // BindRTCPReader implements interceptor.Interceptor. func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { - now := i.timestamp() - n, attr, err := reader.Read(b, a) if err != nil { return n, attr, err } + now := i.timestamp() + buf := make([]byte, n) copy(buf, b[:n]) @@ -111,14 +111,16 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor. pkts, err := attr.GetRTCPPackets(buf) for _, pkt := range pkts { var reportLists map[uint32]acknowledgementList + var reportDeparture time.Time switch fb := pkt.(type) { case *rtcp.CCFeedbackReport: - reportLists = convertCCFB(now, fb) + reportDeparture, reportLists = convertCCFB(now, fb) case *rtcp.TransportLayerCC: - reportLists = convertTWCC(now, fb) + reportDeparture, reportLists = convertTWCC(now, fb) } for ssrc, reportList := range reportLists { prl := i.ssrcToHistory[ssrc].getReportForAck(reportList) + prl.Departure = reportDeparture if l, ok := pktReportLists[ssrc]; !ok { pktReportLists[ssrc] = &prl } else { diff --git a/pkg/ccfb/twcc_receiver.go b/pkg/ccfb/twcc_receiver.go index d5c4c195..c7783e86 100644 --- a/pkg/ccfb/twcc_receiver.go +++ b/pkg/ccfb/twcc_receiver.go @@ -7,14 +7,15 @@ import ( "github.com/pion/rtcp" ) -func convertTWCC(ts time.Time, feedback *rtcp.TransportLayerCC) map[uint32]acknowledgementList { +func convertTWCC(ts time.Time, feedback *rtcp.TransportLayerCC) (time.Time, map[uint32]acknowledgementList) { log.Printf("got twcc report: %v", feedback) if feedback == nil { - return nil + return time.Time{}, nil } var acks []acknowledgement nextTimestamp := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) + reportDeparture := nextTimestamp recvDeltaIndex := 0 offset := 0 @@ -85,7 +86,7 @@ func convertTWCC(ts time.Time, feedback *rtcp.TransportLayerCC) map[uint32]ackno } } - return map[uint32]acknowledgementList{ + return reportDeparture, map[uint32]acknowledgementList{ 0: { ts: ts, acks: acks,