Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary formatters in packetdump #304

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/packetdump/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ type RTPFilterCallback func(pkt *rtp.Packet) bool

// RTCPFilterCallback can be used to filter RTCP packets to dump.
// The callback returns whether or not to print dump the packet's content.
// Deprecated: prefer RTCPPerPacketFilterCallback
type RTCPFilterCallback func(pkt []rtcp.Packet) bool

// RTCPPerPacketFilterCallback can be used to filter RTCP packets to dump.
// It's called once per every packet opposing to RTCPFilterCallback which is called once per packet batch
type RTCPPerPacketFilterCallback func(pkt rtcp.Packet) bool
10 changes: 10 additions & 0 deletions pkg/packetdump/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,29 @@ import (
// RTPFormatCallback can be used to apply custom formatting to each dumped RTP
// packet. If new lines should be added after each packet, they must be included
// in the returned format.
// Deprecated: prefer RTPBinaryFormatCallback
type RTPFormatCallback func(*rtp.Packet, interceptor.Attributes) string

// RTCPFormatCallback can be used to apply custom formatting to each dumped RTCP
// packet. If new lines should be added after each packet, they must be included
// in the returned format.
// Deprecated: prefer RTCPBinaryFormatCallback
type RTCPFormatCallback func([]rtcp.Packet, interceptor.Attributes) string

// DefaultRTPFormatter returns the default log format for RTP packets
// Deprecated: useless export since set by default
func DefaultRTPFormatter(pkt *rtp.Packet, _ interceptor.Attributes) string {
return fmt.Sprintf("%s\n", pkt)
}

// DefaultRTCPFormatter returns the default log format for RTCP packets
// Deprecated: useless export since set by default
func DefaultRTCPFormatter(pkts []rtcp.Packet, _ interceptor.Attributes) string {
return fmt.Sprintf("%s\n", pkts)
}

// RTPBinaryFormatCallback can be used to apply custom formatting or marshaling to each dumped RTP packet
type RTPBinaryFormatCallback func(*rtp.Packet, interceptor.Attributes) ([]byte, error)

// RTCPBinaryFormatCallback can be used to apply custom formatting or marshaling to each dumped RTCP packet
type RTCPBinaryFormatCallback func(rtcp.Packet, interceptor.Attributes) ([]byte, error)
27 changes: 27 additions & 0 deletions pkg/packetdump/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func RTCPWriter(w io.Writer) PacketDumperOption {
}

// RTPFormatter sets the RTP format
// Deprecated: prefer RTPBinaryFormatter
func RTPFormatter(f RTPFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtpFormat = f
Expand All @@ -45,13 +46,30 @@ func RTPFormatter(f RTPFormatCallback) PacketDumperOption {
}

// RTCPFormatter sets the RTCP format
// Deprecated: prefer RTCPBinaryFormatter
func RTCPFormatter(f RTCPFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFormat = f
return nil
}
}

// RTPBinaryFormatter sets the RTP binary formatter
func RTPBinaryFormatter(f RTPBinaryFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtpFormatBinary = f
return nil
}
}

// RTCPBinaryFormatter sets the RTCP binary formatter
func RTCPBinaryFormatter(f RTCPBinaryFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFormatBinary = f
return nil
}
}

// RTPFilter sets the RTP filter.
func RTPFilter(callback RTPFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
Expand All @@ -61,9 +79,18 @@ func RTPFilter(callback RTPFilterCallback) PacketDumperOption {
}

// RTCPFilter sets the RTCP filter.
// Deprecated: prefer RTCPPerPacketFilter
func RTCPFilter(callback RTCPFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFilter = callback
return nil
}
}

// RTCPPerPacketFilter sets the RTCP per-packet filter.
func RTCPPerPacketFilter(callback RTCPPerPacketFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpPerPacketFilter = callback
return nil
}
}
117 changes: 98 additions & 19 deletions pkg/packetdump/packet_dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"github.com/pion/rtp"
)

