Skip to content

Commit

Permalink
[WIP] Debugging FlexFEC implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ypothoma committed May 6, 2024
1 parent 142f17f commit c193e9d
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 13 deletions.
32 changes: 30 additions & 2 deletions pkg/flexfec/encoder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"github.com/pion/rtp"
)

const BaseRTPHeaderSize = 12

// FecInterceptor implements FlexFec.
type FecInterceptor struct {
interceptor.NoOp
flexFecEncoder FlexEncoder
packetBuffer []rtp.Packet
minNumMediaPackets uint32

sentPacketCount uint32
}

// FecOption can be used to set initial options on Fec encoder interceptors.
Expand Down Expand Up @@ -42,21 +46,45 @@ func (r *FecInterceptorFactory) NewInterceptor(_ string) (interceptor.Intercepto
return interceptor, nil
}

func streamSupportsFec(info *interceptor.StreamInfo) bool {
specialLog("HERE IS THE MIMETYPE = ", info.MimeType)
return info.MimeType == "video/VP8"
}

// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (r *FecInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
if !streamSupportsFec(info) {
return writer
}

// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
// although we should support configuring the latest (flexfec-20) as well.
r.flexFecEncoder = NewFlexEncoder03(info.PayloadType, info.SSRC)

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {

specialLog("INTERCEPTOR: Writing to packet buffer...")
specialLog("INTERCEPTOR: header.sequenceNumber = ", header.SequenceNumber)
specialLog("INTERCEPTOR: payload length = ", len(payload))

r.packetBuffer = append(r.packetBuffer, rtp.Packet{
Header: *header,
Payload: payload,
})

// Send the media RTP packet
result, err := writer.Write(header, payload, attributes)
var result int
var err error
if r.sentPacketCount%20 != 0 && r.sentPacketCount > 1000 {
// Send the media RTP packet
result, err = writer.Write(header, payload, attributes)
} else {
specialLog("DROPPING PACKET ID - ", header.SequenceNumber)
//result, err = writer.Write(header, payload, attributes)
}
r.sentPacketCount++

//result, err := writer.Write(header, payload, attributes)

// Send the FEC packets
var fecPackets []rtp.Packet
Expand Down
6 changes: 2 additions & 4 deletions pkg/flexfec/flexfec_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
)

