Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NACK responder: bypass auxiliary SSRCs #310

Merged
merged 1 commit into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
// If this packet doesn't belong to the main SSRC, do not add it to rtpBuffer
if header.SSRC != info.SSRC {
return writer.Write(header, payload, attributes)
}

pkt, err := n.packetFactory.NewPacket(header, payload, info.SSRCRetransmission, info.PayloadTypeRetransmission)
if err != nil {
return 0, err
Expand Down
78 changes: 75 additions & 3 deletions pkg/nack/responder_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestResponderInterceptor(t *testing.T) {
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))

select {
case p := <-stream.WrittenRTP():
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))

select {
case p := <-stream.WrittenRTP():
Expand All @@ -272,7 +272,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
},
})

// seq number 13 was never sent, so it can't be resent
// seq number 13 was never sent, so it can't be present
for _, seqNum := range []uint16{11, 12, 15} {
select {
case p := <-stream.WrittenRTP():
Expand All @@ -290,3 +290,75 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}
}

func TestResponderInterceptor_BypassUnknownSSRCs(t *testing.T) {
f, err := NewResponderInterceptor(
ResponderSize(8),
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
require.NoError(t, err)

i, err := f.NewInterceptor("")
require.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 1,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
defer func() {
require.NoError(t, stream.Close())
}()

// Send some packets with both SSRCs to check that only SSRC=1 added to the buffer
for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))
// This packet should be bypassed and not added to the buffer.
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 2}}))

select {
case p := <-stream.WrittenRTP():
require.Equal(t, seqNum, p.SequenceNumber)
require.Equal(t, uint32(1), p.SSRC)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}

select {
case p := <-stream.WrittenRTP():
require.Equal(t, seqNum, p.SequenceNumber)
require.Equal(t, uint32(2), p.SSRC)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}
}

// This packet should be bypassed and not added to the buffer.
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: 13, SSRC: 2}}))
select {
case p := <-stream.WrittenRTP():
require.Equal(t, uint16(13), p.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}

stream.ReceiveRTCP([]rtcp.Packet{
&rtcp.TransportLayerNack{
MediaSSRC: 1,
SenderSSRC: 1,
Nacks: []rtcp.NackPair{
{PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15
},
},
})

// seq number 13 was sent with different ssrc, it should not be present
for _, seqNum := range []uint16{11, 12, 15} {
select {
case p := <-stream.WrittenRTP():
require.Equal(t, uint32(1), p.SSRC)
require.Equal(t, seqNum, p.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}
}
}
Loading