From 4cda00c4e97d4b403f976ae1130badf21e846648 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Fri, 31 Jan 2025 14:38:44 +0300 Subject: [PATCH] NACK responder: bypass auxiliary SSRCs --- pkg/nack/responder_interceptor.go | 5 ++ pkg/nack/responder_interceptor_test.go | 78 +++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/pkg/nack/responder_interceptor.go b/pkg/nack/responder_interceptor.go index 58e34301..8c21b078 100644 --- a/pkg/nack/responder_interceptor.go +++ b/pkg/nack/responder_interceptor.go @@ -113,6 +113,11 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri n.streamsMu.Unlock() return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + // If this packet doesn't belong to the main SSRC, do not add it to rtpBuffer + if header.SSRC != info.SSRC { + return writer.Write(header, payload, attributes) + } + pkt, err := n.packetFactory.NewPacket(header, payload, info.SSRCRetransmission, info.PayloadTypeRetransmission) if err != nil { return 0, err diff --git a/pkg/nack/responder_interceptor_test.go b/pkg/nack/responder_interceptor_test.go index 019d85e5..c48d4fec 100644 --- a/pkg/nack/responder_interceptor_test.go +++ b/pkg/nack/responder_interceptor_test.go @@ -57,7 +57,7 @@ func TestResponderInterceptor(t *testing.T) { }() for _, seqNum := range []uint16{10, 11, 12, 14, 15} { - require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})) + require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}})) select { case p := <-stream.WrittenRTP(): @@ -252,7 +252,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) { }() for _, seqNum := range []uint16{10, 11, 12, 14, 15} { - require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})) + require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}})) select { case p := <-stream.WrittenRTP(): @@ -272,7 +272,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) { }, }) - // seq number 13 was never sent, so it can't be resent + // seq number 13 was never sent, so it can't be present for _, seqNum := range []uint16{11, 12, 15} { select { case p := <-stream.WrittenRTP(): @@ -290,3 +290,75 @@ func TestResponderInterceptor_RFC4588(t *testing.T) { case <-time.After(10 * time.Millisecond): } } + +func TestResponderInterceptor_BypassUnknownSSRCs(t *testing.T) { + f, err := NewResponderInterceptor( + ResponderSize(8), + ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + require.NoError(t, err) + + i, err := f.NewInterceptor("") + require.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 1, + RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}}, + }, i) + defer func() { + require.NoError(t, stream.Close()) + }() + + // Send some packets with both SSRCs to check that only SSRC=1 added to the buffer + for _, seqNum := range []uint16{10, 11, 12, 14, 15} { + require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}})) + // This packet should be bypassed and not added to the buffer. + require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 2}})) + + select { + case p := <-stream.WrittenRTP(): + require.Equal(t, seqNum, p.SequenceNumber) + require.Equal(t, uint32(1), p.SSRC) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + + select { + case p := <-stream.WrittenRTP(): + require.Equal(t, seqNum, p.SequenceNumber) + require.Equal(t, uint32(2), p.SSRC) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + } + + // This packet should be bypassed and not added to the buffer. + require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: 13, SSRC: 2}})) + select { + case p := <-stream.WrittenRTP(): + require.Equal(t, uint16(13), p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + + stream.ReceiveRTCP([]rtcp.Packet{ + &rtcp.TransportLayerNack{ + MediaSSRC: 1, + SenderSSRC: 1, + Nacks: []rtcp.NackPair{ + {PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15 + }, + }, + }) + + // seq number 13 was sent with different ssrc, it should not be present + for _, seqNum := range []uint16{11, 12, 15} { + select { + case p := <-stream.WrittenRTP(): + require.Equal(t, uint32(1), p.SSRC) + require.Equal(t, seqNum, p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + } +}