Skip to content

Commit

Permalink
gst_nmos_*_receiver: Master_enable control added
Browse files Browse the repository at this point in the history
  • Loading branch information
LufeBisect committed Jan 27, 2025
1 parent 460f2cc commit 6be94e7
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 124 deletions.
104 changes: 65 additions & 39 deletions cpp/libs/gst_nmos_plugins/src/gst_nmos_audio_receiver_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,53 +331,79 @@ void construct_pipeline(GstNmosaudioreceiver* self)
fmt::print("Invalid format sent for current receiver.\n");
}
}

void create_nmos(GstNmosaudioreceiver* self)
{
const auto node_config_json = create_node_config(self->config);
if(node_config_json == nullptr)
{
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client. No valid node JSON location given.");
return;
}
const auto device_config_json = create_device_config(self->config);
nlohmann::json_abi_v3_11_3::json receiver_config_json = nullptr;

receiver_config_json = create_receiver_config(self->config);

auto receiver_callback = [self](const std::optional<std::string>& sdp, bool master_enabled) {
fmt::print("Receiver Activation Callback: SDP={}, Master Enabled={}\n", sdp.has_value() ? sdp.value() : "None",
master_enabled);

if(sdp)
auto initialize_nmos = [&]() -> bool {
const auto node_config_json = create_node_config(self->config);
if(node_config_json == nullptr)
{
fmt::print("Received SDP: {}\n", sdp.value());
auto sdp_settings = parse_sdp(sdp.value());
if(sdp_settings.has_value() && sdp.value() != self->sdp_string)
{
self->sdp_settings = sdp_settings.value();
self->sdp_string = sdp.value();
remove_old_bin(self);
construct_pipeline(self);
gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING);
}
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client. No valid node JSON location given.");
return false;
}
else

auto result = ossrf::nmos_client_t::create(self->config.node.id, node_config_json.dump());
if(!result.has_value())
{
fmt::print("No SDP provided.\n");
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client.");
return false;
}
fmt::print("Master enabled: {}\n", master_enabled);

self->client = std::move(result.value());
return true;
};

auto result = ossrf::nmos_client_t::create(self->config.node.id, node_config_json.dump());
if(!result.has_value())
{
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client.");
return;
}
self->client = std::move(result.value());
self->client->add_device(device_config_json.dump());
self->client->add_receiver(self->config.device.id, receiver_config_json.dump(), receiver_callback);
if(!initialize_nmos()) return;

// Add device and receiver configurations
self->client->add_device(create_device_config(self->config).dump());
self->client->add_receiver(
self->config.device.id, create_receiver_config(self->config).dump(),
[self](const std::optional<std::string>& sdp, bool master_enabled) {
fmt::print("Receiver Activation Callback: SDP={}, Master Enabled={}\n",
sdp.has_value() ? sdp.value() : "None", master_enabled);

if(master_enabled)
{
if(sdp)
{
fmt::print("Received SDP: {}\n", sdp.value());
auto sdp_settings = parse_sdp(sdp.value());
if(sdp_settings.has_value() && sdp.value() != self->sdp_string)
{
self->sdp_settings = sdp_settings.value();
self->sdp_string = sdp.value();
remove_old_bin(self);
construct_pipeline(self);
auto time = gst_element_get_base_time(GST_ELEMENT(self));
gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING);
gst_pad_set_offset(self->bin_pad, static_cast<long>(gst_clock_get_time(self->clock) - time));
}
}
else if(!sdp && self->sdp_string != "")
{
fmt::print("No new SDP provided, enabling master pipeline.\n");
remove_old_bin(self);
construct_pipeline(self);
auto time = gst_element_get_base_time(GST_ELEMENT(self));
gst_element_set_state(GST_ELEMENT(self), GST_STATE_PLAYING);
gst_pad_set_offset(self->bin_pad, static_cast<long>(gst_clock_get_time(self->clock) - time));
}
}
else
{
if(sdp)
{
fmt::print("Disabling master: SDP received but master is not enabled.\n");
remove_old_bin(self);
}
else
{
fmt::print("Master not enabled and no SDP provided.\n");
}
}
fmt::print("Master enabled: {}\n", master_enabled);
});

GST_INFO_OBJECT(self, "NMOS client initialized successfully.");
}

