-
Notifications
You must be signed in to change notification settings - Fork 66
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add an initial, very simple, jitter buffer
- Loading branch information
1 parent
450ac84
commit 2f8ea21
Showing
4 changed files
with
508 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly> | ||
// SPDX-License-Identifier: MIT | ||
/* | ||
JitterBuffer provides a buffer for RTP packets designed to help | ||
counteract non-deterministic sources of latency | ||
*/ | ||
package jitterbuffer | ||
|
||
import ( | ||
"errors" | ||
"math" | ||
"sync" | ||
|
||
"github.com/pion/rtp" | ||
) | ||
|
||
type JitterBufferState uint16 | ||
|
||
type JitterBufferEvent string | ||
|
||
var ( | ||
BufferUnderrun = errors.New("Invalid Peek: Empty jitter buffer") | ||
PopWhileBuffering = errors.New("Attempt to pop while buffering") | ||
) | ||
|
||
const ( | ||
Buffering JitterBufferState = iota | ||
Emitting | ||
) | ||
|
||
const ( | ||
StartBuffering JitterBufferEvent = "startBuffering" | ||
BeginPlayback = "playing" | ||
BufferUnderflow = "underflow" | ||
BufferOverflow = "overflow" | ||
) | ||
|
||
func (jbs JitterBufferState) String() string { | ||
switch jbs { | ||
case Buffering: | ||
return "Buffering" | ||
case Emitting: | ||
return "Emitting" | ||
} | ||
return "unknown" | ||
} | ||
|
||
type JitterBufferOption struct { | ||
} | ||
type ( | ||
Option func(jb *JitterBuffer) | ||
JitterBufferEventListener func(event JitterBufferEvent, jb *JitterBuffer) | ||
) | ||
|
||
type JitterBuffer struct { | ||
packets *PriorityQueue | ||
last_sequence uint16 | ||
playout_head uint16 | ||
playout_ready bool | ||
state JitterBufferState | ||
sample_rate uint16 | ||
payload_sample_rate int | ||
max_depth int | ||
stats JitterBufferStats | ||
listeners []JitterBufferEventListener | ||
mutex sync.Mutex | ||
} | ||
|
||
type JitterBufferStats struct { | ||
out_of_order_count uint32 | ||
empty_count uint32 | ||
underflow_count uint32 | ||
overflow_count uint32 | ||
jitter float32 | ||
max_jitter float32 | ||
} | ||
|
||
// New will initialize a jitter buffer and its associated statistics | ||
func New(opts ...Option) *JitterBuffer { | ||
jb := &JitterBuffer{state: Buffering, stats: JitterBufferStats{0, 0, 0, 0, .0, .0}, packets: NewQueue()} | ||
for _, o := range opts { | ||
o(jb) | ||
} | ||
return jb | ||
} | ||
|
||
// The jitter buffer may emit events correspnding, interested listerns should | ||
// look at JitterBufferEvent for available events | ||
func (jb *JitterBuffer) Listen(event JitterBufferEvent, cb JitterBufferEventListener) { | ||
jb.listeners = append(jb.listeners, cb) | ||
} | ||
|
||
func (jb *JitterBuffer) updateStats(last_packet_seq_no uint16) { | ||
// If we have at least one packet, and the next packet being pushed in is not | ||
// at the expected sequence number increment the out of order count | ||
if jb.packets.Length() > 0 && last_packet_seq_no != ((jb.last_sequence+1)%math.MaxUint16) { | ||
jb.stats.out_of_order_count++ | ||
} | ||
jb.last_sequence = last_packet_seq_no | ||
} | ||
|
||
// Push an RTP packet into the jitter buffer, this does not clone | ||
// the data so if the memory is expected to be reused, the caller should | ||
// take this in to account and pass a copy of the packet they wish to buffer | ||
func (jb *JitterBuffer) Push(packet *rtp.Packet) { | ||
jb.mutex.Lock() | ||
defer jb.mutex.Unlock() | ||
if jb.packets.Length() > 100 { | ||
jb.stats.overflow_count++ | ||
jb.emit(BufferOverflow) | ||
} | ||
if !jb.playout_ready && jb.packets.Length() == 0 { | ||
jb.playout_head = packet.SequenceNumber | ||
} | ||
jb.updateStats(packet.SequenceNumber) | ||
jb.packets.Push(packet, packet.SequenceNumber) | ||
jb.updateState() | ||
} | ||
|
||
func (jb *JitterBuffer) emit(event JitterBufferEvent) { | ||
for _, l := range jb.listeners { | ||
l(event, jb) | ||
} | ||
} | ||
|
||
func (jb *JitterBuffer) updateState() { | ||
// For now, we only look at the number of packets captured in the play buffer | ||
if jb.packets.Length() >= 50 && jb.state == Buffering { | ||
jb.state = Emitting | ||
jb.emit(BeginPlayback) | ||
} | ||
} | ||
|
||
// Peek at the packet which is either: | ||
// | ||
// At the playout head when we are emitting, and the playoutHead flag is true | ||
// | ||
// or else | ||
// | ||
// At the last sequence received | ||
func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) { | ||
jb.mutex.Lock() | ||
defer jb.mutex.Unlock() | ||
if jb.packets.Length() < 1 { | ||
return nil, BufferUnderrun | ||
} | ||
if playoutHead && jb.state == Emitting { | ||
return jb.packets.Find(jb.playout_head) | ||
} | ||
return jb.packets.Find(jb.last_sequence) | ||
} | ||
|
||
// Pop an RTP packet from the jitter buffer at the current playout head | ||
func (jb *JitterBuffer) Pop() (*rtp.Packet, error) { | ||
jb.mutex.Lock() | ||
defer jb.mutex.Unlock() | ||
if jb.state != Emitting { | ||
return nil, PopWhileBuffering | ||
} | ||
packet, err := jb.packets.PopAt(jb.playout_head) | ||
if err != nil { | ||
jb.stats.underflow_count++ | ||
jb.emit(BufferUnderflow) | ||
return (*rtp.Packet)(nil), err | ||
} | ||
jb.playout_head = (jb.playout_head + 1) % math.MaxUint16 | ||
jb.updateState() | ||
return packet, nil | ||
} | ||
|
||
// Pop an RTP packet from the jitter buffer at the current playout head | ||
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { | ||
jb.mutex.Lock() | ||
defer jb.mutex.Unlock() | ||
if jb.state != Emitting { | ||
return nil, PopWhileBuffering | ||
} | ||
packet, err := jb.packets.PopAtTimestamp(ts) | ||
if err != nil { | ||
jb.stats.underflow_count++ | ||
jb.emit(BufferUnderflow) | ||
return (*rtp.Packet)(nil), err | ||
} | ||
jb.updateState() | ||
return packet, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly> | ||
// SPDX-License-Identifier: MIT | ||
|
||
package jitterbuffer | ||
|
||
import ( | ||
"math" | ||
"testing" | ||
|
||
"github.com/pion/rtp" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestJitterBuffer(t *testing.T) { | ||
assert := assert.New(t) | ||
t.Run("Appends packets in order", func(t *testing.T) { | ||
jb := New() | ||
assert.Equal(jb.last_sequence, uint16(0)) | ||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) | ||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) | ||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) | ||
|
||
assert.Equal(jb.last_sequence, uint16(5002)) | ||
|
||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}}) | ||
|
||
assert.Equal(jb.last_sequence, uint16(5012)) | ||
assert.Equal(jb.stats.out_of_order_count, uint32(1)) | ||
assert.Equal(jb.packets.Length(), uint16(4)) | ||
assert.Equal(jb.last_sequence, uint16(5012)) | ||
}) | ||
|
||
t.Run("Appends packets and begins playout", func(t *testing.T) { | ||
jb := New() | ||
for i := 0; i < 100; i++ { | ||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) | ||
} | ||
assert.Equal(jb.packets.Length(), uint16(100)) | ||
assert.Equal(jb.state, Emitting) | ||
assert.Equal(jb.playout_head, uint16(5012)) | ||
head, err := jb.Pop() | ||
assert.Equal(head.SequenceNumber, uint16(5012)) | ||
assert.Equal(err, nil) | ||
}) | ||
t.Run("Wraps playout correctly", func(t *testing.T) { | ||
jb := New() | ||
for i := 0; i < 100; i++ { | ||
|
||
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) | ||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) | ||
} | ||
assert.Equal(jb.packets.Length(), uint16(100)) | ||
assert.Equal(jb.state, Emitting) | ||
assert.Equal(jb.playout_head, uint16(math.MaxUint16-32)) | ||
head, err := jb.Pop() | ||
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) | ||
assert.Equal(err, nil) | ||
for i := 0; i < 100; i++ { | ||
head, err := jb.Pop() | ||
if i < 99 { | ||
assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16)) | ||
assert.Equal(err, nil) | ||
} else { | ||
assert.Equal(head, (*rtp.Packet)(nil)) | ||
} | ||
} | ||
}) | ||
t.Run("Pops at timestamp correctly", func(t *testing.T) { | ||
jb := New() | ||
for i := 0; i < 100; i++ { | ||
|
||
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) | ||
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) | ||
} | ||
assert.Equal(jb.packets.Length(), uint16(100)) | ||
assert.Equal(jb.state, Emitting) | ||
head, err := jb.PopAtTimestamp(uint32(513)) | ||
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1)) | ||
assert.Equal(err, nil) | ||
head, err = jb.PopAtTimestamp(uint32(513)) | ||
assert.Equal(head, (*rtp.Packet)(nil)) | ||
assert.NotEqual(err, nil) | ||
|
||
head, err = jb.Pop() | ||
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) | ||
assert.Equal(err, nil) | ||
}) | ||
} |
Oops, something went wrong.