Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] ported CMsgPublisher/Subscriber to v6 #1944

Merged
merged 17 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/mon/mon_cli/src/ecal_mon_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ void ProcEcho(const std::string& topic_name, int msg_count)
eCAL::string::CSubscriber<std::string> sub(topic_name);
std::atomic<int> cnt(msg_count);
auto msg_cb = [&cnt](const std::string& msg_) { if (cnt != 0) { std::cout << msg_ << std::endl; if (cnt > 0) cnt--; } };
sub.AddReceiveCallback(std::bind(msg_cb, std::placeholders::_2));
sub.SetReceiveCallback(std::bind(msg_cb, std::placeholders::_2));

while(eCAL::Ok() && (cnt != 0))
{
Expand All @@ -357,7 +357,7 @@ void ProcProto(const std::string& topic_name, int msg_count)
eCAL::protobuf::CDynamicSubscriber sub(topic_name);
std::atomic<int> cnt(msg_count);
auto msg_cb = [&cnt](const std::shared_ptr<google::protobuf::Message>& msg_) { if (cnt != 0) { std::cout << msg_->DebugString() << std::endl; if (cnt > 0) cnt--; } };
sub.AddReceiveCallback(std::bind(msg_cb, std::placeholders::_2));
sub.SetReceiveCallback(std::bind(msg_cb, std::placeholders::_2));

// enter main loop
while(eCAL::Ok() && (cnt != 0))
Expand Down
16 changes: 8 additions & 8 deletions app/mon/mon_plugins/capnproto_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString& topic_type,
ui_.publish_timestamp_warning_label->setVisible(false);

// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));

// Button connections
connect(ui_.expand_button, &QPushButton::clicked, [this]() { tree_view_->expandAll(); });
Expand Down Expand Up @@ -102,8 +102,8 @@ PluginWidget::~PluginWidget() noexcept
qDebug().nospace() << "[" << metaObject()->className() << "]: Deleting Widget for topic " << topic_name_;
#endif // NDEBUG

subscriber_.RemReceiveCallback();
//subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
//subscriber_.RemoveErrorCallback();

{
std::lock_guard<std::mutex> lock(capnproto_message_mutex_);
Expand Down Expand Up @@ -322,14 +322,14 @@ void PluginWidget::onUpdate()
void PluginWidget::onResume()
{
// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
}

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();
}

QWidget* PluginWidget::getWidget()
Expand Down
16 changes: 8 additions & 8 deletions app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString& topic_type,
ui_.publish_timestamp_warning_label->setVisible(false);

// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));

// Button connections
connect(ui_.expand_button, &QPushButton::clicked, [this]() { tree_view_->expandAll(); });
Expand Down Expand Up @@ -105,8 +105,8 @@ PluginWidget::~PluginWidget()
qDebug().nospace() << "[" << PluginWidget::metaObject()->className() << "]: Deleting Widget for topic " << topic_name_;
#endif // NDEBUG

subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();

{
std::lock_guard<std::mutex> lock(proto_message_mutex_);
Expand Down Expand Up @@ -319,14 +319,14 @@ void PluginWidget::onUpdate()
void PluginWidget::onResume()
{
// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
}

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();
}

QWidget* PluginWidget::getWidget()
Expand Down
8 changes: 4 additions & 4 deletions app/mon/mon_plugins/signals_plotting/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString& topic_type,
ui_.publish_timestamp_warning_label->setVisible(false);

// Add eCAL Callbacks
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.AddErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::onProtoMessageCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetErrorCallback(std::bind(&PluginWidget::onProtoErrorCallback, this, std::placeholders::_1));

// Button connections
connect(ui_.expand_button, &QPushButton::clicked, [this]() { tree_view_->expandAll(); });
Expand Down Expand Up @@ -127,8 +127,8 @@ PluginWidget::~PluginWidget()
qDebug().nospace() << "[" << PluginWidget::metaObject()->className() << "]: Deleting Widget for topic " << topic_name_;
#endif // NDEBUG

subscriber_.RemReceiveCallback();
subscriber_.RemErrorCallback();
subscriber_.RemoveReceiveCallback();
subscriber_.RemoveErrorCallback();

{
std::lock_guard<std::mutex> lock(proto_message_mutex_);
Expand Down
8 changes: 4 additions & 4 deletions app/mon/mon_plugins/string_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString&, QWidget* p
ui_.content_layout->addWidget(text_edit_);

// Connect the eCAL Subscriber
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
}

PluginWidget::~PluginWidget()
{
subscriber_.RemReceiveCallback();
subscriber_.RemoveReceiveCallback();
}

void PluginWidget::ecalMessageReceivedCallback(const std::string& message, long long publish_timestamp_usecs)
Expand Down Expand Up @@ -115,12 +115,12 @@ void PluginWidget::onUpdate()

void PluginWidget::onResume()
{
subscriber_.AddReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
subscriber_.SetReceiveCallback(std::bind(&PluginWidget::ecalMessageReceivedCallback, this, std::placeholders::_2, std::placeholders::_3));
}

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemoveReceiveCallback();
}

