Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an initial, very simple, jitter buffer #222

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package jitterbuffer implements a buffer for RTP packets designed to help
// counteract non-deterministic sources of latency
package jitterbuffer

import (
"errors"
"math"
"sync"

"github.com/pion/rtp"
)

// State tracks a JitterBuffer as either Buffering or Emitting
type State uint16

// Event represents all events a JitterBuffer can emit
type Event string

var (
// ErrBufferUnderrun is returned when the buffer has no items
ErrBufferUnderrun = errors.New("invalid Peek: Empty jitter buffer")
// ErrPopWhileBuffering is returned if a jitter buffer is not in a playback state
ErrPopWhileBuffering = errors.New("attempt to pop while buffering")
)

const (
// Buffering is the state when the jitter buffer has not started emitting yet, or has hit an underflow and needs to re-buffer packets
Buffering State = iota
// Emitting is the state when the jitter buffer is operating nominally
Emitting
)

const (
// StartBuffering is emitted when the buffer receives its first packet
StartBuffering Event = "startBuffering"
// BeginPlayback is emitted when the buffer has satisfied its buffer length
BeginPlayback = "playing"
// BufferUnderflow is emitted when the buffer does not have enough packets to Pop
BufferUnderflow = "underflow"
// BufferOverflow is emitted when the buffer has exceeded its limit
BufferOverflow = "overflow"
)

func (jbs State) String() string {
switch jbs {
case Buffering:
return "Buffering"
case Emitting:
return "Emitting"

Check warning on line 52 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L47-L52

Added lines #L47 - L52 were not covered by tests
}
return "unknown"

Check warning on line 54 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L54

Added line #L54 was not covered by tests
}

type (
// Option will Override JitterBuffer's defaults
Option func(jb *JitterBuffer)
// EventListener will be called when the corresponding Event occurs
EventListener func(event Event, jb *JitterBuffer)
)

// A JitterBuffer will accept Pushed packets, put them in sequence number
// order, and allows removing in either sequence number order or via a
// provided timestamp
type JitterBuffer struct {
packets *PriorityQueue
lastSequence uint16
playoutHead uint16
playoutReady bool
state State
stats Stats
listeners map[Event][]EventListener
mutex sync.Mutex
}

// Stats Track interesting statistics for the life of this JitterBuffer
// outOfOrderCount will provide the number of times a packet was Pushed
//
// without its predecessor being present
//
// underflowCount will provide the count of attempts to Pop an empty buffer
// overflowCount will track the number of times the jitter buffer exceeds its limit
type Stats struct {
outOfOrderCount uint32
underflowCount uint32
overflowCount uint32
}

// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
jb := &JitterBuffer{state: Buffering, stats: Stats{0, 0, 0}, packets: NewQueue(), listeners: make(map[Event][]EventListener)}
for _, o := range opts {
o(jb)
}

Check warning on line 96 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L95-L96

Added lines #L95 - L96 were not covered by tests
return jb
}

// Listen will register an event listener
// The jitter buffer may emit events correspnding, interested listerns should
// look at Event for available events
func (jb *JitterBuffer) Listen(event Event, cb EventListener) {
jb.listeners[event] = append(jb.listeners[event], cb)

Check warning on line 104 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L103-L104

Added lines #L103 - L104 were not covered by tests
}

func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// If we have at least one packet, and the next packet being pushed in is not
// at the expected sequence number increment the out of order count
if jb.packets.Length() > 0 && lastPktSeqNo != ((jb.lastSequence+1)%math.MaxUint16) {
jb.stats.outOfOrderCount++
}
jb.lastSequence = lastPktSeqNo
}

