Skip to content

Source Code Analysis

yangjiechina edited this page Nov 20, 2024 · 1 revision

前言

      在介绍系统架构之前,我们先回忆日常对流媒体服务的使用场景,无非就是推流端推流到服务器,再由服务器输出拉流端所需的流协议。单论业务逻辑,流媒体服务相比其他动辄数以百计的接口的业务系统可谓是简单之极。本文也将按照此逻辑流程,着重介绍每个角色在代码中是如何实现的,交互时是如何处理的。再多提一句,LKM定位是直播服务器,不涉及复杂的传输协议。初衷是为大家提供一款简单、纯粹的流媒体服务器,这点大家在代码中将深有体会,所以大家大可不必因陌生而产生恐惧。

如何做到高并发

      高并发的评判标准是什么?需要参考哪些指标?在提供同等服务质量的情况下,所消耗的CPU、内存更低。其中CPU指标更为重要,因为CPU过载,将对整个系统,所有服务带来影响。同样如果毫无节制的使用内存,造成内存不足进而引发与磁盘交换空间,带来的影响也是系统级的。两者不是互斥关系,内存方面更多的是注意避免内存泄漏,以及合理有效的使用内存缓存。所以下面只会围绕CPU的影响条件来展开讲。常见的消耗CPU多的地方:

  1. 锁开销大
  2. 与内核态的交互频率过高

降低锁开销

      以生产者和消费者模型为例,假如一个生产者,多个消费者,分别运行在不同的线程,为保证读写安全,会加以锁保护。锁的开销会随着消费者数量的增加,蹭蹭往上涨。想要优化降低锁的开销,需要对业务有深刻的理解,以及较强的代码功底。除此之外,还有另外一种常见做法,通过线程切换来做到无锁并发。举个例子,生产者线程内部维护着一个任务队列(cas或锁的粒度小),将生产和消费任务都交给任务队列由生产线程处理。这样生产者和消费者之间不会有锁竞争,影响业务运作。当然这样做,也并非十全十美,其本质是将矛盾转移给了操作系统,同样线程切换的消耗也是系统级的。如何抉择,需要结合系统业务,自行评估。而协程切换是在用户态进行的,创建和切换开销小,避免了频繁的线程切换对系统性能的影响。

降低与内核态的交互频率

      无他,加缓存。将多次写给fd的数据,合并一次写入。LKM中合并写的目的就在此,一次发送数百毫秒的传输流,降低频率,减少开销。

推流端封装

      LKM中推流源抽象为Source接口,实现类PublishSource。具体协议的的推流源负责握手、协议解析、解析出通用的AVStream和AVPacket,回调给PublishSource再生成各种协议的输出流,转发给拉流端。


type Source interface {
	// GetID 返回SourceID
	GetID() string

	SetID(id string)

	// Input 输入推流数据
	Input(data []byte) error

	// GetType 返回推流类型
	GetType() SourceType

	SetType(sourceType SourceType)

	// OriginStreams 返回推流的原始Streams
	OriginStreams() []utils.AVStream

	// TranscodeStreams 返回转码的Streams
	TranscodeStreams() []utils.AVStream

	// AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader,先将Sink添加到等待队列.
	// 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink
	AddSink(sink Sink)

	// RemoveSink 删除Sink
	RemoveSink(sink Sink)
    ...
}

针对每个推流源,都将启动一个事件协程,从StreamPipe读取推流数据、交给推流源的Input函数处理。从MainContextEvents管道读取需要在事件协程执行的函数。