const (
// BaseRTPHeaderSize represents the minium RTP packet header size in bytes.
BaseRTPHeaderSize = 12
// BaseFecHeaderSize represents the minium FEC payload's header size including the
// required first mask.
BaseFecHeaderSize = 12
Expand All @@ -34,7 +32,7 @@ type FlexEncoder20 struct {
}

// NewFlexEncoder returns a new FlexFecEncer.
func NewFlexEncoder(payloadType uint8, ssrc uint32) *FlexEncoder20 {
func NewFlexEncoder20(payloadType uint8, ssrc uint32) *FlexEncoder20 {
return &FlexEncoder20{
payloadType: payloadType,
ssrc: ssrc,
Expand Down Expand Up @@ -180,7 +178,7 @@ func (flex *FlexEncoder20) encodeFlexFecRepairPayload(mediaPackets *util.MediaPa
mediaPacketPayload := mediaPackets.Next().Payload

if len(flexFecPayload) < len(mediaPacketPayload) {
// Expected FEC packet payload is bigger that what we can currently store,
// Expected FEC packet payload is bigger than what we can currently store,
// we need to resize.
flexFecPayloadTmp := make([]byte, len(mediaPacketPayload))
copy(flexFecPayloadTmp, flexFecPayload)
Expand Down
66 changes: 59 additions & 7 deletions pkg/flexfec/flexfec_encoder_03.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package flexfec

import (
"encoding/binary"
"fmt"

"github.com/pion/interceptor/pkg/flexfec/util"
"github.com/pion/rtp"
Expand Down Expand Up @@ -89,6 +90,14 @@ func (flex *FlexEncoder03) encodeFlexFecPacket(fecPacketIndex uint32, mediaBaseS
}

func (flex *FlexEncoder03) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64, mediaBaseSn uint16) []byte {
specialLog()
specialLog("----------ENCODING A HEADER----------")
specialLog("media packet SSRC = ", mediaPackets.First().SSRC)
specialLog()
specialLog("Base SN for batch of media packets = ", mediaBaseSn, " = %b", mediaBaseSn)
specialLog()
fmt.Printf("mask = %b", mask1)
specialLog()
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
Expand Down Expand Up @@ -131,13 +140,27 @@ func (flex *FlexEncoder03) encodeFlexFecHeader(mediaPackets *util.MediaPacketIte
for mediaPackets.HasNext() {
mediaPacket := mediaPackets.Next()

specialLog("Xoring a media packet's header...")
specialLog(" SN SN SN SN SN sequence number => ", mediaPacket.SequenceNumber)
specialLog(" timestamp => ", mediaPacket.Timestamp, " = %b", mediaPacket.Timestamp)

resized := false

if mediaPacket.MarshalSize() > len(tmpMediaPacketBuf) {
// The temporary buffer is too small, we need to resize.
specialLog(" Resizing tmpMediaPacketBuf... it's too small. Prev size => ", len(tmpMediaPacketBuf), " mediaPacket size => ", mediaPacket.MarshalSize())
tmpMediaPacketBuf = make([]byte, mediaPacket.MarshalSize())
resized = true
}

if resized {
specialLog(" We resized, here's the media packet size now before the marshalTo()... ", mediaPacket.MarshalSize())
specialLog(" mediaPacket info", mediaPacket.String())
}
n, err := mediaPacket.MarshalTo(tmpMediaPacketBuf)

if n == 0 || err != nil {
specialLog(" Something went wrong... ", err)
return nil
}

Expand All @@ -154,6 +177,9 @@ func (flex *FlexEncoder03) encodeFlexFecHeader(mediaPackets *util.MediaPacketIte
flexFecHeader[3] ^= uint8(lengthRecoveryVal)

// XOR the 5th to 8th bytes of the header: the timestamp field
fmt.Printf(" Xoring timestamp field, first byte -> tmpMedia[4] = %08b", tmpMediaPacketBuf[4])
specialLog()

flexFecHeader[4] ^= tmpMediaPacketBuf[4]
flexFecHeader[5] ^= tmpMediaPacketBuf[5]
flexFecHeader[6] ^= tmpMediaPacketBuf[6]
Expand All @@ -179,6 +205,9 @@ func (flex *FlexEncoder03) encodeFlexFecHeader(mediaPackets *util.MediaPacketIte

if optionalMask2 == 0 {
flexFecHeader[18] |= 0b10000000
printFecHeader(flexFecHeader)
specialLog()
specialLog("----------DONE ENCODING A HEADER----------")
return flexFecHeader
}
binary.BigEndian.PutUint32(flexFecHeader[20:24], optionalMask2)
Expand All @@ -189,25 +218,48 @@ func (flex *FlexEncoder03) encodeFlexFecHeader(mediaPackets *util.MediaPacketIte
binary.BigEndian.PutUint64(flexFecHeader[24:32], optionalMask3)
}

printFecHeader(flexFecHeader)
specialLog()
specialLog("----------DONE ENCODING A HEADER----------")
return flexFecHeader
}

func (flex *FlexEncoder03) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte {
flexFecPayload := make([]byte, len(mediaPackets.First().Payload))
// FlexFec payload is the result of xoring bytes from the RTP packet after the min RTP header size.
flexFecPayload := make([]byte, mediaPackets.First().MarshalSize()-BaseRTPHeaderSize)

for mediaPackets.HasNext() {
mediaPacketPayload := mediaPackets.Next().Payload
// We marshal the mediaPacket payload data to the predefined tmpRtpPayload. This helps avoid
// unecessary allocations.
data, err := mediaPackets.Next().Marshal()
data = data[BaseRTPHeaderSize:]
if err != nil {
return nil
}

if len(flexFecPayload) < len(mediaPacketPayload) {
// Expected FEC packet payload is bigger that what we can currently store,
if len(flexFecPayload) < len(data) {
// Expected FEC packet payload is bigger than what we can currently store,
// we need to resize.
flexFecPayloadTmp := make([]byte, len(mediaPacketPayload))
flexFecPayloadTmp := make([]byte, len(data))
copy(flexFecPayloadTmp, flexFecPayload)
flexFecPayload = flexFecPayloadTmp
}
for byteIndex := 0; byteIndex < len(mediaPacketPayload); byteIndex++ {
flexFecPayload[byteIndex] ^= mediaPacketPayload[byteIndex]
for byteIndex := 0; byteIndex < len(data); byteIndex++ {
flexFecPayload[byteIndex] ^= data[byteIndex]
}
}
return flexFecPayload
}

func printFecHeader(flexFecHeader []byte) {
specialLog("===========FEC HEADER===========")
// Iterate through each row and print the bytes
for i := 0; i < len(flexFecHeader); i += 4 {
row := flexFecHeader[i : i+4]
for _, b := range row {
fmt.Printf("%08b ", b)
}
specialLog()
}
specialLog("===========FEC HEADER===========")
}
11 changes: 11 additions & 0 deletions pkg/flexfec/special_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package flexfec

import "fmt"

const specialLogEnabled = true

func specialLog(toLog ...interface{}) {
if specialLogEnabled {
fmt.Println(toLog...)
}
}

0 comments on commit c193e9d

Please sign in to comment.