Expand Down
140 changes: 100 additions & 40 deletions cpp/libs/gst_nmos_plugins/src/gst_nmos_sender_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ GST_DEBUG_CATEGORY_STATIC(gst_nmossender_debug_category);
typedef struct _GstNmossender
{
GstBin parent;
GstElementHandle<_GstElement> bin;
GstElementHandle<_GstElement> queue;
GstElementHandle<_GstElement> video_payloader;
GstElementHandle<_GstElement> audio_payloader_16;
GstElementHandle<_GstElement> audio_payloader_24;
GstElementHandle<_GstElement> udpsink;
GstPad* pad;
gulong block_id;
ossrf::nmos_client_uptr client;
GstCaps* caps;
config_fields_t config;
bool nmos_active;
} GstNmossender;

typedef struct _GstNmossenderClass
Expand Down Expand Up @@ -157,6 +161,11 @@ static void gst_nmossender_get_property(GObject* object, guint property_id, GVal
}
}

static GstPadProbeReturn block_pad_probe_cb(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
return GST_PAD_PROBE_DROP;
}

/* Event handler for the sink pad */
static gboolean gst_nmossender_sink_event(GstPad* pad, GstObject* parent, GstEvent* event)
{
Expand Down Expand Up @@ -276,57 +285,102 @@ static gboolean gst_nmossender_sink_event(GstPad* pad, GstObject* parent, GstEve
return gst_pad_event_default(pad, parent, event);
}

/* State Change so it doesn't boot NMOS without pipeline being set to playing */
static GstStateChangeReturn gst_nmossender_change_state(GstElement* element, GstStateChange transition)
void create_nmos(GstNmossender* self)
{
GstStateChangeReturn ret;
GstNmossender* self = GST_NMOSSENDER(element);

auto sender_activation_callback = [](bool master_enabled, const nlohmann::json& transport_params) {
auto sender_activation_callback = [self](bool master_enabled, const nlohmann::json& transport_params) {
fmt::print("nmos_sender_callback: master_enabled={}, transport_params={}\n", master_enabled,
transport_params.dump());
};

switch(transition)
{
case GST_STATE_CHANGE_PAUSED_TO_PLAYING: {

const auto node_config_json = create_node_config(self->config);
if(node_config_json == nullptr)
{
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client. No valid node JSON location given.");
return GST_STATE_CHANGE_FAILURE;
}
const auto device_config_json = create_device_config(self->config);
nlohmann::json_abi_v3_11_3::json sender_config_json = nullptr;

if(self->config.is_audio)
if(master_enabled)
{
sender_config_json = create_audio_sender_config(self->config);
if(transport_params.is_array() && !transport_params.empty())
{
if(self->block_id == 0)
{
self->block_id = gst_pad_add_probe(self->pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
block_pad_probe_cb, NULL, NULL);
gst_element_set_state(self->udpsink.get(), GST_STATE_PAUSED);
}
const auto& param = transport_params[0];
std::string dest_ip;
if(param.contains("destination_ip"))
{
dest_ip = param["destination_ip"].get<std::string>();
}
int dest_port = 9999;
if(param.contains("destination_port"))
{
dest_port = param["destination_port"].get<int>();
}
g_object_set(G_OBJECT(self->udpsink.get()), "host", dest_ip.c_str(), "port", dest_port, nullptr);
}
gst_element_set_state(self->udpsink.get(), GST_STATE_PLAYING);
if(self->block_id != 0)
{
gst_pad_remove_probe(self->pad, self->block_id);
self->block_id = 0;
}
}
else
{
sender_config_json = create_video_sender_config(self->config);
self->block_id =
gst_pad_add_probe(self->pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, block_pad_probe_cb, NULL, NULL);
gst_element_set_state(self->udpsink.get(), GST_STATE_PAUSED);
}
};

auto result = ossrf::nmos_client_t::create(self->config.node.id, node_config_json.dump());
if(result.has_value() == false)
{
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client. Node ID: %s", self->config.node.id.c_str());
return GST_STATE_CHANGE_FAILURE;
}
const auto node_config_json = create_node_config(self->config);
if(node_config_json == nullptr)
{
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client. No valid node JSON location given.");
return;
}
const auto device_config_json = create_device_config(self->config);
nlohmann::json_abi_v3_11_3::json sender_config_json = nullptr;

self->client = std::move(result.value());
if(!self->client->add_device(device_config_json.dump()))
{
GST_ERROR_OBJECT(self, "Failed to add device to NMOS client");
return GST_STATE_CHANGE_FAILURE;
}
if(self->config.is_audio)
{
sender_config_json = create_audio_sender_config(self->config);
}
else
{
sender_config_json = create_video_sender_config(self->config);
}

auto result = ossrf::nmos_client_t::create(self->config.node.id, node_config_json.dump());
if(result.has_value() == false)
{
GST_ERROR_OBJECT(self, "Failed to initialize NMOS client. Node ID: %s", self->config.node.id.c_str());
return;
}

self->client = std::move(result.value());
if(!self->client->add_device(device_config_json.dump()))
{
GST_ERROR_OBJECT(self, "Failed to add device to NMOS client");
return;
}

if(!self->client->add_sender(self->config.device.id, sender_config_json.dump(), sender_activation_callback))
if(!self->client->add_sender(self->config.device.id, sender_config_json.dump(), sender_activation_callback))
{
GST_ERROR_OBJECT(self, "Failed to add sender to NMOS client");
return;
}
}

/* State Change so it doesn't boot NMOS without pipeline being set to playing */
static GstStateChangeReturn gst_nmossender_change_state(GstElement* element, GstStateChange transition)
{
GstStateChangeReturn ret;
GstNmossender* self = GST_NMOSSENDER(element);

switch(transition)
{
case GST_STATE_CHANGE_PAUSED_TO_PLAYING: {
if(self->nmos_active != true)
{
GST_ERROR_OBJECT(self, "Failed to add sender to NMOS client");
return GST_STATE_CHANGE_FAILURE;
create_nmos(self);
self->nmos_active = true;
}
}
break;
Expand Down Expand Up @@ -392,13 +446,16 @@ static void gst_nmossender_class_init(GstNmossenderClass* klass)
/* Object initialization */
static void gst_nmossender_init(GstNmossender* self)
{
auto maybeBin = GstElementHandle<GstElement>::create_bin("dynamic-bin");

auto maybeQueue = GstElementHandle<GstElement>::create_element("queue", nullptr);
auto maybeVideoPay = GstElementHandle<GstElement>::create_element("rtpvrawpay", nullptr);
auto maybeAudioPay16 = GstElementHandle<GstElement>::create_element("rtpL16pay", nullptr);
auto maybeAudioPay24 = GstElementHandle<GstElement>::create_element("rtpL24pay", nullptr);
auto maybeUdpSink = GstElementHandle<GstElement>::create_element("udpsink", nullptr);

if(std::holds_alternative<std::nullptr_t>(maybeQueue) || std::holds_alternative<std::nullptr_t>(maybeVideoPay) ||
if(std::holds_alternative<std::nullptr_t>(maybeBin) || std::holds_alternative<std::nullptr_t>(maybeQueue) ||
std::holds_alternative<std::nullptr_t>(maybeVideoPay) ||
std::holds_alternative<std::nullptr_t>(maybeAudioPay16) ||
std::holds_alternative<std::nullptr_t>(maybeAudioPay24) || std::holds_alternative<std::nullptr_t>(maybeUdpSink))
{
Expand All @@ -414,7 +471,8 @@ static void gst_nmossender_init(GstNmossender* self)

// set properties
g_object_set(G_OBJECT(self->queue.get()), "max-size-buffers", 1, nullptr);
g_object_set(G_OBJECT(self->udpsink.get()), "host", "127.0.0.1", "port", 9999, nullptr);
g_object_set(G_OBJECT(self->udpsink.get()), "host", "127.0.0.1", "port", 9999, "is-live", true, "async", false,
nullptr);
create_default_config_fields_sender(&self->config);

gst_bin_add_many(GST_BIN(self), self->queue.get(), self->video_payloader.get(), self->audio_payloader_24.get(),
Expand Down Expand Up @@ -444,6 +502,8 @@ static void gst_nmossender_init(GstNmossender* self)
gst_object_unref(sink_ghost_pad);
return;
}

self->pad = sink_ghost_pad;
}

static gboolean plugin_init(GstPlugin* plugin)
Expand Down
Loading

0 comments on commit 6be94e7

Please sign in to comment.