// ErrBothBinaryAndDeprecatedFormat is returned when both binary and deprecated format callbacks are set
var ErrBothBinaryAndDeprecatedFormat = fmt.Errorf("both binary and deprecated format callbacks are set")

// PacketDumper dumps packet to a io.Writer
type PacketDumper struct {
log logging.LeveledLogger
Expand All @@ -28,31 +31,44 @@
rtpStream io.Writer
rtcpStream io.Writer

rtpFormatBinary RTPBinaryFormatCallback
rtcpFormatBinary RTCPBinaryFormatCallback

rtpFormat RTPFormatCallback
rtcpFormat RTCPFormatCallback

rtpFilter RTPFilterCallback
rtcpFilter RTCPFilterCallback
rtpFilter RTPFilterCallback
rtcpFilter RTCPFilterCallback
rtcpPerPacketFilter RTCPPerPacketFilterCallback
}

// NewPacketDumper creates a new PacketDumper
func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) {
d := &PacketDumper{
log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
wg: sync.WaitGroup{},
close: make(chan struct{}),
rtpChan: make(chan *rtpDump),
rtcpChan: make(chan *rtcpDump),
rtpStream: os.Stdout,
rtcpStream: os.Stdout,
rtpFormat: DefaultRTPFormatter,
rtcpFormat: DefaultRTCPFormatter,
log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
wg: sync.WaitGroup{},
close: make(chan struct{}),
rtpChan: make(chan *rtpDump),
rtcpChan: make(chan *rtcpDump),
rtpStream: os.Stdout,
rtcpStream: os.Stdout,
rtpFormat: nil,
rtcpFormat: nil,
rtpFormatBinary: nil,
rtcpFormatBinary: nil,
rtpFilter: func(*rtp.Packet) bool {
return true
},
rtcpFilter: func([]rtcp.Packet) bool {
return true
},
rtcpPerPacketFilter: func(rtcp.Packet) bool {
return true
},
}

if d.rtpFormat != nil && d.rtpFormatBinary != nil {
return nil, ErrBothBinaryAndDeprecatedFormat

Check warning on line 71 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L71

Added line #L71 was not covered by tests
}

for _, opt := range opts {
Expand All @@ -61,6 +77,14 @@
}
}

if d.rtpFormat == nil && d.rtpFormatBinary == nil {
d.rtpFormat = DefaultRTPFormatter
}

if d.rtcpFormat == nil && d.rtcpFormatBinary == nil {
d.rtcpFormat = DefaultRTCPFormatter
}

d.wg.Add(1)
go d.loop()

Expand Down Expand Up @@ -117,17 +141,72 @@
case <-d.close:
return
case dump := <-d.rtpChan:
if d.rtpFilter(dump.packet) {
if _, err := fmt.Fprint(d.rtpStream, d.rtpFormat(dump.packet, dump.attributes)); err != nil {
d.log.Errorf("could not dump RTP packet %v", err)
}
err := d.writeDumpedRTP(dump)
if err != nil {
d.log.Errorf("could not dump RTP packet: %v", err)

Check warning on line 146 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L146

Added line #L146 was not covered by tests
}
case dump := <-d.rtcpChan:
if d.rtcpFilter(dump.packets) {
if _, err := fmt.Fprint(d.rtcpStream, d.rtcpFormat(dump.packets, dump.attributes)); err != nil {
d.log.Errorf("could not dump RTCP packet %v", err)
}
err := d.writeDumpedRTCP(dump)
if err != nil {
d.log.Errorf("could not dump RTCP packets: %v", err)
}

Check warning on line 152 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L151-L152

Added lines #L151 - L152 were not covered by tests
}
}
}

