Skip to content

Commit

Permalink
[core] event handling cleanup (#1921)
Browse files Browse the repository at this point in the history
* Removed `corrupted` and `update_connection` events from `eSubscriberEvent` and `ePublisherEvent` enums. Removed `clock` field from `SPubEventCallbackData` and `SSubEventCallbackData` structs. Added `FireDroppedEvent` function to `CSubscriberImpl` and updated dropped message handling.
  • Loading branch information
rex-schilasky authored Jan 20, 2025
1 parent c0a858e commit 8a70566
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 148 deletions.
44 changes: 18 additions & 26 deletions ecal/core/include/ecal/ecal_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,16 @@ namespace eCAL
none = 0,
connected = 1,
disconnected = 2,
dropped = 3,
corrupted = 5,
update_connection = 6,
dropped = 3
};

inline std::string to_string(eSubscriberEvent event_) {
switch (event_) {
case eSubscriberEvent::none: return "NONE";
case eSubscriberEvent::connected: return "CONNECTED";
case eSubscriberEvent::disconnected: return "DISCONNECTED";
case eSubscriberEvent::dropped : return "DROPPED";
case eSubscriberEvent::corrupted: return "CORRUPTED";
case eSubscriberEvent::update_connection : return "UPDATED_CONNECTION";
default: return "Unknown";
case eSubscriberEvent::none: return "NONE";
case eSubscriberEvent::connected: return "CONNECTED";
case eSubscriberEvent::disconnected: return "DISCONNECTED";
case eSubscriberEvent::dropped : return "DROPPED";
default: return "Unknown";
}
}

Expand All @@ -77,18 +73,16 @@ namespace eCAL
none = 0,
connected = 1,
disconnected = 2,
dropped = 3,
update_connection = 4,
dropped = 3
};

inline std::string to_string(ePublisherEvent event_) {
switch (event_) {
case ePublisherEvent::none: return "NONE";
case ePublisherEvent::connected: return "CONNECTED";
case ePublisherEvent::disconnected: return "DISCONNECTED";
case ePublisherEvent::dropped: return "DROPPED";
case ePublisherEvent::update_connection: return "UPDATED_CONNECTION";
default: return "Unknown";
case ePublisherEvent::none: return "NONE";
case ePublisherEvent::connected: return "CONNECTED";
case ePublisherEvent::disconnected: return "DISCONNECTED";
case ePublisherEvent::dropped: return "DROPPED";
default: return "Unknown";
}
}

Expand Down Expand Up @@ -153,10 +147,9 @@ namespace eCAL
**/
struct SPubEventCallbackData
{
ePublisherEvent type{ ePublisherEvent::none }; //!< publisher event type
long long time{ 0 }; //!< publisher event time in µs
long long clock{ 0 }; //!< publisher event clock
SDataTypeInformation tdatatype; //!< datatype description of the connected subscriber (for pub_event_update_connection only)
ePublisherEvent event_type{ ePublisherEvent::none }; //!< publisher event type
long long event_time{ 0 }; //!< publisher event time in µs (eCAL time)
SDataTypeInformation subscriber_datatype; //!< datatype description of the connected subscriber
};

/**
Expand All @@ -172,10 +165,9 @@ namespace eCAL
**/
struct SSubEventCallbackData
{
eSubscriberEvent type{ eSubscriberEvent::none }; //!< subscriber event type
long long time{ 0 }; //!< subscriber event time in µs
long long clock{ 0 }; //!< subscriber event clock
SDataTypeInformation tdatatype; //!< topic information of the connected subscriber (for sub_event_update_connection only)
eSubscriberEvent event_type{ eSubscriberEvent::none }; //!< subscriber event type
long long event_time{ 0 }; //!< subscriber event time in µs (eCAL time)
SDataTypeInformation publisher_datatype; //!< topic information of the connected publisher
};

