From 749922abf77db460d59e59c8a827bfa47db35ed5 Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Tue, 22 Oct 2024 17:33:30 -0600 Subject: [PATCH] jitterbuffer: Add packet skipping in interceptor --- internal/test/mock_stream.go | 1 - pkg/jitterbuffer/jitter_buffer.go | 8 ++- pkg/jitterbuffer/option.go | 7 +++ pkg/jitterbuffer/receiver_interceptor.go | 51 ++++++++++------- pkg/jitterbuffer/receiver_interceptor_test.go | 57 ++++++++++++++++++- 5 files changed, 101 insertions(+), 23 deletions(-) diff --git a/internal/test/mock_stream.go b/internal/test/mock_stream.go index bf96e31b..dd545498 100644 --- a/internal/test/mock_stream.go +++ b/internal/test/mock_stream.go @@ -92,7 +92,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc if !ok { return 0, nil, io.EOF } - marshaled, err := p.Marshal() if err != nil { return 0, nil, io.EOF diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go index 863a0a80..a3897d45 100644 --- a/pkg/jitterbuffer/jitter_buffer.go +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -128,6 +128,12 @@ func (jb *JitterBuffer) PlayoutHead() uint16 { return jb.playoutHead } +func (jb *JitterBuffer) Length() uint16 { + jb.mutex.Lock() + defer jb.mutex.Unlock() + return jb.packets.Length() +} + // SetPlayoutHead allows you to manually specify the packet you wish to pop next // If you have encountered a packet that hasn't resolved you can skip it func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) { @@ -155,7 +161,7 @@ func (jb *JitterBuffer) Push(packet *rtp.Packet) { if jb.packets.Length() == 0 { jb.emit(StartBuffering) } - if jb.packets.Length() > 100 { + if jb.packets.Length() > 2*jb.minStartCount { jb.stats.overflowCount++ jb.emit(BufferOverflow) } diff --git a/pkg/jitterbuffer/option.go b/pkg/jitterbuffer/option.go index 9a33c22e..b22a3f41 100644 --- a/pkg/jitterbuffer/option.go +++ b/pkg/jitterbuffer/option.go @@ -17,3 +17,10 @@ func Log(log logging.LeveledLogger) ReceiverInterceptorOption { return nil } } + +func WithSkipMissingPackets() ReceiverInterceptorOption { + return func(d *ReceiverInterceptor) error { + d.skipMissingPackets = true + return nil + } +} diff --git a/pkg/jitterbuffer/receiver_interceptor.go b/pkg/jitterbuffer/receiver_interceptor.go index b4c032b9..4f66bb13 100644 --- a/pkg/jitterbuffer/receiver_interceptor.go +++ b/pkg/jitterbuffer/receiver_interceptor.go @@ -4,6 +4,7 @@ package jitterbuffer import ( + "errors" "sync" "github.com/pion/interceptor" @@ -17,10 +18,13 @@ type InterceptorFactory struct { } // NewInterceptor constructs a new ReceiverInterceptor -func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { +func (g *InterceptorFactory) NewInterceptor(logName string) (interceptor.Interceptor, error) { + if logName == "" { + logName = "jitterbuffer" + } i := &ReceiverInterceptor{ close: make(chan struct{}), - log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"), + log: logging.NewDefaultLoggerFactory().NewLogger(logName), buffer: New(), } @@ -52,11 +56,11 @@ func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, // arriving) quickly enough. type ReceiverInterceptor struct { interceptor.NoOp - buffer *JitterBuffer - m sync.Mutex - wg sync.WaitGroup - close chan struct{} - log logging.LeveledLogger + buffer *JitterBuffer + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger + skipMissingPackets bool } // NewInterceptor returns a new InterceptorFactory @@ -74,19 +78,32 @@ func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader return n, attr, err } packet := &rtp.Packet{} - if err := packet.Unmarshal(buf); err != nil { + if err := packet.Unmarshal(buf[:n]); err != nil { return 0, nil, err } - i.m.Lock() - defer i.m.Unlock() i.buffer.Push(packet) if i.buffer.state == Emitting { - newPkt, err := i.buffer.Pop() - if err != nil { - return 0, nil, err + for { + newPkt, err := i.buffer.Pop() + if err != nil { + if errors.Is(err, ErrNotFound) { + if i.skipMissingPackets { + i.log.Warn("Skipping missing packet") + i.buffer.SetPlayoutHead(i.buffer.PlayoutHead() + 1) + continue + } + } + return 0, nil, err + } + if newPkt != nil { + nlen, err := newPkt.MarshalTo(b) + return nlen, attr, err + + } + if i.buffer.Length() == 0 { + break + } } - nlen, err := newPkt.MarshalTo(b) - return nlen, attr, err } return n, attr, ErrPopWhileBuffering }) @@ -95,16 +112,12 @@ func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader // UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) { defer i.wg.Wait() - i.m.Lock() - defer i.m.Unlock() i.buffer.Clear(true) } // Close closes the interceptor func (i *ReceiverInterceptor) Close() error { defer i.wg.Wait() - i.m.Lock() - defer i.m.Unlock() i.buffer.Clear(true) return nil } diff --git a/pkg/jitterbuffer/receiver_interceptor_test.go b/pkg/jitterbuffer/receiver_interceptor_test.go index 58685966..f9f31cea 100644 --- a/pkg/jitterbuffer/receiver_interceptor_test.go +++ b/pkg/jitterbuffer/receiver_interceptor_test.go @@ -80,15 +80,18 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) { SenderSSRC: 123, MediaSSRC: 456, }}) - for s := 0; s < 61; s++ { + for s := 0; s < 910; s++ { stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ SequenceNumber: uint16(s), }}) } // Give time for packets to be handled and stream written to. time.Sleep(50 * time.Millisecond) - for s := 0; s < 10; s++ { + for s := 0; s < 50; s++ { read := <-stream.ReadRTP() + if read.Err != nil { + t.Fatal(read.Err) + } seq := read.Packet.Header.SequenceNumber assert.EqualValues(t, uint16(s), seq) } @@ -96,3 +99,53 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) { err = i.Close() assert.NoError(t, err) } + +func TestReceiverBuffersAndPlaysoutSkippingMissingPackets(t *testing.T) { + buf := bytes.Buffer{} + + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + WithSkipMissingPackets(), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("jitterbuffer") + assert.NoError(t, err) + + assert.EqualValues(t, 0, buf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + + for s := 0; s < 420; s++ { + if s == 6 { + s++ + } + if s == 40 { + s = s + 20 + } + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(s), + }}) + } + + for s := 0; s < 100; s++ { + read := <-stream.ReadRTP() + if read.Err != nil { + continue + } + seq := read.Packet.Header.SequenceNumber + if s == 6 { + s++ + } + if s == 40 { + s = s + 20 + } + assert.EqualValues(t, uint16(s), seq) + } + assert.NoError(t, stream.Close()) + err = i.Close() + assert.NoError(t, err) +}