diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go new file mode 100644 index 00000000..069a136e --- /dev/null +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -0,0 +1,186 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT +/* + JitterBuffer provides a buffer for RTP packets designed to help + counteract non-deterministic sources of latency +*/ +package jitterbuffer + +import ( + "errors" + "math" + "sync" + + "github.com/pion/rtp" +) + +type JitterBufferState uint16 + +type JitterBufferEvent string + +var ( + BufferUnderrun = errors.New("Invalid Peek: Empty jitter buffer") + PopWhileBuffering = errors.New("Attempt to pop while buffering") +) + +const ( + Buffering JitterBufferState = iota + Emitting +) + +const ( + StartBuffering JitterBufferEvent = "startBuffering" + BeginPlayback = "playing" + BufferUnderflow = "underflow" + BufferOverflow = "overflow" +) + +func (jbs JitterBufferState) String() string { + switch jbs { + case Buffering: + return "Buffering" + case Emitting: + return "Emitting" + } + return "unknown" +} + +type JitterBufferOption struct { +} +type ( + Option func(jb *JitterBuffer) + JitterBufferEventListener func(event JitterBufferEvent, jb *JitterBuffer) +) + +type JitterBuffer struct { + packets *PriorityQueue + last_sequence uint16 + playout_head uint16 + playout_ready bool + state JitterBufferState + sample_rate uint16 + payload_sample_rate int + max_depth int + stats JitterBufferStats + listeners []JitterBufferEventListener + mutex sync.Mutex +} + +type JitterBufferStats struct { + out_of_order_count uint32 + empty_count uint32 + underflow_count uint32 + overflow_count uint32 + jitter float32 + max_jitter float32 +} + +// New will initialize a jitter buffer and its associated statistics +func New(opts ...Option) *JitterBuffer { + jb := &JitterBuffer{state: Buffering, stats: JitterBufferStats{0, 0, 0, 0, .0, .0}, packets: NewQueue()} + for _, o := range opts { + o(jb) + } + return jb +} + +// The jitter buffer may emit events correspnding, interested listerns should +// look at JitterBufferEvent for available events +func (jb *JitterBuffer) Listen(event JitterBufferEvent, cb JitterBufferEventListener) { + jb.listeners = append(jb.listeners, cb) +} + +func (jb *JitterBuffer) updateStats(last_packet_seq_no uint16) { + // If we have at least one packet, and the next packet being pushed in is not + // at the expected sequence number increment the out of order count + if jb.packets.Length() > 0 && last_packet_seq_no != ((jb.last_sequence+1)%math.MaxUint16) { + jb.stats.out_of_order_count++ + } + jb.last_sequence = last_packet_seq_no +} + +// Push an RTP packet into the jitter buffer, this does not clone +// the data so if the memory is expected to be reused, the caller should +// take this in to account and pass a copy of the packet they wish to buffer +func (jb *JitterBuffer) Push(packet *rtp.Packet) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.packets.Length() > 100 { + jb.stats.overflow_count++ + jb.emit(BufferOverflow) + } + if !jb.playout_ready && jb.packets.Length() == 0 { + jb.playout_head = packet.SequenceNumber + } + jb.updateStats(packet.SequenceNumber) + jb.packets.Push(packet, packet.SequenceNumber) + jb.updateState() +} + +func (jb *JitterBuffer) emit(event JitterBufferEvent) { + for _, l := range jb.listeners { + l(event, jb) + } +} + +func (jb *JitterBuffer) updateState() { + // For now, we only look at the number of packets captured in the play buffer + if jb.packets.Length() >= 50 && jb.state == Buffering { + jb.state = Emitting + jb.emit(BeginPlayback) + } +} + +// Peek at the packet which is either: +// +// At the playout head when we are emitting, and the playoutHead flag is true +// +// or else +// +// At the last sequence received +func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.packets.Length() < 1 { + return nil, BufferUnderrun + } + if playoutHead && jb.state == Emitting { + return jb.packets.Find(jb.playout_head) + } + return jb.packets.Find(jb.last_sequence) +} + +// Pop an RTP packet from the jitter buffer at the current playout head +func (jb *JitterBuffer) Pop() (*rtp.Packet, error) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.state != Emitting { + return nil, PopWhileBuffering + } + packet, err := jb.packets.PopAt(jb.playout_head) + if err != nil { + jb.stats.underflow_count++ + jb.emit(BufferUnderflow) + return (*rtp.Packet)(nil), err + } + jb.playout_head = (jb.playout_head + 1) % math.MaxUint16 + jb.updateState() + return packet, nil +} + +// Pop an RTP packet from the jitter buffer at the current playout head +func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + if jb.state != Emitting { + return nil, PopWhileBuffering + } + packet, err := jb.packets.PopAtTimestamp(ts) + if err != nil { + jb.stats.underflow_count++ + jb.emit(BufferUnderflow) + return (*rtp.Packet)(nil), err + } + jb.updateState() + return packet, nil +} diff --git a/pkg/jitterbuffer/jitter_buffer_test.go b/pkg/jitterbuffer/jitter_buffer_test.go new file mode 100644 index 00000000..3030711e --- /dev/null +++ b/pkg/jitterbuffer/jitter_buffer_test.go @@ -0,0 +1,88 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "math" + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestJitterBuffer(t *testing.T) { + assert := assert.New(t) + t.Run("Appends packets in order", func(t *testing.T) { + jb := New() + assert.Equal(jb.last_sequence, uint16(0)) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) + + assert.Equal(jb.last_sequence, uint16(5002)) + + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}}) + + assert.Equal(jb.last_sequence, uint16(5012)) + assert.Equal(jb.stats.out_of_order_count, uint32(1)) + assert.Equal(jb.packets.Length(), uint16(4)) + assert.Equal(jb.last_sequence, uint16(5012)) + }) + + t.Run("Appends packets and begins playout", func(t *testing.T) { + jb := New() + for i := 0; i < 100; i++ { + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + assert.Equal(jb.packets.Length(), uint16(100)) + assert.Equal(jb.state, Emitting) + assert.Equal(jb.playout_head, uint16(5012)) + head, err := jb.Pop() + assert.Equal(head.SequenceNumber, uint16(5012)) + assert.Equal(err, nil) + }) + t.Run("Wraps playout correctly", func(t *testing.T) { + jb := New() + for i := 0; i < 100; i++ { + + sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + assert.Equal(jb.packets.Length(), uint16(100)) + assert.Equal(jb.state, Emitting) + assert.Equal(jb.playout_head, uint16(math.MaxUint16-32)) + head, err := jb.Pop() + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) + assert.Equal(err, nil) + for i := 0; i < 100; i++ { + head, err := jb.Pop() + if i < 99 { + assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16)) + assert.Equal(err, nil) + } else { + assert.Equal(head, (*rtp.Packet)(nil)) + } + } + }) + t.Run("Pops at timestamp correctly", func(t *testing.T) { + jb := New() + for i := 0; i < 100; i++ { + + sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) + } + assert.Equal(jb.packets.Length(), uint16(100)) + assert.Equal(jb.state, Emitting) + head, err := jb.PopAtTimestamp(uint32(513)) + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1)) + assert.Equal(err, nil) + head, err = jb.PopAtTimestamp(uint32(513)) + assert.Equal(head, (*rtp.Packet)(nil)) + assert.NotEqual(err, nil) + + head, err = jb.Pop() + assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) + assert.Equal(err, nil) + }) +} diff --git a/pkg/jitterbuffer/priority_queue.go b/pkg/jitterbuffer/priority_queue.go new file mode 100644 index 00000000..7092d14d --- /dev/null +++ b/pkg/jitterbuffer/priority_queue.go @@ -0,0 +1,164 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "errors" + + "github.com/pion/rtp" +) + +type PriorityQueue struct { + next *node + length uint16 +} + +type node struct { + val *rtp.Packet + next *node + prev *node + prio uint16 +} + +var InvalidOperation = errors.new("Attempt to find or pop on an empty list") +var NotFound = errors.new("Priority not found") + +func NewQueue() *PriorityQueue { + return &PriorityQueue{ + next: nil, + length: 0, + } +} + +func newNode(val *rtp.Packet, prio uint16) *node { + return &node{ + val: val, + prev: nil, + next: nil, + prio: prio, + } +} + +func (q *PriorityQueue) Find(sqNum uint16) (*rtp.Packet, error) { + if q.next.prio == sqNum { + return q.next.val, nil + } + + if sqNum < q.next.prio { + return nil, InvalidOperation + } + next := q.next + for next != nil { + if next.prio == sqNum { + return next.val, nil + } + next = next.next + } + return nil, NotFound +} + +func (q *PriorityQueue) Push(val *rtp.Packet, prio uint16) { + newPq := newNode(val, prio) + if q.next == nil { + q.next = newPq + q.length++ + return + } + if prio < q.next.prio { + newPq.next = q.next + q.next.prev = newPq + q.next = newPq + q.length++ + return + } + head := q.next + prev := q.next + for head != nil { + if prio <= head.prio { + break + } + prev = head + head = head.next + } + if head == nil { + if prev != nil { + prev.next = newPq + } + newPq.prev = prev + } else { + newPq.next = head + newPq.prev = prev + if prev != nil { + prev.next = newPq + } + head.prev = newPq + } + q.length++ +} + +func (q *PriorityQueue) Length() uint16 { + return q.length +} + +func (q *PriorityQueue) Pop() (*rtp.Packet, error) { + if q.next == nil { + return nil, InvalidOperation + } + val := q.next.val + q.length-- + q.next = q.next.next + return val, nil +} + +func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) { + if q.next == nil { + return nil, InvalidOperation + } + if q.next.prio == sqNum { + val := q.next.val + q.next = q.next.next + return val, nil + } + pos := q.next + prev := q.next.prev + for pos != nil { + if pos.prio == sqNum { + val := pos.val + prev.next = pos.next + if prev.next != nil { + prev.next.prev = prev + } + return val, nil + } + prev = pos + pos = pos.next + } + return nil, NotFound +} + +func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) { + if q.next == nil { + return nil, InvalidOperation + } + if q.next.val.Timestamp == timestamp { + val := q.next.val + q.next = q.next.next + return val, nil + } + pos := q.next + prev := q.next.prev + for pos != nil { + if pos.val.Timestamp == timestamp { + val := pos.val + prev.next = pos.next + if prev.next != nil { + prev.next.prev = prev + } + return val, nil + } + prev = pos + pos = pos.next + } + return nil, NotFound +} diff --git a/pkg/jitterbuffer/priority_queue_test.go b/pkg/jitterbuffer/priority_queue_test.go new file mode 100644 index 00000000..f9b9b9ab --- /dev/null +++ b/pkg/jitterbuffer/priority_queue_test.go @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestPriorityQueue(t *testing.T) { + assert := assert.New(t) + t.Run("Appends packets in order", func(t *testing.T) { + pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} + q := NewQueue() + q.Push(pkt, pkt.SequenceNumber) + pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} + q.Push(pkt2, pkt2.SequenceNumber) + assert.Equal(q.next.next.val, pkt2) + assert.Equal(q.next.prio, uint16(5000)) + assert.Equal(q.next.next.prio, uint16(5004)) + }) + t.Run("Appends many in order", func(t *testing.T) { + q := NewQueue() + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + assert.Equal(uint16(100), q.Length()) + last := (*node)(nil) + cur := q.next + for cur != nil { + last = cur + cur = cur.next + if cur != nil { + assert.Equal(cur.prio, last.prio+1) + } + } + assert.Equal(q.next.prio, uint16(5012)) + assert.Equal(last.prio, uint16(5012+99)) + }) + t.Run("Can remove an element", func(t *testing.T) { + pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} + q := NewQueue() + q.Push(pkt, pkt.SequenceNumber) + pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} + q.Push(pkt2, pkt2.SequenceNumber) + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + popped, _ := q.Pop() + assert.Equal(popped.SequenceNumber, uint16(5000)) + _, _ = q.Pop() + nextPop, _ := q.Pop() + assert.Equal(nextPop.SequenceNumber, uint16(5012)) + }) + t.Run("Appends in order", func(t *testing.T) { + q := NewQueue() + for i := 0; i < 100; i++ { + q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) + } + assert.Equal(uint16(100), q.Length()) + pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} + q.Push(pkt, pkt.SequenceNumber) + assert.Equal(pkt, q.next.val) + assert.Equal(uint16(101), q.Length()) + assert.Equal(q.next.prio, uint16(5000)) + }) +}