Skip to content

Commit

Permalink
Support for binary formatters in packetdump
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseevx committed Jan 17, 2025
1 parent e187410 commit 5a55af3
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 13 deletions.
5 changes: 5 additions & 0 deletions pkg/packetdump/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ type RTPFilterCallback func(pkt *rtp.Packet) bool

// RTCPFilterCallback can be used to filter RTCP packets to dump.
// The callback returns whether or not to print dump the packet's content.
// Deprecated: prefer RTCPFilterPerPacketCallback
type RTCPFilterCallback func(pkt []rtcp.Packet) bool

// RTCPFilterPerPacketCallback can be used to filter RTCP packets to dump.
// It's called once per every packet opposing to RTCPFilterCallback which is called once per packet batch
type RTCPFilterPerPacketCallback func(pkt rtcp.Packet) bool
10 changes: 10 additions & 0 deletions pkg/packetdump/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,29 @@ import (
// RTPFormatCallback can be used to apply custom formatting to each dumped RTP
// packet. If new lines should be added after each packet, they must be included
// in the returned format.
// Deprecated: prefer RTPBinaryFormatCallback
type RTPFormatCallback func(*rtp.Packet, interceptor.Attributes) string

// RTCPFormatCallback can be used to apply custom formatting to each dumped RTCP
// packet. If new lines should be added after each packet, they must be included
// in the returned format.
// Deprecated: prefer RTCPBinaryFormatCallback
type RTCPFormatCallback func([]rtcp.Packet, interceptor.Attributes) string

// DefaultRTPFormatter returns the default log format for RTP packets
// Deprecated: useless export since set by default
func DefaultRTPFormatter(pkt *rtp.Packet, _ interceptor.Attributes) string {
return fmt.Sprintf("%s\n", pkt)
}

// DefaultRTCPFormatter returns the default log format for RTCP packets
// Deprecated: useless export since set by default
func DefaultRTCPFormatter(pkts []rtcp.Packet, _ interceptor.Attributes) string {
return fmt.Sprintf("%s\n", pkts)
}

// RTPBinaryFormatCallback can be used to apply custom formatting or marshalling to each dumped RTP packet

Check failure on line 38 in pkg/packetdump/format.go

View workflow job for this annotation

GitHub Actions / lint / Go

`marshalling` is a misspelling of `marshaling` (misspell)
type RTPBinaryFormatCallback func(*rtp.Packet, interceptor.Attributes) ([]byte, error)

// RTCPBinaryFormatCallback can be used to apply custom formatting or marshalling to each dumped RTCP packet

Check failure on line 41 in pkg/packetdump/format.go

View workflow job for this annotation

GitHub Actions / lint / Go

`marshalling` is a misspelling of `marshaling` (misspell)
type RTCPBinaryFormatCallback func(rtcp.Packet, interceptor.Attributes) ([]byte, error)
27 changes: 27 additions & 0 deletions pkg/packetdump/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func RTCPWriter(w io.Writer) PacketDumperOption {
}

// RTPFormatter sets the RTP format
// Deprecated: prefer RTPBinaryFormatter
func RTPFormatter(f RTPFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtpFormat = f
Expand All @@ -45,13 +46,30 @@ func RTPFormatter(f RTPFormatCallback) PacketDumperOption {
}

// RTCPFormatter sets the RTCP format
// Deprecated: prefer RTCPBinaryFormatter
func RTCPFormatter(f RTCPFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFormat = f
return nil
}
}

// RTPBinaryFormatter sets the RTP binary formatter
func RTPBinaryFormatter(f RTPBinaryFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtpFormatBinary = f
return nil
}
}

// RTCPBinaryFormatter sets the RTCP binary formatter
func RTCPBinaryFormatter(f RTCPBinaryFormatCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFormatBinary = f
return nil
}
}

// RTPFilter sets the RTP filter.
func RTPFilter(callback RTPFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
Expand All @@ -61,9 +79,18 @@ func RTPFilter(callback RTPFilterCallback) PacketDumperOption {
}

// RTCPFilter sets the RTCP filter.
// Deprecated: prefer RTCPFilterPerPacket
func RTCPFilter(callback RTCPFilterCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFilter = callback
return nil
}
}

