Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] event handling cleanup #1921

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions ecal/core/include/ecal/ecal_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,16 @@ namespace eCAL
none = 0,
connected = 1,
disconnected = 2,
dropped = 3,
corrupted = 5,
update_connection = 6,
dropped = 3
};

inline std::string to_string(eSubscriberEvent event_) {
switch (event_) {
case eSubscriberEvent::none: return "NONE";
case eSubscriberEvent::connected: return "CONNECTED";
case eSubscriberEvent::disconnected: return "DISCONNECTED";
case eSubscriberEvent::dropped : return "DROPPED";
case eSubscriberEvent::corrupted: return "CORRUPTED";
case eSubscriberEvent::update_connection : return "UPDATED_CONNECTION";
default: return "Unknown";
case eSubscriberEvent::none: return "NONE";
case eSubscriberEvent::connected: return "CONNECTED";
case eSubscriberEvent::disconnected: return "DISCONNECTED";
case eSubscriberEvent::dropped : return "DROPPED";
default: return "Unknown";
}
}

Expand All @@ -77,18 +73,16 @@ namespace eCAL
none = 0,
connected = 1,
disconnected = 2,
dropped = 3,
update_connection = 4,
dropped = 3
};

inline std::string to_string(ePublisherEvent event_) {
switch (event_) {
case ePublisherEvent::none: return "NONE";
case ePublisherEvent::connected: return "CONNECTED";
case ePublisherEvent::disconnected: return "DISCONNECTED";
case ePublisherEvent::dropped: return "DROPPED";
case ePublisherEvent::update_connection: return "UPDATED_CONNECTION";
default: return "Unknown";
case ePublisherEvent::none: return "NONE";
case ePublisherEvent::connected: return "CONNECTED";
case ePublisherEvent::disconnected: return "DISCONNECTED";
case ePublisherEvent::dropped: return "DROPPED";
default: return "Unknown";
}
}

Expand Down Expand Up @@ -153,10 +147,9 @@ namespace eCAL
**/
struct SPubEventCallbackData
{
ePublisherEvent type{ ePublisherEvent::none }; //!< publisher event type
long long time{ 0 }; //!< publisher event time in µs
long long clock{ 0 }; //!< publisher event clock
SDataTypeInformation tdatatype; //!< datatype description of the connected subscriber (for pub_event_update_connection only)
ePublisherEvent event_type{ ePublisherEvent::none }; //!< publisher event type
long long event_time{ 0 }; //!< publisher event time in µs (eCAL time)
SDataTypeInformation subscriber_datatype; //!< datatype description of the connected subscriber
};

/**
Expand All @@ -172,10 +165,9 @@ namespace eCAL
**/
struct SSubEventCallbackData
{
eSubscriberEvent type{ eSubscriberEvent::none }; //!< subscriber event type
long long time{ 0 }; //!< subscriber event time in µs
long long clock{ 0 }; //!< subscriber event clock
SDataTypeInformation tdatatype; //!< topic information of the connected subscriber (for sub_event_update_connection only)
eSubscriberEvent event_type{ eSubscriberEvent::none }; //!< subscriber event type
long long event_time{ 0 }; //!< subscriber event time in µs (eCAL time)
SDataTypeInformation publisher_datatype; //!< topic information of the connected publisher
};

