From 5a55af3eb17bd67784672c572d4635a4738d2734 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 17 Jan 2025 19:36:16 +0300 Subject: [PATCH] Support for binary formatters in packetdump --- pkg/packetdump/filter.go | 5 ++ pkg/packetdump/format.go | 10 +++ pkg/packetdump/option.go | 27 +++++++ pkg/packetdump/packet_dumper.go | 85 +++++++++++++++++---- pkg/packetdump/receiver_interceptor_test.go | 56 ++++++++++++++ pkg/packetdump/sender_interceptor_test.go | 59 ++++++++++++++ 6 files changed, 229 insertions(+), 13 deletions(-) diff --git a/pkg/packetdump/filter.go b/pkg/packetdump/filter.go index c20026b8..b996cfae 100644 --- a/pkg/packetdump/filter.go +++ b/pkg/packetdump/filter.go @@ -14,4 +14,9 @@ type RTPFilterCallback func(pkt *rtp.Packet) bool // RTCPFilterCallback can be used to filter RTCP packets to dump. // The callback returns whether or not to print dump the packet's content. +// Deprecated: prefer RTCPFilterPerPacketCallback type RTCPFilterCallback func(pkt []rtcp.Packet) bool + +// RTCPFilterPerPacketCallback can be used to filter RTCP packets to dump. +// It's called once per every packet opposing to RTCPFilterCallback which is called once per packet batch +type RTCPFilterPerPacketCallback func(pkt rtcp.Packet) bool diff --git a/pkg/packetdump/format.go b/pkg/packetdump/format.go index 889e084e..ab4788cd 100644 --- a/pkg/packetdump/format.go +++ b/pkg/packetdump/format.go @@ -14,19 +14,29 @@ import ( // RTPFormatCallback can be used to apply custom formatting to each dumped RTP // packet. If new lines should be added after each packet, they must be included // in the returned format. +// Deprecated: prefer RTPBinaryFormatCallback type RTPFormatCallback func(*rtp.Packet, interceptor.Attributes) string // RTCPFormatCallback can be used to apply custom formatting to each dumped RTCP // packet. If new lines should be added after each packet, they must be included // in the returned format. +// Deprecated: prefer RTCPBinaryFormatCallback type RTCPFormatCallback func([]rtcp.Packet, interceptor.Attributes) string // DefaultRTPFormatter returns the default log format for RTP packets +// Deprecated: useless export since set by default func DefaultRTPFormatter(pkt *rtp.Packet, _ interceptor.Attributes) string { return fmt.Sprintf("%s\n", pkt) } // DefaultRTCPFormatter returns the default log format for RTCP packets +// Deprecated: useless export since set by default func DefaultRTCPFormatter(pkts []rtcp.Packet, _ interceptor.Attributes) string { return fmt.Sprintf("%s\n", pkts) } + +// RTPBinaryFormatCallback can be used to apply custom formatting or marshalling to each dumped RTP packet +type RTPBinaryFormatCallback func(*rtp.Packet, interceptor.Attributes) ([]byte, error) + +// RTCPBinaryFormatCallback can be used to apply custom formatting or marshalling to each dumped RTCP packet +type RTCPBinaryFormatCallback func(rtcp.Packet, interceptor.Attributes) ([]byte, error) diff --git a/pkg/packetdump/option.go b/pkg/packetdump/option.go index 9be23ea0..57251375 100644 --- a/pkg/packetdump/option.go +++ b/pkg/packetdump/option.go @@ -37,6 +37,7 @@ func RTCPWriter(w io.Writer) PacketDumperOption { } // RTPFormatter sets the RTP format +// Deprecated: prefer RTPBinaryFormatter func RTPFormatter(f RTPFormatCallback) PacketDumperOption { return func(d *PacketDumper) error { d.rtpFormat = f @@ -45,6 +46,7 @@ func RTPFormatter(f RTPFormatCallback) PacketDumperOption { } // RTCPFormatter sets the RTCP format +// Deprecated: prefer RTCPBinaryFormatter func RTCPFormatter(f RTCPFormatCallback) PacketDumperOption { return func(d *PacketDumper) error { d.rtcpFormat = f @@ -52,6 +54,22 @@ func RTCPFormatter(f RTCPFormatCallback) PacketDumperOption { } } +// RTPBinaryFormatter sets the RTP binary formatter +func RTPBinaryFormatter(f RTPBinaryFormatCallback) PacketDumperOption { + return func(d *PacketDumper) error { + d.rtpFormatBinary = f + return nil + } +} + +// RTCPBinaryFormatter sets the RTCP binary formatter +func RTCPBinaryFormatter(f RTCPBinaryFormatCallback) PacketDumperOption { + return func(d *PacketDumper) error { + d.rtcpFormatBinary = f + return nil + } +} + // RTPFilter sets the RTP filter. func RTPFilter(callback RTPFilterCallback) PacketDumperOption { return func(d *PacketDumper) error { @@ -61,9 +79,18 @@ func RTPFilter(callback RTPFilterCallback) PacketDumperOption { } // RTCPFilter sets the RTCP filter. +// Deprecated: prefer RTCPFilterPerPacket func RTCPFilter(callback RTCPFilterCallback) PacketDumperOption { return func(d *PacketDumper) error { d.rtcpFilter = callback return nil } } + +// RTCPFilterPerPacket sets the RTCP per-packet filter. +func RTCPFilterPerPacket(callback RTCPFilterPerPacketCallback) PacketDumperOption { + return func(d *PacketDumper) error { + d.rtcpFilterPerPacket = callback + return nil + } +} diff --git a/pkg/packetdump/packet_dumper.go b/pkg/packetdump/packet_dumper.go index 12e08063..9055aaf0 100644 --- a/pkg/packetdump/packet_dumper.go +++ b/pkg/packetdump/packet_dumper.go @@ -28,31 +28,40 @@ type PacketDumper struct { rtpStream io.Writer rtcpStream io.Writer + rtpFormatBinary RTPBinaryFormatCallback + rtcpFormatBinary RTCPBinaryFormatCallback + rtpFormat RTPFormatCallback rtcpFormat RTCPFormatCallback - rtpFilter RTPFilterCallback - rtcpFilter RTCPFilterCallback + rtpFilter RTPFilterCallback + rtcpFilter RTCPFilterCallback + rtcpFilterPerPacket RTCPFilterPerPacketCallback } // NewPacketDumper creates a new PacketDumper func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) { d := &PacketDumper{ - log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"), - wg: sync.WaitGroup{}, - close: make(chan struct{}), - rtpChan: make(chan *rtpDump), - rtcpChan: make(chan *rtcpDump), - rtpStream: os.Stdout, - rtcpStream: os.Stdout, - rtpFormat: DefaultRTPFormatter, - rtcpFormat: DefaultRTCPFormatter, + log: logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"), + wg: sync.WaitGroup{}, + close: make(chan struct{}), + rtpChan: make(chan *rtpDump), + rtcpChan: make(chan *rtcpDump), + rtpStream: os.Stdout, + rtcpStream: os.Stdout, + rtpFormat: nil, + rtcpFormat: nil, + rtpFormatBinary: nil, + rtcpFormatBinary: nil, rtpFilter: func(*rtp.Packet) bool { return true }, rtcpFilter: func([]rtcp.Packet) bool { return true }, + rtcpFilterPerPacket: func(rtcp.Packet) bool { + return true + }, } for _, opt := range opts { @@ -61,6 +70,14 @@ func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) { } } + if d.rtpFormat == nil && d.rtpFormatBinary == nil { + d.rtpFormat = DefaultRTPFormatter + } + + if d.rtcpFormat == nil && d.rtcpFormatBinary == nil { + d.rtcpFormat = DefaultRTCPFormatter + } + d.wg.Add(1) go d.loop() @@ -117,13 +134,55 @@ func (d *PacketDumper) loop() { case <-d.close: return case dump := <-d.rtpChan: - if d.rtpFilter(dump.packet) { + if !d.rtpFilter(dump.packet) { + continue + } + + if d.rtpFormatBinary != nil { + dumped, err := d.rtpFormatBinary(dump.packet, dump.attributes) + if err != nil { + d.log.Errorf("could not format RTP packet: %v", err) + continue + } + _, err = d.rtpStream.Write(dumped) + if err != nil { + d.log.Errorf("could not write formatted RTP packet %v", err) + continue + } + } + + if d.rtpFormat != nil { if _, err := fmt.Fprint(d.rtpStream, d.rtpFormat(dump.packet, dump.attributes)); err != nil { d.log.Errorf("could not dump RTP packet %v", err) + continue } } + case dump := <-d.rtcpChan: - if d.rtcpFilter(dump.packets) { + if !d.rtcpFilter(dump.packets) { + continue + } + + for _, pkt := range dump.packets { + if !d.rtcpFilterPerPacket(pkt) { + continue + } + + if d.rtcpFormatBinary != nil { + dumped, err := d.rtcpFormatBinary(pkt, dump.attributes) + if err != nil { + d.log.Errorf("could not format RTCP packet: %v", err) + continue + } + _, err = d.rtcpStream.Write(dumped) + if err != nil { + d.log.Errorf("could not write formatted RTCP packet: %v", err) + continue + } + } + } + + if d.rtcpFormat != nil { if _, err := fmt.Fprint(d.rtcpStream, d.rtcpFormat(dump.packets, dump.attributes)); err != nil { d.log.Errorf("could not dump RTCP packet %v", err) } diff --git a/pkg/packetdump/receiver_interceptor_test.go b/pkg/packetdump/receiver_interceptor_test.go index 0402db1e..4040e3e9 100644 --- a/pkg/packetdump/receiver_interceptor_test.go +++ b/pkg/packetdump/receiver_interceptor_test.go @@ -108,3 +108,59 @@ func TestReceiverFilterNothing(t *testing.T) { assert.NotZero(t, buf.Len()) } + +func TestReceiverCustomBinaryFormatter(t *testing.T) { + rtpBuf := bytes.Buffer{} + rtcpBuf := bytes.Buffer{} + + factory, err := NewReceiverInterceptor( + RTPWriter(&rtpBuf), + RTCPWriter(&rtcpBuf), + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + // custom binary formatter to dump only seqno mod 256 + RTPBinaryFormatter(func(p *rtp.Packet, a interceptor.Attributes) ([]byte, error) { + return []byte{byte(p.SequenceNumber)}, nil + }), + // custom binary formatter to dump only DestinationSSRCs mod 256 + RTCPBinaryFormatter(func(p rtcp.Packet, a interceptor.Attributes) ([]byte, error) { + b := make([]byte, 0) + for _, ssrc := range p.DestinationSSRC() { + b = append(b, byte(ssrc)) + } + return b, nil + }), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + assert.NoError(t, err) + + assert.EqualValues(t, 0, rtpBuf.Len()) + assert.EqualValues(t, 0, rtcpBuf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + SenderSSRC: 123, + MediaSSRC: 45, + }}) + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(123), + }}) + + // Give time for packets to be handled and stream written to. + time.Sleep(50 * time.Millisecond) + + err = i.Close() + assert.NoError(t, err) + + // check that there is custom formatter results in buffer + assert.Equal(t, []byte{123}, rtpBuf.Bytes()) + assert.Equal(t, []byte{45}, rtcpBuf.Bytes()) +} diff --git a/pkg/packetdump/sender_interceptor_test.go b/pkg/packetdump/sender_interceptor_test.go index b005b557..e75e7a70 100644 --- a/pkg/packetdump/sender_interceptor_test.go +++ b/pkg/packetdump/sender_interceptor_test.go @@ -114,3 +114,62 @@ func TestSenderFilterNothing(t *testing.T) { assert.NotZero(t, buf.Len()) } + +func TestSenderCustomBinaryFormatter(t *testing.T) { + rtpBuf := bytes.Buffer{} + rtcpBuf := bytes.Buffer{} + + factory, err := NewSenderInterceptor( + RTPWriter(&rtpBuf), + RTCPWriter(&rtcpBuf), + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + // custom binary formatter to dump only seqno mod 256 + RTPBinaryFormatter(func(p *rtp.Packet, a interceptor.Attributes) ([]byte, error) { + return []byte{byte(p.SequenceNumber)}, nil + }), + // custom binary formatter to dump only DestinationSSRCs mod 256 + RTCPBinaryFormatter(func(p rtcp.Packet, a interceptor.Attributes) ([]byte, error) { + b := make([]byte, 0) + for _, ssrc := range p.DestinationSSRC() { + b = append(b, byte(ssrc)) + } + return b, nil + }), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + assert.NoError(t, err) + + assert.EqualValues(t, 0, rtpBuf.Len()) + assert.EqualValues(t, 0, rtcpBuf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + err = stream.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + SenderSSRC: 123, + MediaSSRC: 45, + }}) + assert.NoError(t, err) + + err = stream.WriteRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(123), + }}) + assert.NoError(t, err) + + // Give time for packets to be handled and stream written to. + time.Sleep(50 * time.Millisecond) + + err = i.Close() + assert.NoError(t, err) + + // check that there is custom formatter results in buffer + assert.Equal(t, []byte{123}, rtpBuf.Bytes()) + assert.Equal(t, []byte{45}, rtcpBuf.Bytes()) +}