// RTCPFilterPerPacket sets the RTCP per-packet filter.
func RTCPFilterPerPacket(callback RTCPFilterPerPacketCallback) PacketDumperOption {
return func(d *PacketDumper) error {
d.rtcpFilterPerPacket = callback
return nil
}

Check warning on line 95 in pkg/packetdump/option.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/option.go#L91-L95

Added lines #L91 - L95 were not covered by tests
}
85 changes: 72 additions & 13 deletions pkg/packetdump/packet_dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,40 @@ type PacketDumper struct {
rtpStream io.Writer
rtcpStream io.Writer

rtpFormatBinary RTPBinaryFormatCallback
rtcpFormatBinary RTCPBinaryFormatCallback

rtpFormat RTPFormatCallback
rtcpFormat RTCPFormatCallback

rtpFilter RTPFilterCallback
rtcpFilter RTCPFilterCallback
rtpFilter RTPFilterCallback
rtcpFilter RTCPFilterCallback
rtcpFilterPerPacket RTCPFilterPerPacketCallback
}

// NewPacketDumper creates a new PacketDumper
func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) {
d := &PacketDumper{
log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
wg: sync.WaitGroup{},
close: make(chan struct{}),
rtpChan: make(chan *rtpDump),
rtcpChan: make(chan *rtcpDump),
rtpStream: os.Stdout,
rtcpStream: os.Stdout,
rtpFormat: DefaultRTPFormatter,
rtcpFormat: DefaultRTCPFormatter,
log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
wg: sync.WaitGroup{},
close: make(chan struct{}),
rtpChan: make(chan *rtpDump),
rtcpChan: make(chan *rtcpDump),
rtpStream: os.Stdout,
rtcpStream: os.Stdout,
rtpFormat: nil,
rtcpFormat: nil,
rtpFormatBinary: nil,
rtcpFormatBinary: nil,
rtpFilter: func(*rtp.Packet) bool {
return true
},
rtcpFilter: func([]rtcp.Packet) bool {
return true
},
rtcpFilterPerPacket: func(rtcp.Packet) bool {
return true
},
}

for _, opt := range opts {
Expand All @@ -61,6 +70,14 @@ func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) {
}
}

if d.rtpFormat == nil && d.rtpFormatBinary == nil {
d.rtpFormat = DefaultRTPFormatter
}

if d.rtcpFormat == nil && d.rtcpFormatBinary == nil {
d.rtcpFormat = DefaultRTCPFormatter
}

d.wg.Add(1)
go d.loop()

