Skip to content

Commit

Permalink
WIP: Add interceptor to aggregate CCFB reports
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jan 16, 2025
1 parent 4834813 commit e86ddd8
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 0 deletions.
55 changes: 55 additions & 0 deletions pkg/ccfb/ccfb_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ccfb

import (
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
)

type acknowledgement struct {
seqNr uint16
arrived bool
arrival time.Time
ecn rtcp.ECN
}

type acknowledgementList struct {
ts time.Time
acks []acknowledgement
}

func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) map[uint32]acknowledgementList {
result := map[uint32]acknowledgementList{}
referenceTime := ntp.ToTime(uint64(feedback.ReportTimestamp) << 16)
for _, rb := range feedback.ReportBlocks {
result[rb.MediaSSRC] = convertMetricBlock(ts, referenceTime, rb.BeginSequence, rb.MetricBlocks)
}
return result
}

func convertMetricBlock(ts time.Time, referenceTime time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) acknowledgementList {
reports := make([]acknowledgement, len(blocks))
for i, mb := range blocks {
if mb.Received {
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i),
arrived: true,
arrival: referenceTime.Add(-delta),
ecn: mb.ECN,
}
} else {
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i),
arrived: false,
arrival: time.Time{},
ecn: 0,
}
}
}
return acknowledgementList{
ts: ts,
acks: reports,
}
}
92 changes: 92 additions & 0 deletions pkg/ccfb/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package ccfb

import (
"errors"
"log"
"time"

"github.com/pion/interceptor/internal/sequencenumber"
"github.com/pion/rtcp"
)

type PacketReportList struct {
Timestamp time.Time
Reports []PacketReport
}

type PacketReport struct {
SeqNr int64
Size uint16
Departure time.Time
Arrived bool
Arrival time.Time
ECN rtcp.ECN
}

type sentPacket struct {
seqNr int64
size uint16
departure time.Time
}

type history struct {
inflight []sentPacket
sentSeqNr *sequencenumber.Unwrapper
ackedSeqNr *sequencenumber.Unwrapper
}

func newHistory() *history {
return &history{
inflight: []sentPacket{},
sentSeqNr: &sequencenumber.Unwrapper{},
ackedSeqNr: &sequencenumber.Unwrapper{},
}
}

func (h *history) add(seqNr uint16, size uint16, departure time.Time) error {
sn := h.sentSeqNr.Unwrap(seqNr)
if len(h.inflight) > 0 && sn < h.inflight[len(h.inflight)-1].seqNr {
return errors.New("sequence number went backwards")
}
h.inflight = append(h.inflight, sentPacket{
seqNr: sn,
size: size,
departure: departure,
})
return nil
}

func (h *history) getReportForAck(al acknowledgementList) PacketReportList {
reports := []PacketReport{}
log.Printf("highest sent: %v", h.inflight[len(h.inflight)-1].seqNr)
for _, pr := range al.acks {
sn := h.ackedSeqNr.Unwrap(pr.seqNr)
i := h.index(sn)
if i > -1 {
reports = append(reports, PacketReport{
SeqNr: sn,
Size: h.inflight[i].size,
Departure: h.inflight[i].departure,
Arrived: pr.arrived,
Arrival: pr.arrival,
ECN: pr.ecn,
})
} else {
panic("got feedback for unknown packet")
}
log.Printf("processed ack for seq nr %v, arrived: %v", sn, pr.arrived)
}
return PacketReportList{
Timestamp: al.ts,
Reports: reports,
}
}

func (h *history) index(seqNr int64) int {
for i := range h.inflight {
if h.inflight[i].seqNr == seqNr {
return i
}
}
return -1
}
109 changes: 109 additions & 0 deletions pkg/ccfb/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package ccfb

import (
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

type ccfbAttributesKeyType uint32

const CCFBAttributesKey ccfbAttributesKeyType = iota

type Option func(*Interceptor) error

type InterceptorFactory struct {
opts []Option
}

func NewInterceptor(opts ...Option) (*InterceptorFactory, error) {
return &InterceptorFactory{
opts: opts,
}, nil
}

func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &Interceptor{
NoOp: interceptor.NoOp{},
timestamp: time.Now,
ssrcToHistory: make(map[uint32]*history),
}
for _, opt := range f.opts {
if err := opt(i); err != nil {
return nil, err
}
}
return i, nil
}

type Interceptor struct {
interceptor.NoOp
lock sync.Mutex
timestamp func() time.Time
ssrcToHistory map[uint32]*history
}

// BindLocalStream implements interceptor.Interceptor.
func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
i.lock.Lock()
defer i.lock.Unlock()
i.ssrcToHistory[info.SSRC] = newHistory()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
i.lock.Lock()
defer i.lock.Unlock()
i.ssrcToHistory[header.SSRC].add(header.SequenceNumber, uint16(header.MarshalSize()+len(payload)), i.timestamp())
return writer.Write(header, payload, attributes)
})
}

// BindRTCPReader implements interceptor.Interceptor.
func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
now := i.timestamp()

n, attr, err := reader.Read(b, a)
if err != nil {
return n, attr, err
}
buf := make([]byte, n)
copy(buf, b[:n])

if attr == nil {
attr = make(interceptor.Attributes)
}

pktReportLists := map[uint32]*PacketReportList{}

pkts, err := attr.GetRTCPPackets(buf)
for _, pkt := range pkts {
switch fb := pkt.(type) {
case *rtcp.CCFeedbackReport:
reportLists := convertCCFB(now, fb)
for ssrc, reportList := range reportLists {
prl := i.ssrcToHistory[ssrc].getReportForAck(reportList)
if l, ok := pktReportLists[ssrc]; !ok {
pktReportLists[ssrc] = &prl
} else {
l.Reports = append(l.Reports, prl.Reports...)
}
}
}
}
attr.Set(CCFBAttributesKey, pktReportLists)
return n, attr, err
})
}

// Close implements interceptor.Interceptor.
func (i *Interceptor) Close() error {
panic("unimplemented")
}

// UnbindLocalStream implements interceptor.Interceptor.
func (i *Interceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
panic("unimplemented")
}

0 comments on commit e86ddd8

Please sign in to comment.