From dfa62915a1cb890b98d1280782a8d545287d9caa Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Tue, 22 Oct 2024 17:33:30 -0600 Subject: [PATCH] jitterbuffer: Add packet skipping in interceptor --- internal/test/mock_stream.go | 1 - pkg/jitterbuffer/jitter_buffer.go | 8 ++- pkg/jitterbuffer/option.go | 7 +++ pkg/jitterbuffer/receiver_interceptor.go | 51 +++++++++++------ pkg/jitterbuffer/receiver_interceptor_test.go | 57 ++++++++++++++++++- 5 files changed, 102 insertions(+), 22 deletions(-) diff --git a/internal/test/mock_stream.go b/internal/test/mock_stream.go index 83d0732..83ef396 100644 --- a/internal/test/mock_stream.go +++ b/internal/test/mock_stream.go @@ -103,7 +103,6 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc if !ok { return 0, nil, io.EOF } - marshaled, err := p.Marshal() if err != nil { return 0, nil, io.EOF diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go index 2d460f2..52bd570 100644 --- a/pkg/jitterbuffer/jitter_buffer.go +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -130,6 +130,12 @@ func (jb *JitterBuffer) PlayoutHead() uint16 { return jb.playoutHead } +func (jb *JitterBuffer) Length() uint16 { + jb.mutex.Lock() + defer jb.mutex.Unlock() + return jb.packets.Length() +} + // SetPlayoutHead allows you to manually specify the packet you wish to pop next // If you have encountered a packet that hasn't resolved you can skip it. func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) { @@ -157,7 +163,7 @@ func (jb *JitterBuffer) Push(packet *rtp.Packet) { if jb.packets.Length() == 0 { jb.emit(StartBuffering) } - if jb.packets.Length() > 100 { + if jb.packets.Length() > 2*jb.minStartCount { jb.stats.overflowCount++ jb.emit(BufferOverflow) } diff --git a/pkg/jitterbuffer/option.go b/pkg/jitterbuffer/option.go index 7a09df8..de53295 100644 --- a/pkg/jitterbuffer/option.go +++ b/pkg/jitterbuffer/option.go @@ -18,3 +18,10 @@ func Log(log logging.LeveledLogger) ReceiverInterceptorOption { return nil } } + +func WithSkipMissingPackets() ReceiverInterceptorOption { + return func(d *ReceiverInterceptor) error { + d.skipMissingPackets = true + return nil + } +} diff --git a/pkg/jitterbuffer/receiver_interceptor.go b/pkg/jitterbuffer/receiver_interceptor.go index cd133e2..80d2ab2 100644 --- a/pkg/jitterbuffer/receiver_interceptor.go +++ b/pkg/jitterbuffer/receiver_interceptor.go @@ -4,6 +4,7 @@ package jitterbuffer import ( + "errors" "sync" "github.com/pion/interceptor" @@ -16,11 +17,14 @@ type InterceptorFactory struct { opts []ReceiverInterceptorOption } -// NewInterceptor constructs a new ReceiverInterceptor. -func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { +// NewInterceptor constructs a new ReceiverInterceptor +func (g *InterceptorFactory) NewInterceptor(logName string) (interceptor.Interceptor, error) { + if logName == "" { + logName = "jitterbuffer" + } i := &ReceiverInterceptor{ close: make(chan struct{}), - log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"), + log: logging.NewDefaultLoggerFactory().NewLogger(logName), buffer: New(), } @@ -52,11 +56,11 @@ func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, // arriving) quickly enough. type ReceiverInterceptor struct { interceptor.NoOp - buffer *JitterBuffer - m sync.Mutex - wg sync.WaitGroup - close chan struct{} - log logging.LeveledLogger + buffer *JitterBuffer + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger + skipMissingPackets bool } // NewInterceptor returns a new InterceptorFactory. @@ -76,16 +80,31 @@ func (i *ReceiverInterceptor) BindRemoteStream( return n, attr, err } packet := &rtp.Packet{} - if err := packet.Unmarshal(buf); err != nil { + if err := packet.Unmarshal(buf[:n]); err != nil { return 0, nil, err } - i.m.Lock() - defer i.m.Unlock() i.buffer.Push(packet) if i.buffer.state == Emitting { - newPkt, err := i.buffer.Pop() - if err != nil { - return 0, nil, err + for { + newPkt, err := i.buffer.Pop() + if err != nil { + if errors.Is(err, ErrNotFound) { + if i.skipMissingPackets { + i.log.Warn("Skipping missing packet") + i.buffer.SetPlayoutHead(i.buffer.PlayoutHead() + 1) + continue + } + } + return 0, nil, err + } + if newPkt != nil { + nlen, err := newPkt.MarshalTo(b) + return nlen, attr, err + + } + if i.buffer.Length() == 0 { + break + } } nlen, err := newPkt.MarshalTo(b) @@ -99,16 +118,12 @@ func (i *ReceiverInterceptor) BindRemoteStream( // UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) { defer i.wg.Wait() - i.m.Lock() - defer i.m.Unlock() i.buffer.Clear(true) } // Close closes the interceptor. func (i *ReceiverInterceptor) Close() error { defer i.wg.Wait() - i.m.Lock() - defer i.m.Unlock() i.buffer.Clear(true) return nil diff --git a/pkg/jitterbuffer/receiver_interceptor_test.go b/pkg/jitterbuffer/receiver_interceptor_test.go index 5492dd3..d3a01e2 100644 --- a/pkg/jitterbuffer/receiver_interceptor_test.go +++ b/pkg/jitterbuffer/receiver_interceptor_test.go @@ -80,15 +80,18 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) { SenderSSRC: 123, MediaSSRC: 456, }}) - for s := 0; s < 61; s++ { + for s := 0; s < 910; s++ { stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ SequenceNumber: uint16(s), //nolint:gosec // G115 }}) } // Give time for packets to be handled and stream written to. time.Sleep(50 * time.Millisecond) - for s := 0; s < 10; s++ { + for s := 0; s < 50; s++ { read := <-stream.ReadRTP() + if read.Err != nil { + t.Fatal(read.Err) + } seq := read.Packet.Header.SequenceNumber assert.EqualValues(t, uint16(s), seq) //nolint:gosec // G115 } @@ -96,3 +99,53 @@ func TestReceiverBuffersAndPlaysout(t *testing.T) { err = testInterceptor.Close() assert.NoError(t, err) } + +func TestReceiverBuffersAndPlaysoutSkippingMissingPackets(t *testing.T) { + buf := bytes.Buffer{} + + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + WithSkipMissingPackets(), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("jitterbuffer") + assert.NoError(t, err) + + assert.EqualValues(t, 0, buf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + + for s := 0; s < 420; s++ { + if s == 6 { + s++ + } + if s == 40 { + s = s + 20 + } + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(s), + }}) + } + + for s := 0; s < 100; s++ { + read := <-stream.ReadRTP() + if read.Err != nil { + continue + } + seq := read.Packet.Header.SequenceNumber + if s == 6 { + s++ + } + if s == 40 { + s = s + 20 + } + assert.EqualValues(t, uint16(s), seq) + } + assert.NoError(t, stream.Close()) + err = i.Close() + assert.NoError(t, err) +}