Expand Down Expand Up @@ -117,13 +134,55 @@ func (d *PacketDumper) loop() {
case <-d.close:
return
case dump := <-d.rtpChan:
if d.rtpFilter(dump.packet) {
if !d.rtpFilter(dump.packet) {
continue
}

if d.rtpFormatBinary != nil {
dumped, err := d.rtpFormatBinary(dump.packet, dump.attributes)
if err != nil {
d.log.Errorf("could not format RTP packet: %v", err)
continue

Check warning on line 145 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L144-L145

Added lines #L144 - L145 were not covered by tests
}
_, err = d.rtpStream.Write(dumped)
if err != nil {
d.log.Errorf("could not write formatted RTP packet %v", err)
continue

Check warning on line 150 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L149-L150

Added lines #L149 - L150 were not covered by tests
}
}

if d.rtpFormat != nil {
if _, err := fmt.Fprint(d.rtpStream, d.rtpFormat(dump.packet, dump.attributes)); err != nil {
d.log.Errorf("could not dump RTP packet %v", err)
continue

Check warning on line 157 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L157

Added line #L157 was not covered by tests
}
}

case dump := <-d.rtcpChan:
if d.rtcpFilter(dump.packets) {
if !d.rtcpFilter(dump.packets) {
continue
}

for _, pkt := range dump.packets {
if !d.rtcpFilterPerPacket(pkt) {
continue

Check warning on line 168 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L168

Added line #L168 was not covered by tests
}

if d.rtcpFormatBinary != nil {
dumped, err := d.rtcpFormatBinary(pkt, dump.attributes)
if err != nil {
d.log.Errorf("could not format RTCP packet: %v", err)
continue

Check warning on line 175 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L174-L175

Added lines #L174 - L175 were not covered by tests
}
_, err = d.rtcpStream.Write(dumped)
if err != nil {
d.log.Errorf("could not write formatted RTCP packet: %v", err)
continue

Check warning on line 180 in pkg/packetdump/packet_dumper.go

View check run for this annotation

Codecov / codecov/patch

pkg/packetdump/packet_dumper.go#L179-L180

Added lines #L179 - L180 were not covered by tests
}
}
}

if d.rtcpFormat != nil {
if _, err := fmt.Fprint(d.rtcpStream, d.rtcpFormat(dump.packets, dump.attributes)); err != nil {
d.log.Errorf("could not dump RTCP packet %v", err)
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/packetdump/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,59 @@ func TestReceiverFilterNothing(t *testing.T) {

assert.NotZero(t, buf.Len())
}

func TestReceiverCustomBinaryFormatter(t *testing.T) {
rtpBuf := bytes.Buffer{}
rtcpBuf := bytes.Buffer{}

factory, err := NewReceiverInterceptor(
RTPWriter(&rtpBuf),
RTCPWriter(&rtcpBuf),
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
// custom binary formatter to dump only seqno mod 256
RTPBinaryFormatter(func(p *rtp.Packet, a interceptor.Attributes) ([]byte, error) {

Check failure on line 121 in pkg/packetdump/receiver_interceptor_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'a' seems to be unused, consider removing or renaming it as _ (revive)
return []byte{byte(p.SequenceNumber)}, nil
}),
// custom binary formatter to dump only DestinationSSRCs mod 256
RTCPBinaryFormatter(func(p rtcp.Packet, a interceptor.Attributes) ([]byte, error) {
b := make([]byte, 0)
for _, ssrc := range p.DestinationSSRC() {
b = append(b, byte(ssrc))
}
return b, nil
}),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, rtpBuf.Len())
assert.EqualValues(t, 0, rtcpBuf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 45,
}})
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(123),
}})

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// check that there is custom formatter results in buffer
assert.Equal(t, []byte{123}, rtpBuf.Bytes())
assert.Equal(t, []byte{45}, rtcpBuf.Bytes())
}
59 changes: 59 additions & 0 deletions pkg/packetdump/sender_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,62 @@ func TestSenderFilterNothing(t *testing.T) {

assert.NotZero(t, buf.Len())
}

func TestSenderCustomBinaryFormatter(t *testing.T) {
rtpBuf := bytes.Buffer{}
rtcpBuf := bytes.Buffer{}

factory, err := NewSenderInterceptor(
RTPWriter(&rtpBuf),
RTCPWriter(&rtcpBuf),
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
// custom binary formatter to dump only seqno mod 256
RTPBinaryFormatter(func(p *rtp.Packet, a interceptor.Attributes) ([]byte, error) {

Check failure on line 127 in pkg/packetdump/sender_interceptor_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'a' seems to be unused, consider removing or renaming it as _ (revive)
return []byte{byte(p.SequenceNumber)}, nil
}),
// custom binary formatter to dump only DestinationSSRCs mod 256
RTCPBinaryFormatter(func(p rtcp.Packet, a interceptor.Attributes) ([]byte, error) {

Check failure on line 131 in pkg/packetdump/sender_interceptor_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'a' seems to be unused, consider removing or renaming it as _ (revive)
b := make([]byte, 0)
for _, ssrc := range p.DestinationSSRC() {
b = append(b, byte(ssrc))
}
return b, nil
}),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, rtpBuf.Len())
assert.EqualValues(t, 0, rtcpBuf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

err = stream.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 45,
}})
assert.NoError(t, err)

err = stream.WriteRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(123),
}})
assert.NoError(t, err)

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)

err = i.Close()
assert.NoError(t, err)

// check that there is custom formatter results in buffer
assert.Equal(t, []byte{123}, rtpBuf.Bytes())
assert.Equal(t, []byte{45}, rtcpBuf.Bytes())
}

0 comments on commit 5a55af3

Please sign in to comment.