/**
Expand Down
28 changes: 6 additions & 22 deletions ecal/core/src/pubsub/ecal_publisher_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <ecal/ecal_log.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/ecal_process.h>
#include <ecal/ecal_time.h>

#if ECAL_CORE_REGISTRATION
#include "registration/ecal_registration_provider.h"
Expand Down Expand Up @@ -489,8 +490,7 @@ namespace eCAL
#endif

// add key to connection map, including connection state
bool is_new_connection = false;
bool is_updated_connection = false;
bool is_new_connection = false;
{
const std::lock_guard<std::mutex> lock(m_connection_map_mutex);
auto subscription_info_iter = m_connection_map.find(subscription_info_);
Expand All @@ -511,11 +511,6 @@ namespace eCAL
{
is_new_connection = true;
}
// the connection was active, so we just update it
else
{
is_updated_connection = true;
}

// update the data type, the layer states and set the state active
connection = SConnection{ data_type_info_, sub_layer_states_, true };
Expand All @@ -532,11 +527,6 @@ namespace eCAL
// fire connect event
FireConnectEvent(subscription_info_, data_type_info_);
}
else if (is_updated_connection)
{
// fire update event
FireUpdateEvent(subscription_info_, data_type_info_);
}

#ifndef NDEBUG
Logging::Log(Logging::log_level_debug3, m_attributes.topic_name + "::CPublisherImpl::ApplySubscriberRegistration");
Expand Down Expand Up @@ -739,10 +729,9 @@ namespace eCAL
if(m_event_id_callback)
{
SPubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tdatatype = data_type_info_;
data.event_type = type_;
data.event_time = eCAL::Time::GetMicroSeconds();
data.subscriber_datatype = data_type_info_;

Registration::STopicId topic_id;
topic_id.topic_id.entity_id = subscription_info_.entity_id;
Expand All @@ -763,7 +752,7 @@ namespace eCAL
{
v5::SPubEventCallbackData event_data;
event_data.type = type_;
event_data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
event_data.time = eCAL::Time::GetMicroSeconds();
event_data.clock = 0;
event_data.tid = std::to_string(subscription_info_.entity_id);
event_data.tdatatype = data_type_info_;
Expand All @@ -779,11 +768,6 @@ namespace eCAL
FireEvent(ePublisherEvent::connected, subscription_info_, data_type_info_);
}

void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(ePublisherEvent::update_connection, subscription_info_, data_type_info_);
}

void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(ePublisherEvent::disconnected, subscription_info_, data_type_info_);
Expand Down
3 changes: 1 addition & 2 deletions ecal/core/src/pubsub/ecal_publisher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ namespace eCAL
void FireEvent(const ePublisherEvent type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);

void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);
void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);
void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_);

size_t GetConnectionCount();
Expand Down Expand Up @@ -168,7 +167,7 @@ namespace eCAL
EventCallbackMapT m_event_callback_map;

std::mutex m_event_id_callback_mutex;
PubEventCallbackT m_event_id_callback;
PubEventCallbackT m_event_id_callback;

long long m_id = 0;
long long m_clock = 0;
Expand Down
77 changes: 22 additions & 55 deletions ecal/core/src/pubsub/ecal_subscriber_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ecal/ecal_config.h>
#include <ecal/ecal_log.h>
#include <ecal/ecal_process.h>
#include <ecal/ecal_time.h>

#if ECAL_CORE_REGISTRATION
#include "registration/ecal_registration_provider.h"
Expand Down Expand Up @@ -308,8 +309,7 @@ namespace eCAL
#endif

// add key to connection map, including connection state
bool is_new_connection = false;
bool is_updated_connection = false;
bool is_new_connection = false;
{
const std::lock_guard<std::mutex> lock(m_connection_map_mtx);
auto publication_info_iter = m_connection_map.find(publication_info_);
Expand All @@ -330,11 +330,6 @@ namespace eCAL
{
is_new_connection = true;
}
// the connection was active, so we just update it
else
{
is_updated_connection = true;
}

// update the data type and layer states, even if the connection is not new
connection = SConnection{ data_type_info_, pub_layer_states_, true };
Expand All @@ -350,11 +345,6 @@ namespace eCAL
// fire connect event
FireConnectEvent(publication_info_, data_type_info_);
}
else if (is_updated_connection)
{
// fire update event
FireUpdateEvent(publication_info_, data_type_info_);
}

#ifndef NDEBUG
Logging::Log(Logging::log_level_debug3, m_attributes.topic_name + "::CSubscriberImpl::ApplyPublisherRegistration");
Expand Down Expand Up @@ -497,7 +487,11 @@ namespace eCAL
// - a dropped message
// - an out-of-order message
// - a multiple sent message
if (!CheckMessageClock(topic_info_.tid, clock_))
SPublicationInfo publication_info;
publication_info.entity_id = topic_info_.tid;
publication_info.host_name = topic_info_.hname;
publication_info.process_id = topic_info_.pid;
if (!CheckMessageClock(publication_info, clock_))
{
// we will not process that message
return(0);
Expand Down Expand Up @@ -788,10 +782,9 @@ namespace eCAL
if (m_event_id_callback)
{
SSubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tdatatype = data_type_info_;
data.event_type = type_;
data.event_time = eCAL::Time::GetMicroSeconds();
data.publisher_datatype = data_type_info_;

Registration::STopicId topic_id;
topic_id.topic_id.entity_id = publication_info_.entity_id;
Expand All @@ -812,7 +805,7 @@ namespace eCAL
{
v5::SSubEventCallbackData data;
data.type = type_;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.time = eCAL::Time::GetMicroSeconds();
data.clock = 0;
data.tid = std::to_string(publication_info_.entity_id);
data.tdatatype = data_type_info_;
Expand All @@ -828,14 +821,14 @@ namespace eCAL
FireEvent(eSubscriberEvent::connected, publication_info_, data_type_info_);
}

void CSubscriberImpl::FireUpdateEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
void CSubscriberImpl::FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(eSubscriberEvent::update_connection, publication_info_, data_type_info_);
FireEvent(eSubscriberEvent::disconnected, publication_info_, data_type_info_);
}

void CSubscriberImpl::FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
void CSubscriberImpl::FireDroppedEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_)
{
FireEvent(eSubscriberEvent::disconnected, publication_info_, data_type_info_);
FireEvent(eSubscriberEvent::dropped, publication_info_, data_type_info_);
}

size_t CSubscriberImpl::GetConnectionCount()
Expand All @@ -852,14 +845,14 @@ namespace eCAL
return count;
}

bool CSubscriberImpl::CheckMessageClock(const Registration::EntityIdT& tid_, long long current_clock_)
bool CSubscriberImpl::CheckMessageClock(const SPublicationInfo& publication_info_, long long current_clock_)
{
auto iter = m_writer_counter_map.find(tid_);
auto iter = m_writer_counter_map.find(publication_info_.entity_id);

// initial entry
if (iter == m_writer_counter_map.end())
{
m_writer_counter_map[tid_] = current_clock_;
m_writer_counter_map[publication_info_.entity_id] = current_clock_;
return true;
}
// clock entry exists
Expand Down Expand Up @@ -906,37 +899,11 @@ namespace eCAL
msg += "\')";
Logging::Log(log_level_warning, msg);
#endif
// new event handling with topic id
if (m_event_id_callback)
{
SSubEventCallbackData data;
data.type = eSubscriberEvent::dropped;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = current_clock_;

Registration::STopicId topic_id;
topic_id.topic_name = m_attributes.topic_name;
const std::lock_guard<std::mutex> lock(m_event_id_callback_mutex);

// call event callback
m_event_id_callback(topic_id, data);
}

// deprecated event handling with topic name
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mutex);
auto citer = m_event_callback_map.find(eSubscriberEvent::dropped);
if (citer != m_event_callback_map.end() && citer->second)
{
v5::SSubEventCallbackData data;
data.type = eSubscriberEvent::dropped;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = current_clock_;

// call event callback
(citer->second)(m_attributes.topic_name.c_str(), &data);
}
}
// fire dropped event
// we do not know the data type of the dropped message here
// so we use an empty data type information
FireDroppedEvent(publication_info_, SDataTypeInformation());

// increase the drop counter
m_message_drops += clock_difference;
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/pubsub/ecal_subscriber_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ namespace eCAL
void FireEvent(const eSubscriberEvent type_, const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);

void FireConnectEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);
void FireUpdateEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);
void FireDisconnectEvent(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);

void FireDroppedEvent (const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_);

size_t GetConnectionCount();

bool CheckMessageClock(const Registration::EntityIdT& tid_, long long current_clock_);
bool CheckMessageClock(const SPublicationInfo& publication_info_, long long current_clock_);

int32_t GetFrequency();

Expand Down
4 changes: 1 addition & 3 deletions lang/c/core/src/ecal_publisher_cimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ namespace
case eCAL::ePublisherEvent::connected: return pub_event_connected;
case eCAL::ePublisherEvent::disconnected: return pub_event_disconnected;
case eCAL::ePublisherEvent::dropped: return pub_event_dropped;
case eCAL::ePublisherEvent::update_connection: return pub_event_update_connection;
default: return pub_event_none;
default: return pub_event_none;
}
}

Expand All @@ -50,7 +49,6 @@ namespace
case pub_event_connected: return eCAL::ePublisherEvent::connected;
case pub_event_disconnected: return eCAL::ePublisherEvent::disconnected;
case pub_event_dropped: return eCAL::ePublisherEvent::dropped;
case pub_event_update_connection: return eCAL::ePublisherEvent::update_connection;
default: return eCAL::ePublisherEvent::none;
}
}
Expand Down
6 changes: 1 addition & 5 deletions lang/c/core/src/ecal_subscriber_cimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ namespace
case eCAL::eSubscriberEvent::connected: return sub_event_connected;
case eCAL::eSubscriberEvent::disconnected: return sub_event_disconnected;
case eCAL::eSubscriberEvent::dropped: return sub_event_dropped;
case eCAL::eSubscriberEvent::corrupted: return sub_event_corrupted;
case eCAL::eSubscriberEvent::update_connection: return sub_event_update_connection;
default: return sub_event_none;
default: return sub_event_none;
}
}

Expand All @@ -51,8 +49,6 @@ namespace
case sub_event_connected: return eCAL::eSubscriberEvent::connected;
case sub_event_disconnected: return eCAL::eSubscriberEvent::disconnected;
case sub_event_dropped: return eCAL::eSubscriberEvent::dropped;
case sub_event_corrupted: return eCAL::eSubscriberEvent::corrupted;
case sub_event_update_connection: return eCAL::eSubscriberEvent::update_connection;
default: return eCAL::eSubscriberEvent::none;
}
}
Expand Down
Loading
Loading