From b61ddee83f4c580c7b3aae6b898265f5a5a6c846 Mon Sep 17 00:00:00 2001 From: LufeBisect Date: Tue, 28 Jan 2025 13:58:56 +0000 Subject: [PATCH] nmosvideoreceiver:Timeout detection, .h files to .hpp --- .../{element_class.h => element_class.hpp} | 7 +- ...configuration.h => nmos_configuration.hpp} | 24 +-- .../src/gst_nmos_audio_receiver_plugin.cpp | 117 +++++------ .../src/gst_nmos_sender_plugin.cpp | 43 ++-- .../src/gst_nmos_video_receiver_plugin.cpp | 190 ++++++++++++------ cpp/libs/gst_nmos_plugins/src/utils.cpp | 6 +- .../src/{utils.h => utils.hpp} | 4 +- 7 files changed, 208 insertions(+), 183 deletions(-) rename cpp/libs/gst_nmos_plugins/include/{element_class.h => element_class.hpp} (92%) rename cpp/libs/gst_nmos_plugins/include/{nmos_configuration.h => nmos_configuration.hpp} (73%) rename cpp/libs/gst_nmos_plugins/src/{utils.h => utils.hpp} (93%) diff --git a/cpp/libs/gst_nmos_plugins/include/element_class.h b/cpp/libs/gst_nmos_plugins/include/element_class.hpp similarity index 92% rename from cpp/libs/gst_nmos_plugins/include/element_class.h rename to cpp/libs/gst_nmos_plugins/include/element_class.hpp index 490b78f..495c8d7 100644 --- a/cpp/libs/gst_nmos_plugins/include/element_class.h +++ b/cpp/libs/gst_nmos_plugins/include/element_class.hpp @@ -12,7 +12,7 @@ template class GstElementHandle const char* element_name = nullptr) { T* elem = reinterpret_cast(gst_element_factory_make(factory_name, element_name)); - if(!elem) + if(elem == nullptr) { return nullptr; } @@ -32,10 +32,8 @@ template class GstElementHandle GstElementHandle(const GstElementHandle&) = delete; GstElementHandle& operator=(const GstElementHandle&) = delete; - // Move constructor GstElementHandle(GstElementHandle&& other) noexcept : handle_(other.handle_) { other.handle_ = nullptr; } - // Move assignment GstElementHandle& operator=(GstElementHandle&& other) noexcept { if(this != &other) @@ -58,6 +56,7 @@ template class GstElementHandle } } + /// Destruction of this object will no longer unref the owned element. void reset(T* new_ptr = nullptr) { handle_ = new_ptr; @@ -65,8 +64,6 @@ template class GstElementHandle T* get() const { return handle_; } - explicit operator bool() const { return (handle_ != nullptr); } - private: explicit GstElementHandle(T* handle) : handle_(handle) {} diff --git a/cpp/libs/gst_nmos_plugins/include/nmos_configuration.h b/cpp/libs/gst_nmos_plugins/include/nmos_configuration.hpp similarity index 73% rename from cpp/libs/gst_nmos_plugins/include/nmos_configuration.h rename to cpp/libs/gst_nmos_plugins/include/nmos_configuration.hpp index 6069318..d57e81a 100644 --- a/cpp/libs/gst_nmos_plugins/include/nmos_configuration.h +++ b/cpp/libs/gst_nmos_plugins/include/nmos_configuration.hpp @@ -2,20 +2,20 @@ #include #include -typedef struct node_fields_t +struct node_fields_t { std::string id; std::string configuration_location; -} node_fields_t; +}; -typedef struct device_fields_t +struct device_fields_t { std::string id; std::string label; std::string description; -} device_fields_t; +}; -typedef struct video_media_fields_t +struct video_media_fields_t { gint width; gint height; @@ -23,25 +23,25 @@ typedef struct video_media_fields_t gint frame_rate_den; std::string sampling; std::string structure; -} video_media_fields_t; +}; -typedef struct audio_media_fields_t +struct audio_media_fields_t { std::string format; gint number_of_channels; gint sampling_rate; gint packet_time; -} audio_media_fields_t; +}; -typedef struct network_fields_t +struct network_fields_t { std::string source_address; std::string interface_name; std::string destination_address; gint destination_port; -} network_fields_t; +}; -typedef struct config_fields_t +struct config_fields_t { node_fields_t node; device_fields_t device; @@ -54,4 +54,4 @@ typedef struct config_fields_t network_fields_t network; std::string interface_name; std::string address; -} config_fields_t; +}; diff --git a/cpp/libs/gst_nmos_plugins/src/gst_nmos_audio_receiver_plugin.cpp b/cpp/libs/gst_nmos_plugins/src/gst_nmos_audio_receiver_plugin.cpp index 0fbc25b..a0ab8e1 100644 --- a/cpp/libs/gst_nmos_plugins/src/gst_nmos_audio_receiver_plugin.cpp +++ b/cpp/libs/gst_nmos_plugins/src/gst_nmos_audio_receiver_plugin.cpp @@ -25,10 +25,10 @@ #include "bisect/sdp.h" #include "bisect/sdp/reader.h" #include "bisect/nmoscpp/configuration.h" -#include "utils.h" #include "ossrf/nmos/api/nmos_client.h" -#include "../include/element_class.h" -#include "../include/nmos_configuration.h" +#include "utils.hpp" +#include "../include/element_class.hpp" +#include "../include/nmos_configuration.hpp" #include #include @@ -99,23 +99,15 @@ static void gst_nmosaudioreceiver_set_property(GObject* object, guint property_i switch(static_cast(property_id)) { case PropertyId::NodeId: self->config.node.id = g_value_dup_string(value); break; - case PropertyId::NodeConfigFileLocation: self->config.node.configuration_location = g_value_dup_string(value); break; - case PropertyId::DeviceId: self->config.device.id = g_value_dup_string(value); break; - case PropertyId::DeviceLabel: self->config.device.label = g_value_dup_string(value); break; - case PropertyId::DeviceDescription: self->config.device.description = g_value_dup_string(value); break; - case PropertyId::ReceiverId: self->config.id = g_value_dup_string(value); break; - case PropertyId::ReceiverLabel: self->config.label = g_value_dup_string(value); break; - case PropertyId::ReceiverDescription: self->config.description = g_value_dup_string(value); break; - case PropertyId::DstAddress: self->config.address = g_value_dup_string(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); break; @@ -128,7 +120,6 @@ static void gst_nmosaudioreceiver_get_property(GObject* object, guint property_i switch(static_cast(property_id)) { - case PropertyId::NodeId: g_value_set_string(value, self->config.node.id.c_str()); break; case PropertyId::NodeConfigFileLocation: g_value_set_string(value, self->config.node.configuration_location.c_str()); @@ -157,10 +148,10 @@ void remove_old_bin(GstNmosaudioreceiver* self) return; } gulong block_id = - gst_pad_add_probe(self->element_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, NULL, NULL); + gst_pad_add_probe(self->element_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, nullptr, nullptr); gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_start()); - gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_stop(FALSE)); + gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_stop(false)); self->clock = gst_element_get_clock(GST_ELEMENT(self)); @@ -245,8 +236,8 @@ void construct_pipeline(GstNmosaudioreceiver* self) g_object_set(G_OBJECT(self->udp_src.get()), "caps", caps, nullptr); - g_object_set(G_OBJECT(self->rtp_jitter_buffer.get()), "do-retransmission", true, "do-lost", true, "mode", 0, - "latency", 0, nullptr); + g_object_set(G_OBJECT(self->rtp_jitter_buffer.get()), "do-lost", false, "do-retransmission", false, "mode", 0, + "latency", 10, nullptr); g_object_set(G_OBJECT(self->queue.get()), "max-size-buffers", 3, nullptr); @@ -263,14 +254,12 @@ void construct_pipeline(GstNmosaudioreceiver* self) if(audio_info.bits_per_sample == 16) { - if(gst_element_link_many(self->udp_src.get(), self->rtp_jitter_buffer.get(), self->queue.get(), self->rtp_audio_depay_16.get(), nullptr) == false) { GST_ERROR_OBJECT(self, "Failed to link elements inside the bin."); return; } - // Create a ghost pad from the depay's src bin_src_pad = gst_element_get_static_pad(self->rtp_audio_depay_16.get(), "src"); if(bin_src_pad == nullptr) @@ -323,7 +312,6 @@ void construct_pipeline(GstNmosaudioreceiver* self) GST_ERROR_OBJECT(self, "Failed to link plugin ghost pad to bin's ghost pad."); return; } - gst_object_unref(plugin_pad); } catch(std::bad_variant_access const& ex) @@ -356,53 +344,47 @@ void create_nmos(GstNmosaudioreceiver* self) // Add device and receiver configurations self->client->add_device(create_device_config(self->config).dump()); - self->client->add_receiver( - self->config.device.id, create_receiver_config(self->config).dump(), - [self](const std::optional& sdp, bool master_enabled) { - fmt::print("Receiver Activation Callback: SDP={}, Master Enabled={}\n", - sdp.has_value() ? sdp.value() : "None", master_enabled); - - if(master_enabled) - { - if(sdp) - { - fmt::print("Received SDP: {}\n", sdp.value()); - auto sdp_settings = parse_sdp(sdp.value()); - if(sdp_settings.has_value() && sdp.value() != self->sdp_string) - { - self->sdp_settings = sdp_settings.value(); - self->sdp_string = sdp.value(); - remove_old_bin(self); - construct_pipeline(self); - auto time = gst_element_get_base_time(GST_ELEMENT(self)); - gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); - gst_pad_set_offset(self->bin_pad, static_cast(gst_clock_get_time(self->clock) - time)); - } - } - else if(!sdp && self->sdp_string != "") - { - fmt::print("No new SDP provided, enabling master pipeline.\n"); - remove_old_bin(self); - construct_pipeline(self); - auto time = gst_element_get_base_time(GST_ELEMENT(self)); - gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); - gst_pad_set_offset(self->bin_pad, static_cast(gst_clock_get_time(self->clock) - time)); - } - } - else - { - if(sdp) - { - fmt::print("Disabling master: SDP received but master is not enabled.\n"); - remove_old_bin(self); - } - else - { - fmt::print("Master not enabled and no SDP provided.\n"); - } - } - fmt::print("Master enabled: {}\n", master_enabled); - }); + self->client->add_receiver(self->config.device.id, create_receiver_config(self->config).dump(), + [self](const std::optional& sdp, bool master_enabled) { + fmt::print("Receiver Activation Callback: SDP={}, Master Enabled={}\n", + sdp.has_value() ? sdp.value() : "None", master_enabled); + if(master_enabled) + { + if(sdp) + { + fmt::print("Received SDP: {}\n", sdp.value()); + auto sdp_settings = parse_sdp(sdp.value()); + if(sdp_settings.has_value() && sdp.value() != self->sdp_string) + { + self->sdp_settings = sdp_settings.value(); + self->sdp_string = sdp.value(); + remove_old_bin(self); + construct_pipeline(self); + gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); + } + } + else if(!sdp && self->sdp_string != "") + { + fmt::print("No new SDP provided, enabling master pipeline.\n"); + remove_old_bin(self); + construct_pipeline(self); + gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); + } + } + else + { + if(sdp) + { + fmt::print("Disabling master: SDP received but master is not enabled.\n"); + remove_old_bin(self); + } + else + { + fmt::print("Master not enabled and no SDP provided.\n"); + } + } + fmt::print("Master enabled: {}\n", master_enabled); + }); GST_INFO_OBJECT(self, "NMOS client initialized successfully."); } @@ -411,7 +393,6 @@ static GstStateChangeReturn gst_nmosaudioreceiver_change_state(GstElement* eleme { GstStateChangeReturn ret; GstNmosaudioreceiver* self = GST_NMOSAUDIORECEIVER(element); - switch(transition) { case GST_STATE_CHANGE_NULL_TO_READY: { @@ -491,7 +472,7 @@ static gboolean plugin_init(GstPlugin* plugin) return gst_element_register(plugin, "nmosaudioreceiver", GST_RANK_NONE, GST_TYPE_NMOSAUDIORECEIVER); } -#define VERSION "0.1" +#define VERSION "1.0" #define PACKAGE "gst-nmos-audio-receiver-plugin" #define PACKAGE_NAME "AMWA NMOS Sender and Receiver Framework Plugins" #define GST_PACKAGE_ORIGIN "https://www.amwa.tv/" diff --git a/cpp/libs/gst_nmos_plugins/src/gst_nmos_sender_plugin.cpp b/cpp/libs/gst_nmos_plugins/src/gst_nmos_sender_plugin.cpp index 69e9e66..a09f04a 100644 --- a/cpp/libs/gst_nmos_plugins/src/gst_nmos_sender_plugin.cpp +++ b/cpp/libs/gst_nmos_plugins/src/gst_nmos_sender_plugin.cpp @@ -21,10 +21,10 @@ */ #include "bisect/json.h" -#include "utils.h" #include "ossrf/nmos/api/nmos_client.h" -#include "../include/element_class.h" -#include "../include/nmos_configuration.h" +#include "utils.hpp" +#include "../include/element_class.hpp" +#include "../include/nmos_configuration.hpp" #include #include @@ -45,7 +45,6 @@ typedef struct _GstNmossender GstElementHandle<_GstElement> audio_payloader_16; GstElementHandle<_GstElement> audio_payloader_24; GstElementHandle<_GstElement> udpsink; - GstPad* pad; gulong block_id; ossrf::nmos_client_uptr client; GstCaps* caps; @@ -95,36 +94,25 @@ static void gst_nmossender_set_property(GObject* object, guint property_id, cons switch(static_cast(property_id)) { case PropertyId::NodeId: self->config.node.id = g_value_dup_string(value); break; - case PropertyId::NodeConfigFileLocation: self->config.node.configuration_location = g_value_dup_string(value); break; - case PropertyId::DeviceId: self->config.device.id = g_value_dup_string(value); break; - case PropertyId::DeviceLabel: self->config.device.label = g_value_dup_string(value); break; - case PropertyId::DeviceDescription: self->config.device.description = g_value_dup_string(value); break; - case PropertyId::SenderId: self->config.id = g_value_dup_string(value); break; - case PropertyId::SenderLabel: self->config.label = g_value_dup_string(value); break; - case PropertyId::SenderDescription: self->config.description = g_value_dup_string(value); break; - case PropertyId::SourceAddress: self->config.network.source_address = g_value_dup_string(value); break; - case PropertyId::InterfaceName: self->config.network.interface_name = g_value_dup_string(value); g_object_set(G_OBJECT(self->udpsink.get()), "bind_address", self->config.network.interface_name.c_str(), nullptr); break; - case PropertyId::DestinationAddress: self->config.network.destination_address = g_value_dup_string(value); g_object_set(G_OBJECT(self->udpsink.get()), "host", self->config.network.destination_address.c_str(), nullptr); break; - case PropertyId::DestinationPort: self->config.network.destination_port = atoi(g_value_get_string(value)); g_object_set(G_OBJECT(self->udpsink.get()), "port", self->config.network.destination_port, nullptr); @@ -163,7 +151,11 @@ static void gst_nmossender_get_property(GObject* object, guint property_id, GVal static GstPadProbeReturn block_pad_probe_cb(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) { - return GST_PAD_PROBE_DROP; + if(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER) + { + return GST_PAD_PROBE_DROP; + } + return GST_PAD_PROBE_OK; } /* Event handler for the sink pad */ @@ -176,7 +168,6 @@ static gboolean gst_nmossender_sink_event(GstPad* pad, GstObject* parent, GstEve case GST_EVENT_CAPS: { GstCaps* caps = nullptr; gst_event_parse_caps(event, &caps); - if(caps) { gchar* caps_str = gst_caps_to_string(caps); @@ -269,7 +260,6 @@ static gboolean gst_nmossender_sink_event(GstPad* pad, GstObject* parent, GstEve { GST_WARNING_OBJECT(self, "Unsupported media type: %s", media_type.c_str()); } - g_free(caps_str); } else @@ -290,16 +280,15 @@ void create_nmos(GstNmossender* self) auto sender_activation_callback = [self](bool master_enabled, const nlohmann::json& transport_params) { fmt::print("nmos_sender_callback: master_enabled={}, transport_params={}\n", master_enabled, transport_params.dump()); - + GstPad* pad = gst_element_get_static_pad(self->queue.get(), "sink"); if(master_enabled) { if(transport_params.is_array() && !transport_params.empty()) { if(self->block_id == 0) { - self->block_id = gst_pad_add_probe(self->pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - block_pad_probe_cb, NULL, NULL); - gst_element_set_state(self->udpsink.get(), GST_STATE_PAUSED); + self->block_id = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, + nullptr, nullptr); } const auto& param = transport_params[0]; std::string dest_ip; @@ -314,18 +303,16 @@ void create_nmos(GstNmossender* self) } g_object_set(G_OBJECT(self->udpsink.get()), "host", dest_ip.c_str(), "port", dest_port, nullptr); } - gst_element_set_state(self->udpsink.get(), GST_STATE_PLAYING); if(self->block_id != 0) { - gst_pad_remove_probe(self->pad, self->block_id); + gst_pad_remove_probe(pad, self->block_id); self->block_id = 0; } } else { self->block_id = - gst_pad_add_probe(self->pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, NULL, NULL); - gst_element_set_state(self->udpsink.get(), GST_STATE_PAUSED); + gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, nullptr, nullptr); } }; @@ -502,8 +489,6 @@ static void gst_nmossender_init(GstNmossender* self) gst_object_unref(sink_ghost_pad); return; } - - self->pad = sink_ghost_pad; } static gboolean plugin_init(GstPlugin* plugin) @@ -511,7 +496,7 @@ static gboolean plugin_init(GstPlugin* plugin) return gst_element_register(plugin, "nmossender", GST_RANK_NONE, GST_TYPE_NMOSSENDER); } -#define VERSION "0.1" +#define VERSION "1.0" #define PACKAGE "gst-nmos-sender-plugin" #define PACKAGE_NAME "AMWA NMOS Sender and Receiver Framework Plugins" #define GST_PACKAGE_ORIGIN "https://www.amwa.tv/" diff --git a/cpp/libs/gst_nmos_plugins/src/gst_nmos_video_receiver_plugin.cpp b/cpp/libs/gst_nmos_plugins/src/gst_nmos_video_receiver_plugin.cpp index 13c8cdb..7288249 100644 --- a/cpp/libs/gst_nmos_plugins/src/gst_nmos_video_receiver_plugin.cpp +++ b/cpp/libs/gst_nmos_plugins/src/gst_nmos_video_receiver_plugin.cpp @@ -25,10 +25,10 @@ #include "bisect/sdp.h" #include "bisect/sdp/reader.h" #include "bisect/nmoscpp/configuration.h" -#include "utils.h" #include "ossrf/nmos/api/nmos_client.h" -#include "gst_nmos_plugins/include/element_class.h" -#include "gst_nmos_plugins/include/nmos_configuration.h" +#include "utils.hpp" +#include "gst_nmos_plugins/include/element_class.hpp" +#include "gst_nmos_plugins/include/nmos_configuration.hpp" #include #include @@ -49,17 +49,19 @@ typedef struct _GstNmosvideoreceiver GstBin parent; GstPad* element_pad; GstPad* bin_pad; - GstClock* clock; GstElementHandle<_GstElement> bin; GstElementHandle<_GstElement> udp_src; GstElementHandle<_GstElement> rtp_video_depay; GstElementHandle<_GstElement> rtp_jitter_buffer; GstElementHandle<_GstElement> queue; + ossrf::nmos_client_uptr client; config_fields_t config; - std::string sdp_string; sdp_settings_t sdp_settings; + std::string sdp_string; bool nmos_active; + bool pipeline_clear; + gint64 last_buffer_time; } GstNmosvideoreceiver; typedef struct _GstNmosvideoreceiverClass @@ -101,23 +103,15 @@ static void gst_nmosvideoreceiver_set_property(GObject* object, guint property_i switch(static_cast(property_id)) { case PropertyId::NodeId: self->config.node.id = g_value_dup_string(value); break; - case PropertyId::NodeConfigFileLocation: self->config.node.configuration_location = g_value_dup_string(value); break; - case PropertyId::DeviceId: self->config.device.id = g_value_dup_string(value); break; - case PropertyId::DeviceLabel: self->config.device.label = g_value_dup_string(value); break; - case PropertyId::DeviceDescription: self->config.device.description = g_value_dup_string(value); break; - case PropertyId::ReceiverId: self->config.id = g_value_dup_string(value); break; - case PropertyId::ReceiverLabel: self->config.label = g_value_dup_string(value); break; - case PropertyId::ReceiverDescription: self->config.description = g_value_dup_string(value); break; - case PropertyId::DstAddress: self->config.address = g_value_dup_string(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); break; @@ -159,12 +153,10 @@ void remove_old_bin(GstNmosvideoreceiver* self) return; } gulong block_id = - gst_pad_add_probe(self->element_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, NULL, NULL); + gst_pad_add_probe(self->element_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, nullptr, nullptr); gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_start()); - gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_stop(FALSE)); - - self->clock = gst_element_get_clock(GST_ELEMENT(self)); + gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_stop(false)); gst_element_set_state(self->bin.get(), GST_STATE_NULL); @@ -184,11 +176,80 @@ void remove_old_bin(GstNmosvideoreceiver* self) if(block_id != 0) { gst_pad_remove_probe(self->element_pad, block_id); + self->element_pad = nullptr; } } +void restart_jitter(GstNmosvideoreceiver* self) +{ + gst_element_set_state(GST_ELEMENT(self->bin.get()), GST_STATE_NULL); + gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_start()); + gst_element_send_event(GST_ELEMENT(self), gst_event_new_flush_stop(false)); + + gst_element_unlink(self->udp_src.get(), self->rtp_jitter_buffer.get()); + gst_element_unlink(self->rtp_jitter_buffer.get(), self->queue.get()); + + gst_bin_remove(GST_BIN(self->bin.get()), self->rtp_jitter_buffer.get()); + self->rtp_jitter_buffer.reset(); + + auto maybeJitter = GstElementHandle::create_element("rtpjitterbuffer", nullptr); + if(std::holds_alternative(maybeJitter)) + { + GST_ERROR_OBJECT(self, "Failed to recreate rtpjitterbuffer"); + return; + } + self->rtp_jitter_buffer = std::move(std::get>(maybeJitter)); + + gst_bin_add(GST_BIN(self->bin.get()), self->rtp_jitter_buffer.get()); + g_object_set(G_OBJECT(self->rtp_jitter_buffer.get()), "do-lost", false, "do-retransmission", false, "mode", 0, + "latency", 10, nullptr); + + if(!gst_element_link(self->udp_src.get(), self->rtp_jitter_buffer.get())) + { + GST_ERROR_OBJECT(self, "Failed to link udp_src to new jitter!\n\n"); + return; + } + if(!gst_element_link(self->rtp_jitter_buffer.get(), self->queue.get())) + { + GST_ERROR_OBJECT(self, "Failed to link new jitter to queue!\n"); + return; + } + gst_element_sync_state_with_parent(self->rtp_jitter_buffer.get()); + gst_element_set_state(GST_ELEMENT(self->bin.get()), GST_STATE_PLAYING); +} + +static GstPadProbeReturn buffer_monitor_probe_cb(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) +{ + if(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER) + { + GstNmosvideoreceiver* self = (GstNmosvideoreceiver*)user_data; + self->last_buffer_time = g_get_monotonic_time(); + if(self->pipeline_clear == true) + { + self->pipeline_clear = false; + } + } + return GST_PAD_PROBE_OK; +} + +static gboolean check_no_data_timeout(gpointer user_data) +{ + GstNmosvideoreceiver* self = (GstNmosvideoreceiver*)user_data; + + gint64 now = g_get_monotonic_time(); + if((now - self->last_buffer_time) > (10 * G_GINT64_CONSTANT(1000000)) && self->pipeline_clear == false) + { + self->pipeline_clear = true; + restart_jitter(self); + fmt::print("Timeout detected. Restarting pipeline.\n"); + } + return true; +} + void construct_pipeline(GstNmosvideoreceiver* self) { + self->last_buffer_time = g_get_monotonic_time(); + g_timeout_add_seconds(1, (GSourceFunc)check_no_data_timeout, self); auto format = self->sdp_settings.format; try { @@ -251,8 +312,8 @@ void construct_pipeline(GstNmosvideoreceiver* self) g_object_set(G_OBJECT(self->udp_src.get()), "caps", caps, nullptr); - g_object_set(G_OBJECT(self->rtp_jitter_buffer.get()), "do-retransmission", true, "do-lost", true, "mode", 2, - "latency", 0, nullptr); + g_object_set(G_OBJECT(self->rtp_jitter_buffer.get()), "do-lost", false, "do-retransmission", false, "mode", 0, + "latency", 10, nullptr); g_object_set(G_OBJECT(self->queue.get()), "max-size-buffers", 3, nullptr); @@ -271,6 +332,10 @@ void construct_pipeline(GstNmosvideoreceiver* self) return; } + GstPad* pad = gst_element_get_static_pad(self->udp_src.get(), "src"); + gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, buffer_monitor_probe_cb, self, NULL); + gst_object_unref(pad); + // Create a ghost pad from the depay's src GstPad* bin_src_pad = gst_element_get_static_pad(self->rtp_video_depay.get(), "src"); if(bin_src_pad == nullptr) @@ -339,50 +404,47 @@ void create_nmos(GstNmosvideoreceiver* self) // Add device and receiver configurations self->client->add_device(create_device_config(self->config).dump()); - self->client->add_receiver( - self->config.device.id, create_receiver_config(self->config).dump(), - [self](const std::optional& sdp, bool master_enabled) { - fmt::print("Receiver Activation Callback: SDP={}, Master Enabled={}\n", - sdp.has_value() ? sdp.value() : "None", master_enabled); - - if(master_enabled) - { - if(sdp) - { - fmt::print("Received SDP: {}\n", sdp.value()); - auto sdp_settings = parse_sdp(sdp.value()); - if(sdp_settings.has_value() && sdp.value() != self->sdp_string) - { - self->sdp_settings = sdp_settings.value(); - self->sdp_string = sdp.value(); - remove_old_bin(self); - construct_pipeline(self); - auto time = gst_element_get_base_time(GST_ELEMENT(self)); - gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); - gst_pad_set_offset(self->bin_pad, static_cast(gst_clock_get_time(self->clock) - time)); - } - } - else if(!sdp && self->sdp_string != "") - { - construct_pipeline(self); - auto time = gst_element_get_base_time(GST_ELEMENT(self)); - gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); - gst_pad_set_offset(self->bin_pad, static_cast(gst_clock_get_time(self->clock) - time)); - } - } - else if(!master_enabled) - { - if(sdp) - { - remove_old_bin(self); - } - else - { - fmt::print("No SDP provided.\n"); - } - } - fmt::print("Master enabled: {}\n", master_enabled); - }); + self->client->add_receiver(self->config.device.id, create_receiver_config(self->config).dump(), + [self](const std::optional& sdp, bool master_enabled) { + fmt::print("Receiver Activation Callback: SDP={}, Master Enabled={}\n", + sdp.has_value() ? sdp.value() : "None", master_enabled); + if(master_enabled) + { + if(sdp) + { + fmt::print("Received SDP: {}\n", sdp.value()); + auto sdp_settings = parse_sdp(sdp.value()); + if(sdp_settings.has_value() && sdp.value() != self->sdp_string) + { + self->sdp_settings = sdp_settings.value(); + self->sdp_string = sdp.value(); + remove_old_bin(self); + construct_pipeline(self); + gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); + } + } + else if(!sdp && self->sdp_string != "") + { + fmt::print("No new SDP provided, enabling master pipeline.\n"); + remove_old_bin(self); + construct_pipeline(self); + gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING); + } + } + else + { + if(sdp) + { + fmt::print("Disabling master: SDP received but master is not enabled.\n"); + remove_old_bin(self); + } + else + { + fmt::print("Master not enabled and no SDP provided.\n"); + } + } + fmt::print("Master enabled: {}\n", master_enabled); + }); GST_INFO_OBJECT(self, "NMOS client initialized successfully."); } @@ -471,7 +533,7 @@ static gboolean plugin_init(GstPlugin* plugin) return gst_element_register(plugin, "nmosvideoreceiver", GST_RANK_NONE, GST_TYPE_NMOSVIDEORECEIVER); } -#define VERSION "0.1" +#define VERSION "1.0" #define PACKAGE "gst-nmos-video-receiver-plugin" #define PACKAGE_NAME "AMWA NMOS Sender and Receiver Framework Plugins" #define GST_PACKAGE_ORIGIN "https://www.amwa.tv/" diff --git a/cpp/libs/gst_nmos_plugins/src/utils.cpp b/cpp/libs/gst_nmos_plugins/src/utils.cpp index 6e3f300..173c0a8 100644 --- a/cpp/libs/gst_nmos_plugins/src/utils.cpp +++ b/cpp/libs/gst_nmos_plugins/src/utils.cpp @@ -1,11 +1,11 @@ -#include "utils.h" +#include "utils.hpp" #include "ossrf/nmos/api/nmos_client.h" #include "bisect/nmoscpp/configuration.h" #include "bisect/expected/macros.h" #include "bisect/expected.h" #include "bisect/json.h" -#include "gst_nmos_plugins/include/element_class.h" -#include "gst_nmos_plugins/include/nmos_configuration.h" +#include "gst_nmos_plugins/include/element_class.hpp" +#include "gst_nmos_plugins/include/nmos_configuration.hpp" #include using namespace bisect; diff --git a/cpp/libs/gst_nmos_plugins/src/utils.h b/cpp/libs/gst_nmos_plugins/src/utils.hpp similarity index 93% rename from cpp/libs/gst_nmos_plugins/src/utils.h rename to cpp/libs/gst_nmos_plugins/src/utils.hpp index 10ef2f1..b11f9eb 100644 --- a/cpp/libs/gst_nmos_plugins/src/utils.h +++ b/cpp/libs/gst_nmos_plugins/src/utils.hpp @@ -1,11 +1,11 @@ #pragma once +#include "../include/element_class.hpp" +#include "../include/nmos_configuration.hpp" #include #include #include #include #include -#include "../include/element_class.h" -#include "../include/nmos_configuration.h" void create_default_config_fields_sender(config_fields_t* config); void create_default_config_fields_video_receiver(config_fields_t* config);