void PluginWidget::updateStringMessageView()
Expand Down
2 changes: 1 addition & 1 deletion app/mon/mon_tui/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int main(int argc, char** argv)
config.logging.receiver.enable = true;

auto status = eCAL::Initialize(config, "eCALMon TUI", eCAL::Init::Default | eCAL::Init::Monitoring);
if (status == -1) std::cerr << "Failed to init" << std::endl;
if (status == false) std::cerr << "Failed to init" << std::endl;
rex-schilasky marked this conversation as resolved.
Show resolved Hide resolved
rex-schilasky marked this conversation as resolved.
Show resolved Hide resolved
eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "Running");

TUI::Start(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ProtoMessageVisualizationViewModel : public MessageVisualizationViewModel
: subscriber{topic}
{
using namespace std::placeholders;
subscriber.AddReceiveCallback(std::bind(&ProtoMessageVisualizationViewModel::OnMessage, this, _2, _3));
subscriber.SetReceiveCallback(std::bind(&ProtoMessageVisualizationViewModel::OnMessage, this, _2, _3));
}

ProtectedMessage message() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StringMessageVisualizationViewModel : public MessageVisualizationViewModel
: subscriber{topic}
{
using namespace std::placeholders;
subscriber.AddReceiveCallback(std::bind(&StringMessageVisualizationViewModel::OnMessage, this, _2, _3));
subscriber.SetReceiveCallback(std::bind(&StringMessageVisualizationViewModel::OnMessage, this, _2, _3));
}

std::string message() const
Expand Down
7 changes: 2 additions & 5 deletions app/play/play_core/src/measurement_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ void MeasurementContainer::CreatePublishers(const std::map<std::string, std::str
void MeasurementContainer::DeInitializePublishers()
{
// Clear the publisher map
for (auto& publisher_info : publisher_map_)
{
publisher_info.second.publisher_.Destroy();
}
publisher_map_.clear();

// Remove pointers to publishers from all frames
Expand Down Expand Up @@ -219,7 +215,8 @@ bool MeasurementContainer::PublishFrame(long long index)
{
timestamp_usecs = std::chrono::duration_cast<std::chrono::microseconds>(frame_table_[index].send_timestamp_.time_since_epoch()).count();
}
frame_table_[index].publisher_info_->publisher_.SetID(frame_table_[index].send_id_);
// this is not supported by the eCAL v6 API
//frame_table_[index].publisher_info_->publisher_.SetID(frame_table_[index].send_id_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this in any way be problematic?
So with eCAL 5, publishers will have the same ID they had when originally recording the measurement. In eCAL 6, they won't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its about the topic id for subscriber side filtering. This feature is rarely used and no way to support this in the future, it's removed on both sides (pub and sub). So in v5 I can filter out different "streams" of the the same topic on subscriber side in v6 the connection will just receive all messages and I need to implement an ID myself (for example as part of my protobuf description).

frame_table_[index].publisher_info_->publisher_.Send(send_buffer_, data_size, timestamp_usecs);
frame_table_[index].publisher_info_->message_counter_++;
return true;
Expand Down
4 changes: 2 additions & 2 deletions app/play/play_core/src/measurement_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <memory>

#include <ecal/ecal.h>
#include <ecal/v5/ecal_publisher.h>
#include <ecal/pubsub/publisher.h>
#include <ecalhdf5/eh5_meas.h>

#include "continuity_report.h"
Expand Down Expand Up @@ -91,7 +91,7 @@ class MeasurementContainer
private:
struct PublisherInfo
{
eCAL::v5::CPublisher publisher_;
eCAL::CPublisher publisher_;
long long message_counter_;

PublisherInfo(const std::string& topic_name, const eCAL::SDataTypeInformation& info_)
Expand Down
3 changes: 3 additions & 0 deletions app/rec/rec_server_cli/src/ecal_rec_server_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ std::unique_ptr<eCAL::rec_cli::command::Record> record_command;
int main(int argc, char** argv)
{
#ifdef WIN32
(void)argc; // suppress unused warning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why surpress it only for Windows? also maybe just comment in the function declaration int main(int /*argc*/, char** /*argv*/) (also this has nothing to do with this PR!!! 😏

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For LINUX platform both arguments are used unfortunately and yes it's not really related but I tried to fix at least some warnings when I touched a file.

(void)argv; // suppress unused warning

EcalUtils::WinCpChanger win_cp_changer(CP_UTF8); // The WinCpChanger will set the Codepage back to the original, once destroyed
#endif // WIN32

Expand Down
6 changes: 3 additions & 3 deletions app/sys/sys_gui/src/widgets/mmawidget/mma_host_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ MmaHostItem::MmaHostItem(QTreeWidget* tree_widget, const QString& hostname)
// Create eCAL Subscriber
mma_subscriber = std::unique_ptr<eCAL::protobuf::CSubscriber<eCAL::pb::mma::State>>
(new eCAL::protobuf::CSubscriber<eCAL::pb::mma::State>("machine_state_" + hostname_.toStdString()));
mma_subscriber->AddReceiveCallback(std::bind(&MmaHostItem::mmaReceivedCallback, this, std::placeholders::_1, std::placeholders::_2));
mma_subscriber->SetReceiveCallback(std::bind(&MmaHostItem::mmaReceivedCallback, this, std::placeholders::_1, std::placeholders::_2));

// Register custom Type in order to directly pass the monitoring state
qRegisterMetaType<eCAL::pb::mma::State>("eCAL::pb::mma::State");
Expand Down Expand Up @@ -227,10 +227,10 @@ void MmaHostItem::disable()
setEnabled(false);
}

void MmaHostItem::mmaReceivedCallback(const char* /*topic_name*/, const eCAL::pb::mma::State& state)
void MmaHostItem::mmaReceivedCallback(const eCAL::STopicId& /*topic_id_*/, const eCAL::pb::mma::State& state_)
{
// only emit the signal in order to make use of the Qt event loop
emit mmaReceivedSignal(state);
emit mmaReceivedSignal(state_);
}

void MmaHostItem::machineStateChanged(eCAL::pb::mma::State state)
Expand Down
2 changes: 1 addition & 1 deletion app/sys/sys_gui/src/widgets/mmawidget/mma_host_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public slots:

QTimer* deactivation_timer;

void mmaReceivedCallback(const char* topic_name, const eCAL::pb::mma::State& state);
void mmaReceivedCallback(const eCAL::STopicId& topic_id_, const eCAL::pb::mma::State& state_);

QString normalizedDataAsString(unsigned long long bytes);

Expand Down
18 changes: 8 additions & 10 deletions contrib/ecaltime/simtime/src/ecal_time_simtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,20 @@ eCAL::CSimTime::CSimTime() :
bool eCAL::CSimTime::initialize()
{
std::unique_lock<std::mutex> lk(initialize_mutex);
if (!is_initialized) {
if (!is_initialized)
{
//eCAL::Initialize("ecal_sim_time_listener", eCAL::Init::Subscriber);
// this has to be done by the parent process
// needs to be fixed with an improved reference counting
// in eCAL::Initialize ..

if (sim_time_subscriber.Create("__sim_time__")) {
sim_time_subscriber.AddReceiveCallback(std::bind(&eCAL::CSimTime::onSimTimeMessage, this, std::placeholders::_2));
is_initialized = true;
}
else {
is_initialized = false;
}
return is_initialized;
sim_time_subscriber = std::make_unique<eCAL::protobuf::CSubscriber<eCAL::pb::SimTime>>("__sim_time__");
sim_time_subscriber->SetReceiveCallback(std::bind(&eCAL::CSimTime::onSimTimeMessage, this, std::placeholders::_2));
is_initialized = true;
return true;
}
else {
else
{
return false;
}
}
Expand Down
2 changes: 1 addition & 1 deletion contrib/ecaltime/simtime/src/ecal_time_simtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ namespace eCAL
bool first_message_received; /**< Whether we received at least one Message (used for the status message)*/
eCAL::pb::SimTime::eState play_state; /**< Current state (used for the status message)*/

eCAL::protobuf::CSubscriber<eCAL::pb::SimTime> sim_time_subscriber; /**< Subscriber for getting simulation timestamps */
std::unique_ptr<eCAL::protobuf::CSubscriber<eCAL::pb::SimTime>> sim_time_subscriber; /**< Subscriber for getting simulation timestamps */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of don't like that now we're "forcing" code to use pointers.
Do you now have to check the pointers in other locations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without a Create/Destroy interface and the removed empty default constructors I see no alternative here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about error handling for all communication entities (pub/sub/server/client) in case construction fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the thing is, construction is unlikely to fail. Failure is more likely to occur during the runtime. E.g. The connections (memory files, sockets, ...) are usually not created during construction, but when we receive monitoring information about other entities in the system.
So we have to find a way to deal with this effectively.
Maybe error/event callbacks would be interesting in that case, too.


std::mutex time_mutex; /**< Mutex for computing the current simulation time */
long long last_measurement_time; /**< Last received simulation time */
Expand Down
2 changes: 1 addition & 1 deletion contrib/mma/src/mma_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ int main(int argc, char** argv)
std::cout << app_version_header << std::endl << ecal_version_header << std::endl << std::endl;

// initialize eCAL API
if (eCAL::Initialize(MMA_APPLICATION_NAME, eCAL::Init::Publisher) < 0)
if (eCAL::Initialize(MMA_APPLICATION_NAME, eCAL::Init::Publisher))
{
std::cout << "eCAL initialization failed !";
return 1;
Expand Down
Loading
Loading