Skip to content

Commit

Permalink
Binary formatters in packetdump (#304)
Browse files Browse the repository at this point in the history
* Support for binary formatters in packetdump

* Tests, validation, format
  • Loading branch information
aalekseevx authored Jan 20, 2025
1 parent 6022ad6 commit 3436288
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pkg/packetdump/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ type RTPFilterCallback func(pkt *rtp.Packet) bool

// RTCPFilterCallback can be used to filter RTCP packets to dump.
// The callback returns whether or not to print dump the packet's content.
// Deprecated: prefer RTCPPerPacketFilterCallback
type RTCPFilterCallback func(pkt []rtcp.Packet) bool

// RTCPPerPacketFilterCallback can be used to filter RTCP packets to dump.
// It's called once per every packet opposing to RTCPFilterCallback which is called once per packet batch
type RTCPPerPacketFilterCallback func(pkt rtcp.Packet) bool
10 changes: 10 additions & 0 deletions pkg/packetdump/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,29 @@ import (
// RTPFormatCallback can be used to apply custom formatting to each dumped RTP
// packet. If new lines should be added after each packet, they must be included
// in the returned format.
// Deprecated: prefer RTPBinaryFormatCallback
type RTPFormatCallback func(*rtp.Packet, interceptor.Attributes) string

// RTCPFormatCallback can be used to apply custom formatting to each dumped RTCP
// packet. If new lines should be added after each packet, they must be included
// in the returned format.
// Deprecated: prefer RTCPBinaryFormatCallback
type RTCPFormatCallback func([]rtcp.Packet, interceptor.Attributes) string

// DefaultRTPFormatter returns the default log format for RTP packets
// Deprecated: useless export since set by default
func DefaultRTPFormatter(pkt *rtp.Packet, _ interceptor.Attributes) string {
return fmt.Sprintf("%s\n", pkt)
}

// DefaultRTCPFormatter returns the default log format for RTCP packets
// Deprecated: useless export since set by default
func DefaultRTCPFormatter(pkts []rtcp.Packet, _ interceptor.Attributes) string {
return fmt.Sprintf("%s\n", pkts)
}

// RTPBinaryFormatCallback can be used to apply custom formatting or marshaling to each dumped RTP packet
type RTPBinaryFormatCallback func(*rtp.Packet, interceptor.Attributes) ([]byte, error)

// RTCPBinaryFormatCallback can be used to apply custom formatting or marshaling to each dumped RTCP packet
type RTCPBinaryFormatCallback func(rtcp.Packet, interceptor.Attributes) ([]byte, error)
27 changes: 27 additions & 0 deletions pkg/packetdump/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func RTCPWriter(w io.Writer) PacketDumperOption {
}

// RTPFormatter sets the RTP format
// Deprecated: prefer RTPBinaryFormatter
func RTPFormatter(f RTPFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtpFormat = f
Expand All @@ -45,13 +46,30 @@ func RTPFormatter(f RTPFormatCallback) PacketDumperOption {
}

// RTCPFormatter sets the RTCP format
// Deprecated: prefer RTCPBinaryFormatter
func RTCPFormatter(f RTCPFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFormat = f
return nil
}
}

// RTPBinaryFormatter sets the RTP binary formatter
func RTPBinaryFormatter(f RTPBinaryFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtpFormatBinary = f
return nil
}
}

// RTCPBinaryFormatter sets the RTCP binary formatter
func RTCPBinaryFormatter(f RTCPBinaryFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFormatBinary = f
return nil
}
}

// RTPFilter sets the RTP filter.
func RTPFilter(callback RTPFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
Expand All @@ -61,9 +79,18 @@ func RTPFilter(callback RTPFilterCallback) PacketDumperOption {
}

// RTCPFilter sets the RTCP filter.
// Deprecated: prefer RTCPPerPacketFilter
func RTCPFilter(callback RTCPFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFilter = callback
return nil
}
}

// RTCPPerPacketFilter sets the RTCP per-packet filter.
func RTCPPerPacketFilter(callback RTCPPerPacketFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpPerPacketFilter = callback
return nil
}
}
117 changes: 98 additions & 19 deletions pkg/packetdump/packet_dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/pion/rtp"
)

// ErrBothBinaryAndDeprecatedFormat is returned when both binary and deprecated format callbacks are set
var ErrBothBinaryAndDeprecatedFormat = fmt.Errorf("both binary and deprecated format callbacks are set")

// PacketDumper dumps packet to a io.Writer
type PacketDumper struct {
log logging.LeveledLogger
Expand All @@ -28,31 +31,44 @@ type PacketDumper struct {
rtpStream io.Writer
rtcpStream io.Writer

rtpFormatBinary RTPBinaryFormatCallback
rtcpFormatBinary RTCPBinaryFormatCallback

rtpFormat RTPFormatCallback
rtcpFormat RTCPFormatCallback

rtpFilter RTPFilterCallback
rtcpFilter RTCPFilterCallback
rtpFilter RTPFilterCallback
rtcpFilter RTCPFilterCallback
rtcpPerPacketFilter RTCPPerPacketFilterCallback
}

// NewPacketDumper creates a new PacketDumper
func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) {
d := &PacketDumper{
log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
wg: sync.WaitGroup{},
close: make(chan struct{}),
rtpChan: make(chan *rtpDump),
rtcpChan: make(chan *rtcpDump),
rtpStream: os.Stdout,
rtcpStream: os.Stdout,
rtpFormat: DefaultRTPFormatter,
rtcpFormat: DefaultRTCPFormatter,
log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
wg: sync.WaitGroup{},
close: make(chan struct{}),
rtpChan: make(chan *rtpDump),
rtcpChan: make(chan *rtcpDump),
rtpStream: os.Stdout,
rtcpStream: os.Stdout,
rtpFormat: nil,
rtcpFormat: nil,
rtpFormatBinary: nil,
rtcpFormatBinary: nil,
rtpFilter: func(*rtp.Packet) bool {
return true
},
rtcpFilter: func([]rtcp.Packet) bool {
return true
},
rtcpPerPacketFilter: func(rtcp.Packet) bool {
return true
},
}

if d.rtpFormat != nil && d.rtpFormatBinary != nil {
return nil, ErrBothBinaryAndDeprecatedFormat
}

for _, opt := range opts {
Expand All @@ -61,6 +77,14 @@ func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) {
}
}

