Skip to content

Commit

Permalink
ffmpeg: Add offline state and reconnection attempts to ffmpeg consumer
Browse files Browse the repository at this point in the history
TODO: need minor refactoring
  • Loading branch information
niklaspandersson authored and deadbeef84 committed Jun 5, 2024
1 parent 0acc8e0 commit f20f499
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions src/modules/ffmpeg/consumer/ffmpeg_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ extern "C" {
#include <libavutil/opt.h>
#include <libavutil/pixfmt.h>
#include <libavutil/samplefmt.h>
#include <libavutil/channel_layout.h>
}
#ifdef _MSC_VER
#pragma warning(pop)
Expand All @@ -71,9 +70,13 @@ extern "C" {
#include <tbb/concurrent_queue.h>
#include <tbb/parallel_invoke.h>

#include <chrono>
#include <future>
#include <memory>
#include <thread>

using namespace std::chrono_literals;

namespace caspar { namespace ffmpeg {

// TODO multiple output streams
Expand Down Expand Up @@ -325,10 +328,10 @@ struct Stream

if (in_frame.first) {
if (enc->codec_type == AVMEDIA_TYPE_VIDEO) {
frame = make_av_video_frame(in_frame.first, format_desc);
frame = make_av_video_frame(in_frame.first, format_desc);
frame->pts = in_frame.second;
} else if (enc->codec_type == AVMEDIA_TYPE_AUDIO) {
frame = make_av_audio_frame(in_frame.first, format_desc);
frame = make_av_audio_frame(in_frame.first, format_desc);
frame->pts = in_frame.second * frame->nb_samples;
} else {
// TODO
Expand Down Expand Up @@ -386,6 +389,9 @@ struct ffmpeg_consumer : public core::frame_consumer
tbb::concurrent_bounded_queue<std::pair<core::const_frame, std::int64_t> > frame_buffer_;
std::thread frame_thread_;

std::atomic<bool> offline_;
std::future<void> offline_timeout_;

common::bit_depth depth_;

public:
Expand All @@ -398,6 +404,7 @@ struct ffmpeg_consumer : public core::frame_consumer
, realtime_(realtime)
, path_(std::move(path))
, args_(std::move(args))
, offline_(false)
, depth_(depth)
{
state_["file/path"] = u8(path_);
Expand Down Expand Up @@ -565,6 +572,7 @@ struct ffmpeg_consumer : public core::frame_consumer

auto packet_cb = [&](std::shared_ptr<AVPacket>&& pkt) { packet_buffer.push(std::move(pkt)); };

offline_ = false;
std::int64_t frame_number = 0;
while (true) {
{
Expand Down Expand Up @@ -605,14 +613,45 @@ struct ffmpeg_consumer : public core::frame_consumer
});
}

void go_offline()
{
if (frame_thread_.joinable()) {
frame_thread_.join();
}
frame_buffer_.clear();
offline_ = true;
offline_timeout_ = std::async(std::launch::async, []() { std::this_thread::sleep_for(5s); });
CASPAR_LOG(info) << print() << " Attempting reconnection in 5s";
}

void try_go_online() { initialize(format_desc_, channel_index_); }

std::future<bool> send(core::video_field field, core::const_frame frame) override
{
// TODO - field alignment

{
try {
std::lock_guard<std::mutex> lock(exception_mutex_);
if (exception_ != nullptr) {
std::rethrow_exception(exception_);
auto exception = exception_;
exception_ = nullptr;
std::rethrow_exception(exception);
}
} catch (...) {
if (!offline_) {
CASPAR_LOG_CURRENT_EXCEPTION();
} else {
CASPAR_LOG(warning) << print() << " Reconnection attempt failed";
}
go_offline();
}

if (offline_) {
auto status = offline_timeout_.wait_for(0s);
if (status == std::future_status::ready) {
try_go_online();
} else {
return make_ready_future(true);
}
}

Expand Down

0 comments on commit f20f499

Please sign in to comment.