// LoopEvent 循环读取事件
func LoopEvent(source Source) {
	// 将超时计时器放在此处开启, 方便在退出的时候关闭
	var receiveTimer *time.Timer
	var idleTimer *time.Timer

	defer func() {
		log.Sugar.Debugf("主协程执行结束 source: %s", source.GetID())

		// 关闭计时器
		if receiveTimer != nil {
			receiveTimer.Stop()
		}
		if idleTimer != nil {
			idleTimer.Stop()
		}
	}()

	// 开启收流超时计时器
	if AppConfig.Hooks.IsEnableOnReceiveTimeout() && AppConfig.ReceiveTimeout > 0 {
		receiveTimer = StartReceiveDataTimer(source)
	}

	// 开启拉流空闲超时计时器
	if AppConfig.Hooks.IsEnableOnIdleTimeout() && AppConfig.IdleTimeout > 0 {
		idleTimer = StartIdleTimer(source)
	}

	for {
		select {
		// 读取推流数据
		case data := <-source.StreamPipe():
			if AppConfig.ReceiveTimeout > 0 {
				source.SetLastPacketTime(time.Now())
			}
            
			if err := source.Input(data); err != nil {
				log.Sugar.Errorf("处理输入流失败 释放source:%s err:%s", source.GetID(), err.Error())
				source.DoClose()
				return
			}

			break
		// 切换到主协程,执行该函数. 目的是用于无锁化处理推拉流的连接与断开, 推流源断开, 查询推流源信息等事件. 不要做耗时操作, 否则会影响推拉流.
		case event := <-source.MainContextEvents():
			event()

			if source.IsClosed() {
				// 处理推流管道剩余的数据?
				return
			}

			break
		}
	}
}

RTMP对Input处理,从rtmp_stack解析出Audio和Video的Message:


func (p *Publisher) Input(data []byte) error {
	return p.stack.Input(nil, data)
}

GB28181对Input处理,解析rtp包,再解析PS流:


func (source *BaseGBSource) Input(data []byte) error {
	// 国标级联转发
	for _, transStream := range source.TransStreams {
		if transStream.GetProtocol() != stream.TransStreamGBStreamForward {
			continue
		}

		bytes := transStream.(*ForwardStream).WrapData(data)
		rtpPacket := [1][]byte{bytes}
		source.DispatchBuffer(transStream, -1, rtpPacket[:], -1, true)
	}

	packet := rtp.Packet{}
	_ = packet.Unmarshal(data)
	return source.deMuxerCtx.Input(packet.Payload)
}

1078对Input处理,丢给粘包解码器:


func (s *Session) Input(data []byte) error {
	return s.decoder.Input(data)
}

所有推流源,解析出AVpacket,都将回调到OnDeMuxPacket函数,再生成各种协议的输出流,转发给拉流端。

func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
	if AppConfig.GOPCache && s.existVideo {
		s.gopBuffer.AddPacket(packet)
	}

	// 分发给各个传输流
	for _, transStream := range s.TransStreams {
		s.DispatchPacket(transStream, packet)
	}

	// 未开启GOP缓存或只存在音频流, 释放掉内存
	if !AppConfig.GOPCache || !s.existVideo {
		s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
	}
}

// DispatchPacket 分发AVPacket
func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket) {
	data, timestamp, videoKey, err := transStream.Input(packet)
	if err != nil || len(data) < 1 {
		return
	}

	s.DispatchBuffer(transStream, packet.Index(), data, timestamp, videoKey)
}

// DispatchBuffer 分发传输流
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool) {
	sinks := s.TransStreamSinks[transStream.GetID()]
	exist := transStream.IsExistVideo()
	for _, sink := range sinks {

		// 如果存在视频, 确保向sink发送的第一帧是关键帧
		if exist && sink.GetSentPacketCount() < 1 {
			if !videoKey {
				continue
			}

			if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
				s.write(sink, index, extraData, timestamp)
			}
		}

		s.write(sink, index, data, timestamp)
	}
}

// 向sink推流
func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int64) {
	err := sink.Write(index, data, timestamp)
	if err == nil {
		sink.IncreaseSentPacketCount()
		//return
	}

	// 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞.
	// 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流.
	_, ok := err.(*transport.ZeroWindowSizeError)
	if ok {
		log.Sugar.Errorf("向sink推流超时,关闭连接. sink: %s", sink.GetID())
		sink.Close()
	}
}