func (d *PacketDumper) writeDumpedRTP(dump *rtpDump) error {
if !d.rtpFilter(dump.packet) {
return nil
}

if d.rtpFormatBinary != nil {
dumped, err := d.rtpFormatBinary(dump.packet, dump.attributes)
if err != nil {
return fmt.Errorf("rtp format binary: %w", err)
}

Check warning on line 166 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L165-L166

Added lines #L165 - L166 were not covered by tests
_, err = d.rtpStream.Write(dumped)
if err != nil {
return fmt.Errorf("rtp stream write: %w", err)
}

Check warning on line 170 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L169-L170

Added lines #L169 - L170 were not covered by tests
}

if d.rtpFormat != nil {
if _, err := fmt.Fprint(d.rtpStream, d.rtpFormat(dump.packet, dump.attributes)); err != nil {
return fmt.Errorf("rtp stream Fprint: %w", err)
}

Check warning on line 176 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L175-L176

Added lines #L175 - L176 were not covered by tests
}

return nil
}

func (d *PacketDumper) writeDumpedRTCP(dump *rtcpDump) error {
if !d.rtcpFilter(dump.packets) {
return nil
}

for _, pkt := range dump.packets {
if !d.rtcpPerPacketFilter(pkt) {
continue
}

if d.rtcpFormatBinary != nil {
dumped, err := d.rtcpFormatBinary(pkt, dump.attributes)
if err != nil {
return fmt.Errorf("rtcp format binary: %w", err)
}

Check warning on line 196 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L195-L196

Added lines #L195 - L196 were not covered by tests

_, err = d.rtcpStream.Write(dumped)
if err != nil {
return fmt.Errorf("rtcp stream write: %w", err)

Check warning on line 200 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L200

Added line #L200 was not covered by tests
}
}
}

if d.rtcpFormat != nil {
if _, err := fmt.Fprint(d.rtcpStream, d.rtcpFormat(dump.packets, dump.attributes)); err != nil {
return fmt.Errorf("rtсp stream Fprint: %w", err)
}

Check warning on line 208 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L207-L208

Added lines #L207 - L208 were not covered by tests
}

return nil
}
103 changes: 103 additions & 0 deletions pkg/packetdump/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,106 @@ func TestReceiverFilterNothing(t *testing.T) {

assert.NotZero(t, buf.Len())
}

func TestReceiverCustomBinaryFormatter(t *testing.T) {
rtpBuf := bytes.Buffer{}
rtcpBuf := bytes.Buffer{}

factory, err := NewReceiverInterceptor(
RTPWriter(&rtpBuf),
RTCPWriter(&rtcpBuf),
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
// custom binary formatter to dump only seqno mod 256
RTPBinaryFormatter(func(p *rtp.Packet, _ interceptor.Attributes) ([]byte, error) {
return []byte{byte(p.SequenceNumber)}, nil
}),
// custom binary formatter to dump only DestinationSSRCs mod 256
RTCPBinaryFormatter(func(p rtcp.Packet, _ interceptor.Attributes) ([]byte, error) {
b := make([]byte, 0)
for _, ssrc := range p.DestinationSSRC() {
b = append(b, byte(ssrc))
}
return b, nil
}),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, rtpBuf.Len())
assert.EqualValues(t, 0, rtcpBuf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 45,
}})
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(123),
}})

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// check that there is custom formatter results in buffer
assert.Equal(t, []byte{123}, rtpBuf.Bytes())
assert.Equal(t, []byte{45}, rtcpBuf.Bytes())
}

func TestReceiverRTCPPerPacketFilter(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewReceiverInterceptor(
RTCPWriter(&buf),
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
RTCPPerPacketFilter(func(packet rtcp.Packet) bool {
_, isPli := packet.(*rtcp.PictureLossIndication)
return isPli
}),
RTCPBinaryFormatter(func(p rtcp.Packet, _ interceptor.Attributes) ([]byte, error) {
assert.IsType(t, &rtcp.PictureLossIndication{}, p)
return []byte{123}, nil
}),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
stream.ReceiveRTCP([]rtcp.Packet{&rtcp.ReceiverReport{
SSRC: 789,
}})
// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// Only single PictureLossIndication should have been written.
assert.Equal(t, []byte{123}, buf.Bytes())
}
Loading
Loading