/**
Expand Down
28 changes: 6 additions & 22 deletions ecal/core/src/pubsub/ecal_publisher_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <ecal/ecal_log.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/ecal_process.h>
#include <ecal/ecal_time.h>

#if ECAL_CORE_REGISTRATION
#include "registration/ecal_registration_provider.h"
Expand Down Expand Up @@ -489,8 +490,7 @@ namespace eCAL
#endif

// add key to connection map, including connection state
bool is_new_connection = false;
bool is_updated_connection = false;
bool is_new_connection = false;
{
const std::lock_guard<std::mutex> lock(m_connection_map_mutex);
auto subscription_info_iter = m_connection_map.find(subscription_info_);
Expand All @@ -511,11 +511,6 @@ namespace eCAL
{
is_new_connection = true;
}
// the connection was active, so we just update it
else
{
is_updated_connection = true;
}

// update the data type, the layer states and set the state active
connection = SConnection{ data_type_info_, sub_layer_states_, true };
Expand All @@ -532,11 +527,6 @@ namespace eCAL
// fire connect event
FireConnectEvent(subscription_info_, data_type_info_);
}
else if (is_updated_connection)
{
// fire update event
FireUpdateEvent(subscription_info_, data_type_info_);
}

#ifndef NDEBUG
Logging::Log(Logging::log_level_debug3, m_attributes.topic_name + "::CPublisherImpl::ApplySubscriberRegistration");
Expand Down Expand Up @@ -739,10 +729,9 @@ namespace eCAL
if(m_event_id_callback)
{
SPubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tdatatype = data_type_info_;
data.event_type = type_;
data.event_time = eCAL::Time::GetMicroSeconds();
data.subscriber_datatype = data_type_info_;

Registration::STopicId topic_id;
topic_id.topic_id.entity_id = subscription_info_.entity_id;
Expand All @@ -763,7 +752,7 @@ namespace eCAL
{
v5::SPubEventCallbackData event_data;
event_data.type = type_;
event_data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
event_data.time = eCAL::Time::GetMicroSeconds();
event_data.clock = 0;
event_data.tid = std::to_string(subscription_info_.entity_id);
event_data.tdatatype = data_type_info_;
Expand All @@ -779,11 +768,6 @@ namespace eCAL
FireEvent(ePublisherEvent::connected, subscription_info_, data_type_info_);
}

void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(ePublisherEvent::update_connection, subscription_info_, data_type_info_);
}

void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(ePublisherEvent::disconnected, subscription_info_, data_type_info_);
Expand Down
3 changes: 1 addition & 2 deletions ecal/core/src/pubsub/ecal_publisher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ namespace eCAL
void FireEvent(const ePublisherEvent type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);

void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);
void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);
void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);

size_t GetConnectionCount();
Expand Down Expand Up @@ -168,7 +167,7 @@ namespace eCAL
EventCallbackMapT m_event_callback_map;

std::mutex m_event_id_callback_mutex;
PubEventCallbackT m_event_id_callback;
PubEventCallbackT m_event_id_callback;

long long m_id = 0;
long long m_clock = 0;
Expand Down
77 changes: 22 additions & 55 deletions ecal/core/src/pubsub/ecal_subscriber_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ecal/ecal_config.h>
#include <ecal/ecal_log.h>
#include <ecal/ecal_process.h>
#include <ecal/ecal_time.h>

#if ECAL_CORE_REGISTRATION
#include "registration/ecal_registration_provider.h"
Expand Down Expand Up @@ -308,8 +309,7 @@ namespace eCAL
#endif

// add key to connection map, including connection state
bool is_new_connection = false;
bool is_updated_connection = false;
bool is_new_connection = false;
{
const std::lock_guard<std::mutex> lock(m_connection_map_mtx);
auto publication_info_iter = m_connection_map.find(publication_info_);
Expand All @@ -330,11 +330,6 @@ namespace eCAL
{
is_new_connection = true;
}
// the connection was active, so we just update it
else
{
is_updated_connection = true;
}

// update the data type and layer states, even if the connection is not new
connection = SConnection{ data_type_info_, pub_layer_states_, true };
Expand All @@ -350,11 +345,6 @@ namespace eCAL
// fire connect event
FireConnectEvent(publication_info_, data_type_info_);
}
else if (is_updated_connection)
{
// fire update event
FireUpdateEvent(publication_info_, data_type_info_);
}

#ifndef NDEBUG
Logging::Log(Logging::log_level_debug3, m_attributes.topic_name + "::CSubscriberImpl::ApplyPublisherRegistration");
Expand Down Expand Up @@ -497,7 +487,11 @@ namespace eCAL
// - a dropped message
// - an out-of-order message
// - a multiple sent message
if (!CheckMessageClock(topic_info_.tid, clock_))
SPublicationInfo publication_info;
publication_info.entity_id = topic_info_.tid;
publication_info.host_name = topic_info_.hname;
publication_info.process_id = topic_info_.pid;
if (!CheckMessageClock(publication_info, clock_))
{
// we will not process that message
return(0);
Expand Down Expand Up @@ -788,10 +782,9 @@ namespace eCAL
if (m_event_id_callback)
{
SSubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tdatatype = data_type_info_;
data.event_type = type_;
data.event_time = eCAL::Time::GetMicroSeconds();
data.publisher_datatype = data_type_info_;

Registration::STopicId topic_id;
topic_id.topic_id.entity_id = publication_info_.entity_id;
Expand All @@ -812,7 +805,7 @@ namespace eCAL
{
v5::SSubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.time = eCAL::Time::GetMicroSeconds();
data.clock = 0;
data.tid = std::to_string(publication_info_.entity_id);
data.tdatatype = data_type_info_;
Expand All @@ -828,14 +821,14 @@ namespace eCAL
FireEvent(eSubscriberEvent::connected, publication_info_, data_type_info_);
}

void CSubscriberImpl::FireUpdateEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
void CSubscriberImpl::FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(eSubscriberEvent::update_connection, publication_info_, data_type_info_);
FireEvent(eSubscriberEvent::disconnected, publication_info_, data_type_info_);
}

void CSubscriberImpl::FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
void CSubscriberImpl::FireDroppedEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(eSubscriberEvent::disconnected, publication_info_, data_type_info_);
FireEvent(eSubscriberEvent::dropped, publication_info_, data_type_info_);
}

size_t CSubscriberImpl::GetConnectionCount()
Expand All @@ -852,14 +845,14 @@ namespace eCAL
return count;
}

bool CSubscriberImpl::CheckMessageClock(const Registration::EntityIdT& tid_, long long current_clock_)
bool CSubscriberImpl::CheckMessageClock(const SPublicationInfo& publication_info_, long long current_clock_)
{
auto iter = m_writer_counter_map.find(tid_);
auto iter = m_writer_counter_map.find(publication_info_.entity_id);

// initial entry
if (iter == m_writer_counter_map.end())
{
m_writer_counter_map[tid_] = current_clock_;
m_writer_counter_map[publication_info_.entity_id] = current_clock_;
return true;
}
// clock entry exists
Expand Down Expand Up @@ -906,37 +899,11 @@ namespace eCAL
msg += "\')";
Logging::Log(log_level_warning, msg);
#endif
// new event handling with topic id
if (m_event_id_callback)
{
SSubEventCallbackData data;
data.type = eSubscriberEvent::dropped;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = current_clock_;

Registration::STopicId topic_id;
topic_id.topic_name = m_attributes.topic_name;
const std::lock_guard<std::mutex> lock(m_event_id_callback_mutex);

// call event callback
m_event_id_callback(topic_id, data);
}

// deprecated event handling with topic name
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
auto citer = m_event_callback_map.find(eSubscriberEvent::dropped);
if (citer != m_event_callback_map.end() && citer->second)
{
v5::SSubEventCallbackData data;
data.type = eSubscriberEvent::dropped;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = current_clock_;

// call event callback
(citer->second)(m_attributes.topic_name.c_str(), &data);
}
}
// fire dropped event
// we do not know the data type of the dropped message here
// so we use an empty data type information
FireDroppedEvent(publication_info_, SDataTypeInformation());

// increase the drop counter
m_message_drops += clock_difference;
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/pubsub/ecal_subscriber_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ namespace eCAL
void FireEvent(const eSubscriberEvent type_, const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);

void FireConnectEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);
void FireUpdateEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);
void FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);

void FireDroppedEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);

size_t GetConnectionCount();

bool CheckMessageClock(const Registration::EntityIdT& tid_, long long current_clock_);
bool CheckMessageClock(const SPublicationInfo& publication_info_, long long current_clock_);

int32_t GetFrequency();

Expand Down
4 changes: 1 addition & 3 deletions lang/c/core/src/ecal_publisher_cimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ namespace
case eCAL::ePublisherEvent::connected: return pub_event_connected;
case eCAL::ePublisherEvent::disconnected: return pub_event_disconnected;
case eCAL::ePublisherEvent::dropped: return pub_event_dropped;
case eCAL::ePublisherEvent::update_connection: return pub_event_update_connection;
default: return pub_event_none;
default: return pub_event_none;
}
}

Expand All @@ -50,7 +49,6 @@ namespace
case pub_event_connected: return eCAL::ePublisherEvent::connected;
case pub_event_disconnected: return eCAL::ePublisherEvent::disconnected;
case pub_event_dropped: return eCAL::ePublisherEvent::dropped;
case pub_event_update_connection: return eCAL::ePublisherEvent::update_connection;
default: return eCAL::ePublisherEvent::none;
}
}
Expand Down
6 changes: 1 addition & 5 deletions lang/c/core/src/ecal_subscriber_cimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ namespace
case eCAL::eSubscriberEvent::connected: return sub_event_connected;
case eCAL::eSubscriberEvent::disconnected: return sub_event_disconnected;
case eCAL::eSubscriberEvent::dropped: return sub_event_dropped;
case eCAL::eSubscriberEvent::corrupted: return sub_event_corrupted;
case eCAL::eSubscriberEvent::update_connection: return sub_event_update_connection;
default: return sub_event_none;
default: return sub_event_none;
}
}

Expand All @@ -51,8 +49,6 @@ namespace
case sub_event_connected: return eCAL::eSubscriberEvent::connected;
case sub_event_disconnected: return eCAL::eSubscriberEvent::disconnected;
case sub_event_dropped: return eCAL::eSubscriberEvent::dropped;
case sub_event_corrupted: return eCAL::eSubscriberEvent::corrupted;
case sub_event_update_connection: return eCAL::eSubscriberEvent::update_connection;
default: return eCAL::eSubscriberEvent::none;
}
}
Expand Down
1 change: 1 addition & 0 deletions lang/c/tests/core_test/src/core_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <ecal/cimpl/ecal_core_cimpl.h>
#include <ecal/cimpl/ecal_process_cimpl.h>
#include <ecal/ecal_defs.h>
#include <ecal/ecal_os.h>

#include <cstring>

Expand Down
Loading

0 comments on commit 8a70566

Please sign in to comment.