拉流和结束拉流都切换到事件协程执行。


func (s *PublishSource) PostEvent(cb func()) {
	s.mainContextEvents <- cb
}

func (s *PublishSource) AddSink(sink Sink) {
	s.PostEvent(func() {
		if !s.completed {
			AddSinkToWaitingQueue(sink.GetSourceID(), sink)
		} else {
			if !s.doAddSink(sink) {
				sink.Close()
			}
		}
	})
}

func (s *PublishSource) RemoveSink(sink Sink) {
	s.PostEvent(func() {
		s.doRemoveSink(sink)
	})
}

拉流端封装

拉流端封装为Sink。每个拉流Server创建对应协议的Sink,添加到Source时,根据拉流协议,创建对应的TransStream。

RTMP创建Sink:


func (s *Session) OnPlay(app, stream_ string) utils.HookState {
	streamName, values := stream.ParseUrl(stream_)

	sourceId := s.generateSourceID(app, streamName)
	sink := NewSink(stream.NetAddr2SinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack)
	sink.SetUrlValues(values)

	log.Sugar.Infof("rtmp onplay app: %s stream: %s sink: %v conn: %s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String())

	_, state := stream.PreparePlaySink(sink)
	if utils.HookStateOK != state {
		log.Sugar.Errorf("rtmp拉流失败 source: %s sink: %s", sourceId, sink.GetID())
	} else {
		s.handle = sink
	}

	return state
}

Http-FLV创建Sink:


func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "video/x-flv")
	w.Header().Set("Connection", "Keep-Alive")
	w.Header().Set("Transfer-Encoding", "chunked")

	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	conn, _, err := hj.Hijack()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	sink := flv.NewFLVSink(api.generateSinkID(r.RemoteAddr), sourceId, conn)
	sink.SetUrlValues(r.URL.Query())
	log.Sugar.Infof("http-flv 连接 sink:%s", sink.String())

	_, state := stream.PreparePlaySink(sink)
	if utils.HookStateOK != state {
		log.Sugar.Warnf("http-flv 播放失败 sink:%s", sink.String())

		w.WriteHeader(http.StatusForbidden)
		return
	}

	bytes := make([]byte, 64)
	for {
		if _, err := conn.Read(bytes); err != nil {
			log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.String())
			sink.Close()
			break
		}
	}
}

Source创建对应的TransStream。


// 创建sink需要的输出流
func (s *PublishSource) doAddSink(sink Sink) bool {
    ....

	var streams [5]utils.AVStream
	var size int

	for _, stream_ := range s.originStreams.All() {
		if disableVideo && stream_.Type() == utils.AVMediaTypeVideo {
			continue
		}

		streams[size] = stream_
		size++
	}

    // 查找或创建对应的TransStream
	transStreamId := GenerateTransStreamID(sink.GetProtocol(), streams[:size]...)
	transStream, exist := s.TransStreams[transStreamId]
	if !exist {
		var err error
		transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), streams[:size])
		if err != nil {
			log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
			return false
		}

		s.TransStreams[transStreamId] = transStream
	}

	sink.SetTransStreamID(transStreamId)

	{
		sink.Lock()
		defer sink.UnLock()

		if SessionStateClosed == sink.GetState() {
			log.Sugar.Warnf("AddSink失败, sink已经断开连接 %s", sink.String())
		} else {
			sink.SetState(SessionStateTransferring)
		}
	}

	err := sink.StartStreaming(transStream)
	if err != nil {
		log.Sugar.Errorf("开始推流失败 err: %s", err.Error())
		return false
	}

	// 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流
	if !sink.IsReady() {
		return true
	}

	// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
	conn, ok := sink.GetConn().(*transport.Conn)
	if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
		conn.EnableAsyncWriteMode(transStream.OutStreamBufferCapacity() - 2)
	}

	// 发送已有的缓存数据
	// 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。
	data, timestamp, _ := transStream.ReadKeyFrameBuffer()
	if len(data) > 0 {
		if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
			s.write(sink, 0, extraData, timestamp)
		}

		s.write(sink, 0, data, timestamp)
	}

	// 累加拉流计数
	if s.recordSink != sink {
		s.sinkCount++
		log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
	}

	s.sinks[sink.GetID()] = sink
	s.TransStreamSinks[transStreamId][sink.GetID()] = sink

	// 新建传输流,发送已经缓存的音视频帧
	if !exist && AppConfig.GOPCache && s.existVideo {
		s.DispatchGOPBuffer(transStream)
	}

	return true
}