if d.rtpFormat == nil && d.rtpFormatBinary == nil {
d.rtpFormat = DefaultRTPFormatter
}

if d.rtcpFormat == nil && d.rtcpFormatBinary == nil {
d.rtcpFormat = DefaultRTCPFormatter
}

d.wg.Add(1)
go d.loop()

Expand Down Expand Up @@ -117,17 +141,72 @@ func (d *PacketDumper) loop() {
case <-d.close:
return
case dump := <-d.rtpChan:
if d.rtpFilter(dump.packet) {
if _, err := fmt.Fprint(d.rtpStream, d.rtpFormat(dump.packet, dump.attributes)); err != nil {
d.log.Errorf("could not dump RTP packet %v", err)
}
err := d.writeDumpedRTP(dump)
if err != nil {
d.log.Errorf("could not dump RTP packet: %v", err)
}
case dump := <-d.rtcpChan:
if d.rtcpFilter(dump.packets) {
if _, err := fmt.Fprint(d.rtcpStream, d.rtcpFormat(dump.packets, dump.attributes)); err != nil {
d.log.Errorf("could not dump RTCP packet %v", err)
}
err := d.writeDumpedRTCP(dump)
if err != nil {
d.log.Errorf("could not dump RTCP packets: %v", err)
}
}
}
}

func (d *PacketDumper) writeDumpedRTP(dump *rtpDump) error {
if !d.rtpFilter(dump.packet) {
return nil
}

if d.rtpFormatBinary != nil {
dumped, err := d.rtpFormatBinary(dump.packet, dump.attributes)
if err != nil {
return fmt.Errorf("rtp format binary: %w", err)
}
_, err = d.rtpStream.Write(dumped)
if err != nil {
return fmt.Errorf("rtp stream write: %w", err)
}
}

if d.rtpFormat != nil {
if _, err := fmt.Fprint(d.rtpStream, d.rtpFormat(dump.packet, dump.attributes)); err != nil {
return fmt.Errorf("rtp stream Fprint: %w", err)
}
}

return nil
}

func (d *PacketDumper) writeDumpedRTCP(dump *rtcpDump) error {
if !d.rtcpFilter(dump.packets) {
return nil
}

for _, pkt := range dump.packets {
if !d.rtcpPerPacketFilter(pkt) {
continue
}

if d.rtcpFormatBinary != nil {
dumped, err := d.rtcpFormatBinary(pkt, dump.attributes)
if err != nil {
return fmt.Errorf("rtcp format binary: %w", err)
}

_, err = d.rtcpStream.Write(dumped)
if err != nil {
return fmt.Errorf("rtcp stream write: %w", err)
}
}
}

if d.rtcpFormat != nil {
if _, err := fmt.Fprint(d.rtcpStream, d.rtcpFormat(dump.packets, dump.attributes)); err != nil {
return fmt.Errorf("rtсp stream Fprint: %w", err)
}
}

return nil
}
103 changes: 103 additions & 0 deletions pkg/packetdump/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,106 @@ func TestReceiverFilterNothing(t *testing.T) {

assert.NotZero(t, buf.Len())
}

func TestReceiverCustomBinaryFormatter(t *testing.T) {
rtpBuf := bytes.Buffer{}
rtcpBuf := bytes.Buffer{}

factory, err := NewReceiverInterceptor(
RTPWriter(&rtpBuf),
RTCPWriter(&rtcpBuf),
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
// custom binary formatter to dump only seqno mod 256
RTPBinaryFormatter(func(p *rtp.Packet, _ interceptor.Attributes) ([]byte, error) {
return []byte{byte(p.SequenceNumber)}, nil
}),
// custom binary formatter to dump only DestinationSSRCs mod 256
RTCPBinaryFormatter(func(p rtcp.Packet, _ interceptor.Attributes) ([]byte, error) {
b := make([]byte, 0)
for _, ssrc := range p.DestinationSSRC() {
b = append(b, byte(ssrc))
}
return b, nil
}),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, rtpBuf.Len())
assert.EqualValues(t, 0, rtcpBuf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 45,
}})
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(123),
}})

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// check that there is custom formatter results in buffer
assert.Equal(t, []byte{123}, rtpBuf.Bytes())
assert.Equal(t, []byte{45}, rtcpBuf.Bytes())
}

func TestReceiverRTCPPerPacketFilter(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewReceiverInterceptor(
RTCPWriter(&buf),
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
RTCPPerPacketFilter(func(packet rtcp.Packet) bool {
_, isPli := packet.(*rtcp.PictureLossIndication)
return isPli
}),
RTCPBinaryFormatter(func(p rtcp.Packet, _ interceptor.Attributes) ([]byte, error) {
assert.IsType(t, &rtcp.PictureLossIndication{}, p)
return []byte{123}, nil
}),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
stream.ReceiveRTCP([]rtcp.Packet{&rtcp.ReceiverReport{
SSRC: 789,
}})
// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// Only single PictureLossIndication should have been written.
assert.Equal(t, []byte{123}, buf.Bytes())
}
Loading

0 comments on commit 3436288

Please sign in to comment.