diff --git a/compositor_api/src/types.rs b/compositor_api/src/types.rs index b4a15ffb8..a9f47cc32 100644 --- a/compositor_api/src/types.rs +++ b/compositor_api/src/types.rs @@ -43,6 +43,7 @@ pub use register_input::Mp4Input; pub use register_input::RtpInput; pub use register_input::WhipInput; pub use register_output::Mp4Output; +pub use register_output::RtmpOutput; pub use register_output::RtpOutput; pub use register_output::WhipOutput; diff --git a/compositor_api/src/types/from_register_output.rs b/compositor_api/src/types/from_register_output.rs index 0e662b194..96d9a781f 100644 --- a/compositor_api/src/types/from_register_output.rs +++ b/compositor_api/src/types/from_register_output.rs @@ -10,6 +10,7 @@ use compositor_pipeline::pipeline::{ output::{ self, mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack}, + rtmp::{RtmpAudioTrack, RtmpVideoTrack}, whip::WhipAudioOptions, }, }; @@ -262,6 +263,65 @@ impl TryFrom for pipeline::RegisterOutputOptions for pipeline::RegisterOutputOptions { + type Error = TypeError; + + fn try_from(value: RtmpOutput) -> Result { + let RtmpOutput { url, video, audio } = value; + let video_track = video.as_ref().map(|v| match v.encoder { + VideoEncoderOptions::FfmpegH264 { .. } => RtmpVideoTrack { + width: v.resolution.width as u32, + height: v.resolution.height as u32, + }, + }); + let audio_track = audio.as_ref().map(|a| match &a.encoder { + RtmpAudioEncoderOptions::Aac { + channels, + sample_rate, + } => RtmpAudioTrack { + channels: channels.clone().into(), + sample_rate: sample_rate.unwrap_or(44100), + }, + }); + + let (video_encoder_options, output_video_options) = maybe_video_options(video)?; + let (audio_encoder_options, output_audio_options) = match audio { + Some(OutputRtmpAudioOptions { + mixing_strategy, + send_eos_when, + encoder, + initial, + }) => { + let audio_encoder_options: AudioEncoderOptions = encoder.into(); + let output_audio_options = pipeline::OutputAudioOptions { + initial: initial.try_into()?, + end_condition: send_eos_when.unwrap_or_default().try_into()?, + mixing_strategy: mixing_strategy.unwrap_or(MixingStrategy::SumClip).into(), + channels: audio_encoder_options.channels(), + }; + + (Some(audio_encoder_options), Some(output_audio_options)) + } + None => (None, None), + }; + + let output_options = output::OutputOptions { + output_protocol: output::OutputProtocolOptions::Rtmp(output::rtmp::RtmpSenderOptions { + url, + video: video_track, + audio: audio_track, + }), + video: video_encoder_options, + audio: audio_encoder_options, + }; + Ok(Self { + output_options, + video: output_video_options, + audio: output_audio_options, + }) + } +} + fn maybe_video_options( options: Option, ) -> Result< @@ -308,6 +368,20 @@ impl From for pipeline::encoder::AudioEncoderOptions { } } +impl From for pipeline::encoder::AudioEncoderOptions { + fn from(value: RtmpAudioEncoderOptions) -> Self { + match value { + RtmpAudioEncoderOptions::Aac { + channels, + sample_rate, + } => AudioEncoderOptions::Aac(AacEncoderOptions { + channels: channels.into(), + sample_rate: sample_rate.unwrap_or(44100), + }), + } + } +} + impl From for pipeline::encoder::AudioEncoderOptions { fn from(value: RtpAudioEncoderOptions) -> Self { match value { diff --git a/compositor_api/src/types/register_output.rs b/compositor_api/src/types/register_output.rs index f7dd60a64..f8bb3c65a 100644 --- a/compositor_api/src/types/register_output.rs +++ b/compositor_api/src/types/register_output.rs @@ -27,6 +27,16 @@ pub struct RtpOutput { pub audio: Option, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct RtmpOutput { + pub url: String, + /// Video stream configuration. + pub video: Option, + /// Audio stream configuration. + pub audio: Option, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(deny_unknown_fields)] pub struct Mp4Output { @@ -90,6 +100,19 @@ pub struct OutputMp4AudioOptions { pub initial: Audio, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct OutputRtmpAudioOptions { + /// (**default="sum_clip"**) Specifies how audio should be mixed. + pub mixing_strategy: Option, + /// Condition for termination of output stream based on the input streams states. + pub send_eos_when: Option, + /// Audio encoder options. + pub encoder: RtmpAudioEncoderOptions, + /// Initial audio mixer configuration for output. + pub initial: Audio, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(deny_unknown_fields)] pub struct OutputWhipAudioOptions { @@ -141,6 +164,16 @@ pub enum Mp4AudioEncoderOptions { }, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] +pub enum RtmpAudioEncoderOptions { + Aac { + channels: AudioChannels, + /// (**default=`44100`**) Sample rate. Allowed values: [8000, 16000, 24000, 44100, 48000]. + sample_rate: Option, + }, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] pub enum WhipAudioEncoderOptions { diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index 4454d1f13..d3ec2a9ba 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -5,6 +5,7 @@ use compositor_render::{ }; use crossbeam_channel::{bounded, Receiver, Sender}; use mp4::{Mp4FileWriter, Mp4OutputOptions}; +use rtmp::RtmpSenderOptions; use tracing::debug; use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent}; @@ -19,6 +20,7 @@ use super::{ use whip::{WhipSender, WhipSenderOptions}; pub mod mp4; +pub mod rtmp; pub mod rtp; pub mod whip; @@ -33,6 +35,7 @@ pub struct OutputOptions { #[derive(Debug, Clone)] pub enum OutputProtocolOptions { Rtp(RtpSenderOptions), + Rtmp(RtmpSenderOptions), Mp4(Mp4OutputOptions), Whip(WhipSenderOptions), } @@ -70,6 +73,10 @@ pub enum Output { sender: RtpSender, encoder: Encoder, }, + Rtmp { + sender: rtmp::RmtpSender, + encoder: Encoder, + }, Mp4 { writer: Mp4FileWriter, encoder: Encoder, @@ -118,6 +125,12 @@ impl OutputOptionsExt> for OutputOptions { Ok((Output::Rtp { sender, encoder }, Some(port))) } + OutputProtocolOptions::Rtmp(rtmp_options) => { + let sender = rtmp::RmtpSender::new(output_id, rtmp_options.clone(), packets) + .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; + + Ok((Output::Rtmp { sender, encoder }, None)) + } OutputProtocolOptions::Mp4(mp4_opt) => { let writer = Mp4FileWriter::new(output_id.clone(), mp4_opt.clone(), packets, ctx) .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; @@ -196,6 +209,7 @@ impl Output { pub fn frame_sender(&self) -> Option<&Sender>> { match &self { Output::Rtp { encoder, .. } => encoder.frame_sender(), + Output::Rtmp { encoder, .. } => encoder.frame_sender(), Output::Mp4 { encoder, .. } => encoder.frame_sender(), Output::Whip { encoder, .. } => encoder.frame_sender(), Output::EncodedData { encoder } => encoder.frame_sender(), @@ -206,6 +220,7 @@ impl Output { pub fn samples_batch_sender(&self) -> Option<&Sender>> { match &self { Output::Rtp { encoder, .. } => encoder.samples_batch_sender(), + Output::Rtmp { encoder, .. } => encoder.samples_batch_sender(), Output::Mp4 { encoder, .. } => encoder.samples_batch_sender(), Output::Whip { encoder, .. } => encoder.samples_batch_sender(), Output::EncodedData { encoder } => encoder.samples_batch_sender(), @@ -216,6 +231,7 @@ impl Output { pub fn resolution(&self) -> Option { match &self { Output::Rtp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), + Output::Rtmp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::Mp4 { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::Whip { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::EncodedData { encoder } => encoder.video.as_ref().map(|v| v.resolution()), @@ -226,6 +242,7 @@ impl Output { pub fn request_keyframe(&self, output_id: OutputId) -> Result<(), RequestKeyframeError> { let encoder = match &self { Output::Rtp { encoder, .. } => encoder, + Output::Rtmp { encoder, .. } => encoder, Output::Mp4 { encoder, .. } => encoder, Output::Whip { encoder, .. } => encoder, Output::EncodedData { encoder } => encoder, @@ -252,6 +269,10 @@ impl Output { .video .as_ref() .map(|_| OutputFrameFormat::PlanarYuv420Bytes), + Output::Rtmp { encoder, .. } => encoder + .video + .as_ref() + .map(|_| OutputFrameFormat::PlanarYuv420Bytes), Output::EncodedData { encoder } => encoder .video .as_ref() diff --git a/compositor_pipeline/src/pipeline/output/mp4.rs b/compositor_pipeline/src/pipeline/output/mp4.rs index 41b111312..bd27e3b67 100644 --- a/compositor_pipeline/src/pipeline/output/mp4.rs +++ b/compositor_pipeline/src/pipeline/output/mp4.rs @@ -36,15 +36,6 @@ pub struct Mp4AudioTrack { pub sample_rate: u32, } -pub enum Mp4OutputVideoTrack { - H264 { width: u32, height: u32 }, -} - -pub struct Mp4WriterOptions { - pub output_path: PathBuf, - pub video: Option, -} - pub struct Mp4FileWriter; impl Mp4FileWriter { @@ -118,10 +109,6 @@ fn init_ffmpeg_output( .map(|v| { const VIDEO_TIME_BASE: i32 = 90000; - let codec = match v.codec { - VideoCodec::H264 => ffmpeg::codec::Id::H264, - }; - let mut stream = output_ctx .add_stream(ffmpeg::codec::Id::H264) .map_err(OutputInitError::FfmpegMp4Error)?; @@ -129,7 +116,7 @@ fn init_ffmpeg_output( stream.set_time_base(ffmpeg::Rational::new(1, VIDEO_TIME_BASE)); let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; - codecpar.codec_id = codec.into(); + codecpar.codec_id = ffmpeg::codec::Id::H264.into(); codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO; codecpar.width = v.width as i32; codecpar.height = v.height as i32; diff --git a/compositor_pipeline/src/pipeline/output/rtmp.rs b/compositor_pipeline/src/pipeline/output/rtmp.rs new file mode 100644 index 000000000..590c4e2f3 --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/rtmp.rs @@ -0,0 +1,251 @@ +use std::{ptr, time::Duration}; + +use compositor_render::{event_handler::emit_event, OutputId}; +use crossbeam_channel::Receiver; +use ffmpeg_next as ffmpeg; +use tracing::{debug, error}; + +use crate::{ + audio_mixer::AudioChannels, + error::OutputInitError, + event::Event, + pipeline::{EncodedChunk, EncodedChunkKind, EncoderOutputEvent}, +}; + +#[derive(Debug, Clone)] +pub struct RtmpSenderOptions { + pub url: String, + pub video: Option, + pub audio: Option, +} + +#[derive(Debug, Clone)] +pub struct RtmpVideoTrack { + pub width: u32, + pub height: u32, +} + +#[derive(Debug, Clone)] +pub struct RtmpAudioTrack { + pub channels: AudioChannels, + pub sample_rate: u32, +} + +pub struct RmtpSender; + +impl RmtpSender { + pub fn new( + output_id: &OutputId, + options: RtmpSenderOptions, + packets_receiver: Receiver, + ) -> Result { + let (output_ctx, video_stream, audio_stream) = init_ffmpeg_output(options)?; + + let output_id = output_id.clone(); + std::thread::Builder::new() + .name(format!("RTMP sender thread for output {}", output_id)) + .spawn(move || { + let _span = + tracing::info_span!("RTMP sender writer", output_id = output_id.to_string()) + .entered(); + + run_ffmpeg_output_thread(output_ctx, video_stream, audio_stream, packets_receiver); + emit_event(Event::OutputDone(output_id)); + debug!("Closing RTMP sender thread."); + }) + .unwrap(); + Ok(Self) + } +} + +fn init_ffmpeg_output( + options: RtmpSenderOptions, +) -> Result< + ( + ffmpeg::format::context::Output, + Option, + Option, + ), + OutputInitError, +> { + let mut output_ctx = + ffmpeg::format::output_as(&options.url, "flv").map_err(OutputInitError::FfmpegMp4Error)?; + + let mut stream_count = 0; + + let video_stream = options + .video + .map(|v| { + let mut stream = output_ctx + .add_stream(ffmpeg::codec::Id::H264) + .map_err(OutputInitError::FfmpegMp4Error)?; + + let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; + codecpar.codec_id = ffmpeg::codec::Id::H264.into(); + codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO; + codecpar.width = v.width as i32; + codecpar.height = v.height as i32; + + let id = stream_count; + stream_count += 1; + + Ok::(StreamState { + id, + timestamp_offset: None, + }) + }) + .transpose()?; + + let audio_stream = options + .audio + .map(|a| { + let channels = match a.channels { + AudioChannels::Mono => 1, + AudioChannels::Stereo => 2, + }; + let sample_rate = a.sample_rate as i32; + + let mut stream = output_ctx + .add_stream(ffmpeg::codec::Id::AAC) + .map_err(OutputInitError::FfmpegMp4Error)?; + + // If audio time base doesn't match sample rate, ffmpeg muxer produces incorrect timestamps. + stream.set_time_base(ffmpeg::Rational::new(1, sample_rate)); + + let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; + codecpar.codec_id = ffmpeg::codec::Id::AAC.into(); + codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_AUDIO; + codecpar.sample_rate = sample_rate; + codecpar.ch_layout = ffmpeg::ffi::AVChannelLayout { + nb_channels: channels, + order: ffmpeg::ffi::AVChannelOrder::AV_CHANNEL_ORDER_UNSPEC, + // This value is ignored when order is AV_CHANNEL_ORDER_UNSPEC + u: ffmpeg::ffi::AVChannelLayout__bindgen_ty_1 { mask: 0 }, + // Field doc: "For some private data of the user." + opaque: ptr::null_mut(), + }; + + let id = stream_count; + stream_count += 1; + + Ok::(StreamState { + id, + timestamp_offset: None, + }) + }) + .transpose()?; + + output_ctx + .write_header() + .map_err(OutputInitError::FfmpegMp4Error)?; + + Ok((output_ctx, video_stream, audio_stream)) +} + +fn run_ffmpeg_output_thread( + mut output_ctx: ffmpeg::format::context::Output, + mut video_stream: Option, + mut audio_stream: Option, + packets_receiver: Receiver, +) { + let mut received_video_eos = video_stream.as_ref().map(|_| false); + let mut received_audio_eos = audio_stream.as_ref().map(|_| false); + + for packet in packets_receiver { + match packet { + EncoderOutputEvent::Data(chunk) => { + write_chunk(chunk, &mut video_stream, &mut audio_stream, &mut output_ctx); + } + EncoderOutputEvent::VideoEOS => match received_video_eos { + Some(false) => received_video_eos = Some(true), + Some(true) => { + error!("Received multiple video EOS events."); + } + None => { + error!("Received video EOS event on non video output."); + } + }, + EncoderOutputEvent::AudioEOS => match received_audio_eos { + Some(false) => received_audio_eos = Some(true), + Some(true) => { + error!("Received multiple audio EOS events."); + } + None => { + error!("Received audio EOS event on non audio output."); + } + }, + }; + + if received_video_eos.unwrap_or(true) && received_audio_eos.unwrap_or(true) { + if let Err(err) = output_ctx.write_trailer() { + error!("Failed to write trailer to RTMP stream: {}.", err); + }; + break; + } + } +} + +fn write_chunk( + chunk: EncodedChunk, + video_stream: &mut Option, + audio_stream: &mut Option, + output_ctx: &mut ffmpeg::format::context::Output, +) { + let packet = create_packet(chunk, video_stream, audio_stream); + if let Some(packet) = packet { + if let Err(err) = packet.write(output_ctx) { + error!("Failed to write packet to RTMP stream: {}.", err); + } + } +} + +fn create_packet( + chunk: EncodedChunk, + video_stream: &mut Option, + audio_stream: &mut Option, +) -> Option { + let stream_state = match chunk.kind { + EncodedChunkKind::Video(_) => { + match video_stream.as_mut() { + Some(stream_state) => Some(stream_state), + None => { + error!("Failed to create packet for video chunk. No video stream registered on init."); + None + } + } + } + EncodedChunkKind::Audio(_) => { + match audio_stream.as_mut() { + Some(stream_state) => Some(stream_state), + None => { + error!("Failed to create packet for audio chunk. No audio stream registered on init."); + None + } + } + } + }?; + let timestamp_offset = stream_state.timestamp_offset(&chunk); + let pts = chunk.pts - timestamp_offset; + // let dts = chunk.dts.map(|dts| dts - timestamp_offset).unwrap_or(pts); + let dts = chunk.dts.map(|dts| dts - timestamp_offset); + + let mut packet = ffmpeg::Packet::copy(&chunk.data); + packet.set_pts(Some((pts.as_secs_f64() * 1000.0) as i64)); + packet.set_dts(dts.map(|dts| (dts.as_secs_f64() * 1000.0) as i64)); + packet.set_time_base(ffmpeg::Rational::new(1, 1000)); + packet.set_stream(stream_state.id); + + Some(packet) +} + +#[derive(Debug, Clone)] +struct StreamState { + id: usize, + timestamp_offset: Option, +} + +impl StreamState { + fn timestamp_offset(&mut self, chunk: &EncodedChunk) -> Duration { + *self.timestamp_offset.get_or_insert(chunk.pts) + } +} diff --git a/integration_tests/examples/rtmp.rs b/integration_tests/examples/rtmp.rs new file mode 100644 index 000000000..e70677218 --- /dev/null +++ b/integration_tests/examples/rtmp.rs @@ -0,0 +1,116 @@ +use anyhow::Result; +use compositor_api::types::Resolution; +use serde_json::json; +use std::{process::Command, time::Duration}; + +use integration_tests::examples::{self, run_example}; + +const BUNNY_URL: &str = + "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"; + +const VIDEO_RESOLUTION: Resolution = Resolution { + width: 1280, + height: 720, +}; + +const OUTPUT_URL: &str = "rtmp://a.rtmp.youtube.com/live2/appkey"; + +fn main() { + run_example(client_code); +} + +fn client_code() -> Result<()> { + Command::new("ffplay") + .args(["-listen", "1", OUTPUT_URL]) + .spawn()?; + + examples::post( + "input/input_1/register", + &json!({ + "type": "mp4", + "url": BUNNY_URL, + "required": true, + "offset_ms": 0, + }), + )?; + + let shader_source = include_str!("./silly.wgsl"); + examples::post( + "shader/shader_example_1/register", + &json!({ + "source": shader_source, + }), + )?; + + examples::post( + "output/output_1/register", + &json!({ + "type": "rtmp_stream", + "url": OUTPUT_URL, + "video": { + "resolution": { + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + }, + "encoder": { + "type": "ffmpeg_h264", + "preset": "ultrafast" + }, + "initial": { + "root": { + "type": "view", + "children": [ + { + "type": "rescaler", + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + "top": 0, + "left": 0, + "child": { + "type": "input_stream", + "input_id": "input_1", + } + }, + { + "type": "view", + "bottom": 0, + "left": 0, + "width": VIDEO_RESOLUTION.width, + "height": 100, + "background_color_rgba": "#00000088", + "children": [ + { "type": "view" }, + { + "type": "text", + "text": "LiveCompositor 😃😍", + "font_size": 80, + "color_rgba": "#40E0D0FF", + "weight": "bold", + }, + { "type": "view" } + ] + } + ] + } + } + }, + "audio": { + "encoder": { + "type": "aac", + "channels": "stereo" + }, + "initial": { + "inputs": [ + {"input_id": "input_1"} + ] + } + } + }), + )?; + + std::thread::sleep(Duration::from_millis(500)); + + examples::post("start", &json!({}))?; + + Ok(()) +} diff --git a/src/routes/register_request.rs b/src/routes/register_request.rs index 61cce9c95..da82b80fa 100644 --- a/src/routes/register_request.rs +++ b/src/routes/register_request.rs @@ -13,8 +13,8 @@ use crate::{ use compositor_api::{ error::ApiError, types::{ - DeckLink, ImageSpec, InputId, Mp4Input, Mp4Output, OutputId, RendererId, RtpInput, - RtpOutput, ShaderSpec, WebRendererSpec, WhipInput, WhipOutput, + DeckLink, ImageSpec, InputId, Mp4Input, Mp4Output, OutputId, RendererId, RtmpOutput, + RtpInput, RtpOutput, ShaderSpec, WebRendererSpec, WhipInput, WhipOutput, }, }; @@ -34,6 +34,7 @@ pub enum RegisterInput { #[serde(tag = "type", rename_all = "snake_case")] pub enum RegisterOutput { RtpStream(RtpOutput), + RtmpStream(RtmpOutput), Mp4(Mp4Output), Whip(WhipOutput), } @@ -96,6 +97,9 @@ pub(super) async fn handle_output( RegisterOutput::Whip(whip) => { Pipeline::register_output(&api.pipeline, output_id.into(), whip.try_into()?)? } + RegisterOutput::RtmpStream(rtmp) => { + Pipeline::register_output(&api.pipeline, output_id.into(), rtmp.try_into()?)? + } }; match response { Some(Port(port)) => Ok(Response::RegisteredPort { port: Some(port) }),