传输流封装

TranStream 调用Input函数输入AVPacket,按照流协议封装音视频帧,返回传输流。需要注意一点,目前传输流只会生成一份,转发给该协议的所有Sink,如果某个Sink拉流从未断开,而Source是重新推流,该Sink会出现时间戳错误,可能会影响到播放。

RTMP封装传输流,Input返回值分别为:由chunk组成的合并写块、时间戳、合并写块是否存在关键视频帧。ReadExtraData函数返回传输流的编码器信息,ReadKeyFrameBuffer函数返回最近的包含视频关键帧的合并写切片。Sink在拉流时,会优先调用这2个函数,将最新的传输流缓存发送给Sink,实现视频秒开。

func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error) {
	t.ClearOutStreamBuffer()

	var data []byte
	var chunk *librtmp.Chunk
	var videoPkt bool
	var videoKey bool
	// rtmp chunk消息体的数据大小
	var payloadSize int
	// 先向rtmp buffer写的flv头,再按照chunk size分割,所以第一个chunk要跳过flv头大小
	var chunkPayloadOffset int
	var dts int64
	var pts int64

	dts = packet.ConvertDts(1000)
	pts = packet.ConvertPts(1000)
	ct := pts - dts

	if utils.AVMediaTypeAudio == packet.MediaType() {
		data = packet.Data()
		chunk = &t.audioChunk
		chunkPayloadOffset = 2
		payloadSize += chunkPayloadOffset + len(data)
	} else if utils.AVMediaTypeVideo == packet.MediaType() {
		videoPkt = true
		videoKey = packet.KeyFrame()
		data = packet.AVCCPacketData()
		chunk = &t.videoChunk
		chunkPayloadOffset = t.muxer.ComputeVideoDataSize(uint32(ct))
		payloadSize += chunkPayloadOffset + len(data)
	}

	// 遇到视频关键帧, 发送剩余的流, 创建新切片
	if videoKey {
		if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
			t.AppendOutStreamBuffer(segment)
		}
	}

	// 分配内存
	// 固定type0
	chunkHeaderSize := 12
	// type3chunk数量
	numChunks := (payloadSize - 1) / t.chunkSize
	rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks
	// 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
	if dts >= 0xFFFFFF && dts <= 0xFFFFFFFF {
		rtmpMsgSize += (1 + numChunks) * 4
	}

	allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey)

	// 写chunk header
	chunk.Length = payloadSize
	chunk.Timestamp = uint32(dts)
	n := chunk.ToBytes(allocate)

	// 写flv
	if videoPkt {
		n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false)
	} else {
		n += t.muxer.WriteAudioData(allocate[n:], false)
	}

	n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
	utils.Assert(len(allocate) == n)

	// 合并写满了再发
	if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
		t.AppendOutStreamBuffer(segment)
	}

	return t.OutBuffer[:t.OutBufferSize], 0, true, nil
}

func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) {
	utils.Assert(t.headerSize > 0)

	// 发送sequence header
	return [][]byte{t.header[:t.headerSize]}, 0, nil
}

func (t *transStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
	t.ClearOutStreamBuffer()

	// 发送当前内存池已有的合并写切片
	t.MWBuffer.ReadSegmentsFromKeyFrameIndex(func(bytes []byte) {
		t.AppendOutStreamBuffer(bytes)
	})

	return t.OutBuffer[:t.OutBufferSize], 0, nil
}