// Push an RTP packet into the jitter buffer, this does not clone
// the data so if the memory is expected to be reused, the caller should
// take this in to account and pass a copy of the packet they wish to buffer
func (jb *JitterBuffer) Push(packet *rtp.Packet) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.packets.Length() == 0 {
jb.emit(StartBuffering)
}
if jb.packets.Length() > 100 {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}
if !jb.playoutReady && jb.packets.Length() == 0 {
jb.playoutHead = packet.SequenceNumber
}
jb.updateStats(packet.SequenceNumber)
jb.packets.Push(packet, packet.SequenceNumber)
jb.updateState()
}

func (jb *JitterBuffer) emit(event Event) {
for _, l := range jb.listeners[event] {
l(event, jb)
}

Check warning on line 140 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L139-L140

Added lines #L139 - L140 were not covered by tests
}

func (jb *JitterBuffer) updateState() {
// For now, we only look at the number of packets captured in the play buffer
if jb.packets.Length() >= 50 && jb.state == Buffering {
jb.state = Emitting
jb.playoutReady = true
jb.emit(BeginPlayback)
}
}

// Peek at the packet which is either:
//
// At the playout head when we are emitting, and the playoutHead flag is true
//
// or else
//
// At the last sequence received
func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.packets.Length() < 1 {
return nil, ErrBufferUnderrun
}

Check warning on line 164 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L163-L164

Added lines #L163 - L164 were not covered by tests
if playoutHead && jb.state == Emitting {
return jb.packets.Find(jb.playoutHead)
}
return jb.packets.Find(jb.lastSequence)
}

// Pop an RTP packet from the jitter buffer at the current playout head
func (jb *JitterBuffer) Pop() (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}

Check warning on line 177 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L176-L177

Added lines #L176 - L177 were not covered by tests
packet, err := jb.packets.PopAt(jb.playoutHead)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return (*rtp.Packet)(nil), err
}
jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16
jb.updateState()
return packet, nil
}

// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp
// Call this method repeatedly to drain the buffer at the timestamp
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}

Check warning on line 196 in pkg/jitterbuffer/jitter_buffer.go

View check run for this annotation

Codecov / codecov/patch

pkg/jitterbuffer/jitter_buffer.go#L195-L196

Added lines #L195 - L196 were not covered by tests
packet, err := jb.packets.PopAtTimestamp(ts)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return (*rtp.Packet)(nil), err
}
jb.updateState()
return packet, nil
}
123 changes: 123 additions & 0 deletions pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"math"
"testing"

"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestJitterBuffer(t *testing.T) {
assert := assert.New(t)
t.Run("Appends packets in order", func(t *testing.T) {
jb := New()
assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5002))

jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5012))
assert.Equal(jb.stats.outOfOrderCount, uint32(1))
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
})

t.Run("Appends packets and begins playout", func(t *testing.T) {
jb := New()
for i := 0; i < 100; i++ {
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(5012))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})
t.Run("Wraps playout correctly", func(t *testing.T) {
jb := New()
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(math.MaxUint16-32))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
head, err := jb.Pop()
if i < 99 {
assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16))
assert.Equal(err, nil)
} else {
assert.Equal(head, (*rtp.Packet)(nil))
}
}
})
t.Run("Pops at timestamp correctly", func(t *testing.T) {
jb := New()
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(513))
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(513))
assert.Equal(head, (*rtp.Packet)(nil))
assert.NotEqual(err, nil)

head, err = jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
t.Run("Can peek at a packet", func(t *testing.T) {
jb := New()
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})
pkt, err := jb.Peek(false)
assert.Equal(pkt.SequenceNumber, uint16(5002))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
pkt, err = jb.Peek(true)
assert.Equal(pkt.SequenceNumber, uint16(5000))
assert.Equal(err, nil)
})
t.Run("Pops at timestamp with multiple packets", func(t *testing.T) {
jb := New()
for i := 0; i < 50; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
assert.Equal(jb.packets.Length(), uint16(52))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(9000))
assert.Equal(head.SequenceNumber, uint16(1019))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(9000))
assert.Equal(head.SequenceNumber, uint16(1020))
assert.Equal(err, nil)

head, err = jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
}
Loading
Loading