From 28145db53c9d09bb8bb3d314fe6d3c0e07f39a20 Mon Sep 17 00:00:00 2001 From: Florian Reimold <11774314+FlorianReimold@users.noreply.github.com> Date: Mon, 6 Nov 2023 14:40:43 +0100 Subject: [PATCH] Added perftool --- .../cpp/benchmarks/perftool/CMakeLists.txt | 52 ++++ samples/cpp/benchmarks/perftool/Readme.md | 52 ++++ samples/cpp/benchmarks/perftool/src/main.cpp | 249 ++++++++++++++++++ .../cpp/benchmarks/perftool/src/publisher.cpp | 180 +++++++++++++ .../cpp/benchmarks/perftool/src/publisher.h | 85 ++++++ .../perftool/src/publisher_statistics.h | 129 +++++++++ .../benchmarks/perftool/src/subscriber.cpp | 153 +++++++++++ .../cpp/benchmarks/perftool/src/subscriber.h | 91 +++++++ .../perftool/src/subscriber_statistics.h | 113 ++++++++ 9 files changed, 1104 insertions(+) create mode 100644 samples/cpp/benchmarks/perftool/CMakeLists.txt create mode 100644 samples/cpp/benchmarks/perftool/Readme.md create mode 100644 samples/cpp/benchmarks/perftool/src/main.cpp create mode 100644 samples/cpp/benchmarks/perftool/src/publisher.cpp create mode 100644 samples/cpp/benchmarks/perftool/src/publisher.h create mode 100644 samples/cpp/benchmarks/perftool/src/publisher_statistics.h create mode 100644 samples/cpp/benchmarks/perftool/src/subscriber.cpp create mode 100644 samples/cpp/benchmarks/perftool/src/subscriber.h create mode 100644 samples/cpp/benchmarks/perftool/src/subscriber_statistics.h diff --git a/samples/cpp/benchmarks/perftool/CMakeLists.txt b/samples/cpp/benchmarks/perftool/CMakeLists.txt new file mode 100644 index 0000000000..ee5e4cdea1 --- /dev/null +++ b/samples/cpp/benchmarks/perftool/CMakeLists.txt @@ -0,0 +1,52 @@ +# ========================= eCAL LICENSE ================================= +# +# Copyright (C) 2023 Continental Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ========================= eCAL LICENSE ================================= + +cmake_minimum_required(VERSION 3.13) +set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ON) + +project(perftool) + +find_package(eCAL REQUIRED) + +set(source_files + src/main.cpp + src/publisher.cpp + src/publisher.h + src/publisher_statistics.h + src/subscriber.cpp + src/subscriber.h + src/subscriber_statistics.h +) + +#add_executable(${PROJECT_NAME} ${source_files}) +ecal_add_sample(${PROJECT_NAME} ${source_files}) + +target_link_libraries(${PROJECT_NAME} + eCAL::core +) + +target_include_directories(${PROJECT_NAME} PRIVATE src) + +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) + +source_group(TREE "${CMAKE_CURRENT_LIST_DIR}" + FILES + ${source_files} +) + +ecal_install_sample(${PROJECT_NAME}) \ No newline at end of file diff --git a/samples/cpp/benchmarks/perftool/Readme.md b/samples/cpp/benchmarks/perftool/Readme.md new file mode 100644 index 0000000000..b3372a7b61 --- /dev/null +++ b/samples/cpp/benchmarks/perftool/Readme.md @@ -0,0 +1,52 @@ +# ecal-perftool + +The ecal-perftool is a simple application to estimate the performance of eCAL pub-sub connections using dummy-data being published a at a constant frequency. + +## Usage + +``` +Usage: + ecal_sample_perftool pub [options] +or: + ecal_sample_perftool sub [callback_delay_ms] [options] + +Options: + -q, --quiet: Do not print any output + -v, --verbose: Print all measured times for all messages + --busy-wait: Busy wait when receiving messages (i.e. burn CPU). For subscribers only. + --hickup : Further delay a single callback. For subscribers only. +``` + +## Output + +**Publisher**: + +``` +[ 78436.510] | cnt: 9 | loop_dt(ms) mean: 99.954 [ 99.587, 100.001] | loop_freq(Hz): 10.0 | snd_dt(ms) mean: 0.001 [ 0.001, 0.001] +[ 78437.525] | cnt: 10 | loop_dt(ms) mean: 100.000 [ 99.999, 100.001] | loop_freq(Hz): 10.0 | snd_dt(ms) mean: 0.001 [ 0.000, 0.001] +[ 78438.538] | cnt: 10 | loop_dt(ms) mean: 100.001 [ 99.999, 100.013] | loop_freq(Hz): 10.0 | snd_dt(ms) mean: 0.001 [ 0.000, 0.001] +[ 78439.545] | cnt: 10 | loop_dt(ms) mean: 99.999 [ 99.987, 100.002] | loop_freq(Hz): 10.0 | snd_dt(ms) mean: 0.001 [ 0.001, 0.001] +[ 78440.551] | cnt: 10 | loop_dt(ms) mean: 100.000 [ 99.999, 100.001] | loop_freq(Hz): 10.0 | snd_dt(ms) mean: 0.001 [ 0.001, 0.001] +``` + +- `[ xxx]`: Log system time +- `cnt`: Amount of messages sent since last log output +- `loop_dt`: Duration of publishing loop, consisting of mean `mean [min, max]` in milliseconds +- `loop_freq`: computed loop frequency in Hz +- `snd_dt`: Duration of the eCAL `CPublisher::Send()` call only, consisting of `mean [min, max]` in milliseconds + +**Subscriber** + +``` +[ 78927.089] | cnt: 10 | lost: 0 | msg_dt(ms) mean: 99.997 [ 99.967, 100.019] | msg_freq(Hz): 10.0 +[ 78928.103] | cnt: 10 | lost: 0 | msg_dt(ms) mean: 100.000 [ 99.964, 100.031] | msg_freq(Hz): 10.0 +[ 78929.104] | cnt: 10 | lost: 0 | msg_dt(ms) mean: 99.998 [ 99.966, 100.039] | msg_freq(Hz): 10.0 +[ 78930.117] | cnt: 10 | lost: 0 | msg_dt(ms) mean: 100.001 [ 99.993, 100.010] | msg_freq(Hz): 10.0 +[ 78931.132] | cnt: 10 | lost: 0 | msg_dt(ms) mean: 100.000 [ 99.966, 100.030] | msg_freq(Hz): 10.0 +``` + +- `[ xxx]`: Log system time +- `cnt`: Amount of received sent since last log output +- `lost`: Amount of dropped messages since the last log output. Determined by comparing the native eCAL message counter of each message to the previous. +- `msg_dt`: Duration between the received messages, consisting of `mean [min, max]` in milliseconds +- `msg_freq`: Computed message frequency in Hz diff --git a/samples/cpp/benchmarks/perftool/src/main.cpp b/samples/cpp/benchmarks/perftool/src/main.cpp new file mode 100644 index 0000000000..de88ae405b --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/main.cpp @@ -0,0 +1,249 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include +#include +#include +#include // IWYU pragma: keep + +#include "publisher.h" +#include "subscriber.h" + +#include +#include +#include +#include +#include +#include +#include + +#ifdef WIN32 + +#define WIN32_LEAN_AND_MEAN +#define NOMINMAX +#include +#endif + + +void printUsage(const std::string& arg0) +{ + std::cout << "Usage:" << std::endl; + std::cout << " " << arg0 << " pub [options]" << std::endl; + std::cout << "or:" << std::endl; + std::cout << " " << arg0 << " sub [callback_delay_ms] [options]" << std::endl; + std::cout << std::endl; + std::cout << "Options:" << std::endl; + std::cout << " -q, --quiet: Do not print any output" << std::endl; + std::cout << " -v, --verbose: Print all measured times for all messages" << std::endl; + std::cout << " --busy-wait: Busy wait when receiving messages (i.e. burn CPU). For subscribers only." << std::endl; + std::cout << " --hickup : Further delay a single callback. For subscribers only." << std::endl; + +} + +int main(int argc, char** argv) +{ +#ifdef WIN32 + SetConsoleOutputCP(CP_UTF8); +#endif // WIN32 + + bool quiet_arg = false; + bool verbose_print_times = false; + bool busy_wait_arg = false; + + bool hickup_arg = false; + std::chrono::steady_clock::duration hickup_time (0); + std::chrono::steady_clock::duration hickup_delay(0); + + // convert argc, argv to vector of strings + std::vector args; + args.reserve(static_cast(argc)); + for (int i = 0; i < argc; ++i) + { + args.emplace_back(argv[i]); + } + + // Check for -h / --help + if (args.size() < 2 + || std::find(args.begin(), args.end(), "-h") != args.end() + || std::find(args.begin(), args.end(), "--help") != args.end()) + { + printUsage(args[0]); + return 0; + } + + // find "--hickup" argument and remove it from args + { + auto hickup_arg_it = std::find(args.begin(), args.end(), "--hickup"); + if (hickup_arg_it != args.end()) + { + hickup_arg = true; + + // Check if there are enough arguments for the time and delay after the hickup_arg_it and parse those as doubles + if (args.size() < static_cast(std::distance(args.begin(), hickup_arg_it) + 3)) + { + std::cerr << "Invalid number of parameters after --hickup" << std::endl; + printUsage(args[0]); + return 1; + } + else + { + try + { + // Parse the next two arguments as double + const double hickup_time_ms = std::stod(*(std::next(hickup_arg_it, 1))); + const double hickup_delay_ms = std::stod(*(std::next(hickup_arg_it, 2))); + + hickup_time = std::chrono::duration_cast(std::chrono::duration(hickup_time_ms)); + hickup_delay = std::chrono::duration_cast(std::chrono::duration(hickup_delay_ms)); + } + catch (const std::exception& e) + { + std::cerr << "Failed parsing parameters after --hickup: " << e.what() << std::endl; + printUsage(args[0]); + return 1; + } + + // Remove all 3 parameters + args.erase(hickup_arg_it, std::next(hickup_arg_it, 3)); + } + } + } + + // find "--quiet" argument and remove it from args + { + auto quiet_arg_it = std::find(args.begin(), args.end(), "--quiet"); + if (quiet_arg_it != args.end()) + { + quiet_arg = true; + args.erase(quiet_arg_it); + } + } + + // find "-q" argument and remove it from args + { + auto q_arg_it = std::find(args.begin(), args.end(), "-q"); + if (q_arg_it != args.end()) + { + quiet_arg = true; + args.erase(q_arg_it); + } + } + + // find "--verbose" argument and remove it from args + { + auto verbose_arg_it = std::find(args.begin(), args.end(), "--verbose"); + if (verbose_arg_it != args.end()) + { + verbose_print_times = true; + args.erase(verbose_arg_it); + } + } + + // find "-v" argument and remove it from args + { + auto v_arg_it = std::find(args.begin(), args.end(), "-v"); + if (v_arg_it != args.end()) + { + verbose_print_times = true; + args.erase(v_arg_it); + } + } + + // Validate quite and verbose args + if (quiet_arg && verbose_print_times) + { + std::cerr << "Invalid arguments: Cannot use \"quiet\" and \"verbose\" simultaneously" << std::endl; + printUsage(argv[0]); + return 1; + } + + // find "--busy-wait" argument and remove it from args + { + auto busy_wait_arg_it = std::find(args.begin(), args.end(), "--busy-wait"); + if (busy_wait_arg_it != args.end()) + { + busy_wait_arg = true; + args.erase(busy_wait_arg_it); + } + } + + if (args[1] == "pub") + { + if (args.size() != 5) + { + std::cerr << "Invalid number of parameters" << std::endl; + printUsage(args[0]); + return 1; + } + const std::string topic_name = args[2]; + const double frequency_hz = std::stod(args[3]); + const unsigned long long payload_size_bytes = std::stoull(args[4]); + + // Initialize eCAL + eCAL::Initialize(argc, argv, "ecal-perftool"); + eCAL::Util::EnableLoopback(true); + + const Publisher publisher(topic_name, frequency_hz, payload_size_bytes, quiet_arg, verbose_print_times); + + // Just don't exit + while (eCAL::Ok()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // finalize eCAL API + eCAL::Finalize(); + } + else if (args[1] == "sub") + { + if (args.size() < 3 || args.size() > 4) + { + std::cerr << "Invalid number of parameters" << std::endl; + printUsage(args[0]); + return 1; + } + + const std::string topic_name = args[2]; + const std::chrono::milliseconds callback_delay((args.size() >= 4 ? std::stoull(args[3]) : 0)); + + + // Initialize eCAL + eCAL::Initialize(argc, argv, "ecal-perftool"); + eCAL::Util::EnableLoopback(true); + + const Subscriber subscriber(topic_name, callback_delay, busy_wait_arg, hickup_arg, hickup_time, hickup_delay, quiet_arg, verbose_print_times); + + // Just don't exit + while (eCAL::Ok()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // finalize eCAL API + eCAL::Finalize(); + } + else + { + std::cerr << "Invalid parameter: " << args[1] << std::endl; + printUsage(args[0]); + return 1; + } + + return 0; +} diff --git a/samples/cpp/benchmarks/perftool/src/publisher.cpp b/samples/cpp/benchmarks/perftool/src/publisher.cpp new file mode 100644 index 0000000000..0da97bb19b --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/publisher.cpp @@ -0,0 +1,180 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + + +#include "publisher.h" + +#include +#include +#include +#include +#include +#include +#include + +#include // IWYU pragma: keep + +#include "publisher_statistics.h" + +#ifdef WIN32 + #define NOMINMAX + #define WIN32_LEAN_AND_MEAN + #include // IWYU pragma: keep +#else + #include +#endif // WIN32 + +Publisher::Publisher(const std::string& topic_name, double frequency, std::size_t payload_size, bool quiet, bool log_print_verbose_times) + : ecal_pub (topic_name) + , frequency_ (frequency) + , is_interrupted_ (false) + , payload_ (payload_size) + , next_deadline_ (std::chrono::steady_clock::now() + period_) + , period_ (std::chrono::nanoseconds(static_cast(1e9 / frequency))) + , log_print_verbose_times_(log_print_verbose_times) +{ + statistics_.reserve(static_cast((frequency + 1.0) * 1.2)); + + // Start the thread + publisher_thread_ = std::make_unique([this](){ this->loop(); }); + + if (!quiet) + statistics_thread_ = std::make_unique([this](){ this->statisticsLoop(); }); +} + +// Destructor +Publisher::~Publisher() +{ + // Interrupt the thread + { + const std::unique_lock lock(mutex_); + is_interrupted_ = true; + condition_variable_.notify_all(); + } + + // Join the thread + publisher_thread_->join(); + + if (statistics_thread_) + statistics_thread_->join(); +} + +void Publisher::loop() +{ + while (!is_interrupted_) + { + PublishedMessage message_info; + + auto timepoint_snd_start = std::chrono::steady_clock::now(); + ecal_pub.Send(payload_.data(), payload_.size()); + auto timepoint_snd_end = std::chrono::steady_clock::now(); + + if (next_deadline_ > std::chrono::steady_clock::now()) + { + preciseWaitUntil(next_deadline_); + next_deadline_ += period_; + } + else + { + next_deadline_ = std::chrono::steady_clock::now() + period_; + } + + message_info.publish_time = timepoint_snd_start; + message_info.send_call_duration = timepoint_snd_end - timepoint_snd_start; + + if (statistics_thread_) + { + const std::lock_guardlock (mutex_); + statistics_.push_back(message_info); + } + } +} + +void Publisher::statisticsLoop() +{ + while (!is_interrupted_) + { + PublisherStatistics statistics; + statistics.reserve(static_cast((frequency_ + 1.0) * 1.2)); + + { + std::unique_locklock (mutex_); + + condition_variable_.wait_for(lock, std::chrono::seconds(1), [this]() { return bool(is_interrupted_); }); + + if (is_interrupted_) + return; + + if(statistics_.size() > 1) + statistics_.swap(statistics); + + // Initialize the new statistics vector with the last element of the old one. This is important for properly computing the loop time of the actual first message. + statistics_.push_back(statistics.back()); + } + + if (statistics.size() > 1) + printStatistics(statistics, log_print_verbose_times_); + else + std::cerr << "Not enough data" << std::endl; + } +} + +bool Publisher::preciseWaitUntil(std::chrono::steady_clock::time_point time) const +{ + constexpr auto max_time_to_poll_wait = std::chrono::milliseconds(20); + constexpr auto max_time_to_busy_wait = std::chrono::microseconds(5); + + while(true) + { + auto remaining_time_to_wait = time - std::chrono::steady_clock::now(); + + auto time_to_poll_wait = remaining_time_to_wait - max_time_to_busy_wait; + auto time_to_normal_wait = remaining_time_to_wait - max_time_to_poll_wait - max_time_to_busy_wait; + + if (time_to_normal_wait > std::chrono::steady_clock::duration::zero()) + { + std::unique_lock lock(mutex_); + condition_variable_.wait_for(lock, time_to_normal_wait, [this](){ return bool(is_interrupted_); }); + + if (is_interrupted_) + return false; + } + else if (time_to_poll_wait > std::chrono::steady_clock::duration::zero()) + { + while (std::chrono::steady_clock::now() < (time - max_time_to_busy_wait)) + { +#ifdef WIN32 + Sleep(0); // NOLINT(misc-include-cleaner) +#else + usleep(1); +#endif + if (is_interrupted_) + return false; + } + } + else + { + while ((std::chrono::steady_clock::now() < time) && !is_interrupted_) + { + // Busy wait + } + return !is_interrupted_; + } + } +} diff --git a/samples/cpp/benchmarks/perftool/src/publisher.h b/samples/cpp/benchmarks/perftool/src/publisher.h new file mode 100644 index 0000000000..6d56d43bcb --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/publisher.h @@ -0,0 +1,85 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "publisher_statistics.h" + +class Publisher +{ +////////////////////////////////////// +/// Publisher, Destructor +////////////////////////////////////// +public: + // Constructor that gets a frequency in Hz + Publisher(const std::string& topic_name, double frequency, std::size_t payload_size, bool quiet, bool log_print_verbose_times); + + // Delete copy + Publisher(const Publisher&) = delete; + Publisher& operator=(const Publisher&) = delete; + + // Delete move (the class uses a this reference) + Publisher(Publisher&&) noexcept = delete; + Publisher& operator=(Publisher&&) noexcept = delete; + + // Destructor + ~Publisher(); + +////////////////////////////////////// +/// Implementation +////////////////////////////////////// +private: + void loop(); + void statisticsLoop(); + + bool preciseWaitUntil(std::chrono::steady_clock::time_point time) const; + +////////////////////////////////////// +/// Member variables +////////////////////////////////////// +private: + eCAL::CPublisher ecal_pub; + const double frequency_; + std::vector payload_; + + std::unique_ptr publisher_thread_; + std::unique_ptr statistics_thread_; + + std::chrono::nanoseconds period_; + std::chrono::steady_clock::time_point next_deadline_; + + mutable std::mutex mutex_; + mutable std::condition_variable condition_variable_; + std::atomic is_interrupted_; + PublisherStatistics statistics_; + + const bool log_print_verbose_times_; +}; diff --git a/samples/cpp/benchmarks/perftool/src/publisher_statistics.h b/samples/cpp/benchmarks/perftool/src/publisher_statistics.h new file mode 100644 index 0000000000..c2af3bdeda --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/publisher_statistics.h @@ -0,0 +1,129 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +struct PublishedMessage +{ + std::chrono::steady_clock::time_point publish_time; + std::chrono::steady_clock::duration send_call_duration; +}; + +using PublisherStatistics = std::vector; + +inline void printStatistics(const PublisherStatistics& statistics, bool print_verbose_times) +{ + // Compute entire entire_duration from first to last message and the mean + auto entire_duration = statistics.back().publish_time - statistics.front().publish_time; + + // Get the minium and maximum send_call_duration. Skip the first element, as that one actually is from teh last iteration. + auto send_call_duration_min = std::chrono::steady_clock::duration::max(); + auto send_call_duration_max = std::chrono::steady_clock::duration::min(); + for (size_t i = 1; i < statistics.size(); ++i) + { + auto send_call_duration = statistics[i].send_call_duration; + if (send_call_duration < send_call_duration_min) + send_call_duration_min = send_call_duration; + if (send_call_duration > send_call_duration_max) + send_call_duration_max = send_call_duration; + } + + // Compute the mean send_call_duration. Skip the first element, as that one actually is from the last iteration. + auto send_call_duration_mean = std::chrono::steady_clock::duration(0); + for (size_t i = 1; i < statistics.size(); ++i) + { + send_call_duration_mean += statistics[i].send_call_duration; + } + send_call_duration_mean /= (statistics.size() - 1); + + // Get the minimum and maximum loop time (based on the publish_time timestamp) + auto loop_time_min = std::chrono::steady_clock::duration::max(); + auto loop_time_max = std::chrono::steady_clock::duration::min(); + for (size_t i = 1; i < statistics.size(); ++i) + { + auto loop_time = statistics[i].publish_time - statistics[i - 1].publish_time; + if (loop_time < loop_time_min) + loop_time_min = loop_time; + if (loop_time > loop_time_max) + loop_time_max = loop_time; + } + + // Compute the mean loop time (based on the publish_time timestamp) + auto loop_time_mean = entire_duration / (statistics.size() - 1); + + auto loop_frequency = 1.0 / std::chrono::duration_cast>(loop_time_mean).count(); + + + // Print statistics (mean and min/max) + { + std::stringstream ss; + ss << std::right << std::fixed; + ss << "[" << std::setprecision(3) << std::setw(10) << std::chrono::duration_cast>(std::chrono::steady_clock::now().time_since_epoch()).count() << "]"; + ss << " | cnt:" << std::setprecision(3) << std::setw(5) << statistics.size() - 1; + ss << " | loop_dt(ms) mean:" << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(loop_time_mean).count(); + ss << " [" << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(loop_time_min).count(); + ss << "," << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(loop_time_max).count(); + ss << "]"; + ss << " | loop_freq(Hz):" << std::setprecision(1) << std::setw(7) << loop_frequency; + ss << " | snd_dt(ms) mean:" << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(send_call_duration_mean).count(); + ss << " [" << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(send_call_duration_min).count(); + ss << "," << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(send_call_duration_max).count(); + ss << "]"; + + std::cerr << ss.str() << std::endl; + } + + // Print all times verbose + if (print_verbose_times) + { + std::stringstream ss; + ss << std::right << std::fixed; + + ss << " loop_dt(ms): "; + for (size_t i = 1; i < statistics.size(); ++i) + { + if (i > 1) + ss << " "; + + auto loop_time = statistics[i].publish_time - statistics[i - 1].publish_time; + ss << std::setprecision(1) << std::setw(5) << std::chrono::duration_cast>(loop_time).count(); + } + ss << std::endl; + + ss << " snd_dt(ms) : "; + for (size_t i = 1; i < statistics.size(); ++i) + { + if (i > 1) + ss << " "; + auto send_call_duration = statistics[i].send_call_duration; + ss << std::setprecision(1) << std::setw(5) << std::chrono::duration_cast>(send_call_duration).count(); + } + + std::cerr << ss.str() << std::endl; + } +} diff --git a/samples/cpp/benchmarks/perftool/src/subscriber.cpp b/samples/cpp/benchmarks/perftool/src/subscriber.cpp new file mode 100644 index 0000000000..6d20c04c6c --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/subscriber.cpp @@ -0,0 +1,153 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include "subscriber.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "subscriber_statistics.h" + +Subscriber::Subscriber(const std::string& topic_name + , std::chrono::nanoseconds time_to_waste + , bool busy_wait + , bool hickup + , std::chrono::steady_clock::duration wait_before_hickup + , std::chrono::steady_clock::duration hickup_delay + , bool quiet + , bool log_print_verbose_times) + : ecal_sub (topic_name) + , time_to_waste_ (time_to_waste) + , busy_wait_ (busy_wait) + , hickup_ (hickup) + , wait_before_hickup_ (wait_before_hickup) + , hickup_time_ (std::chrono::steady_clock::time_point::max()) + , hickup_delay_ (hickup_delay) + , is_interrupted_ (false) + , statistics_size_ (100) + , log_print_verbose_times_(log_print_verbose_times) +{ + statistics_.reserve(statistics_size_); + + // create statistics thread + if (!quiet) + statistics_thread_ = std::make_unique([this](){ this->statisticsLoop(); }); + + ecal_sub.AddReceiveCallback([this](const char* topic_name_, const eCAL::SReceiveCallbackData* data_) { callback(topic_name_, data_); }); +} + +// Destructor +Subscriber::~Subscriber() +{ + // Interrupt the thread + { + const std::unique_lock lock(mutex_); + is_interrupted_ = true; + condition_variable_.notify_all(); + } + + // Join the thread + if (statistics_thread_) + statistics_thread_->join(); +} + +void Subscriber::callback(const char* /*topic_name_*/, const eCAL::SReceiveCallbackData* data_) +{ + // Initialize callback timepoint, if necessary + if (hickup_ && hickup_time_ == std::chrono::steady_clock::time_point::max()) + hickup_time_ = std::chrono::steady_clock::now() + wait_before_hickup_; + + SubscribedMessage message_info; + message_info.local_receive_time = std::chrono::steady_clock::now(); + message_info.ecal_receive_time = eCAL::Time::ecal_clock::now(); + message_info.ecal_send_time = eCAL::Time::ecal_clock::time_point(std::chrono::microseconds(data_->time)); + message_info.ecal_counter = data_->clock; + message_info.size_bytes = data_->size; + + std::chrono::steady_clock::duration time_to_waste_this_iteration(time_to_waste_); + + // Check if we need to hickup + if (hickup_ && std::chrono::steady_clock::now() >= hickup_time_) + { + // Reset hickup (we only want to do that once) + hickup_ = false; + + // use another sleep time for this iteratoin + time_to_waste_this_iteration = hickup_delay_; + } + + if (time_to_waste_this_iteration >= std::chrono::nanoseconds::zero()) + { + if (busy_wait_) + { + auto start = std::chrono::high_resolution_clock::now(); + while (std::chrono::high_resolution_clock::now() - start < time_to_waste_this_iteration) + { + // Do nothing + } + } + else + { + std::this_thread::sleep_for(time_to_waste_this_iteration); + } + } + + if (statistics_thread_) + { + const std::unique_locklock(mutex_); + statistics_.push_back(message_info); + } +} + +void Subscriber::statisticsLoop() +{ + while (!is_interrupted_) + { + SubscriberStatistics statistics; + statistics.reserve(statistics_size_ * 2); + + { + std::unique_locklock (mutex_); + + condition_variable_.wait_for(lock, std::chrono::seconds(1), [this]() { return bool(is_interrupted_); }); + + if (is_interrupted_) + return; + + if(statistics_.size() > 1) + { + statistics_size_ = std::max(statistics_size_, statistics_.size()); + statistics_.swap(statistics); + + // Initialize the new statistics vector with the last element of the old one. This is important for detecting drops and properly computing the delay of the actual first message. + statistics_.push_back(statistics.back()); + } + } + + if (statistics.size() > 1) + printStatistics(statistics, log_print_verbose_times_); + else + std::cerr << "Not enough data" << std::endl; + } +} diff --git a/samples/cpp/benchmarks/perftool/src/subscriber.h b/samples/cpp/benchmarks/perftool/src/subscriber.h new file mode 100644 index 0000000000..bc3acb42ef --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/subscriber.h @@ -0,0 +1,91 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "subscriber_statistics.h" + +class Subscriber +{ +////////////////////////////////////// +/// Publisher, Destructor +////////////////////////////////////// +public: + // Constructor that gets a frequency in Hz + Subscriber(const std::string& topic_name + , std::chrono::nanoseconds time_to_waste + , bool busy_wait + , bool hickup + , std::chrono::steady_clock::duration wait_before_hickup + , std::chrono::steady_clock::duration hickup_delay + , bool quiet + , bool log_print_verbose_times); + + // Delete copy + Subscriber(const Subscriber&) = delete; + Subscriber& operator=(const Subscriber&) = delete; + + // Delete move (the class uses a this reference) + Subscriber(Subscriber&&) noexcept = delete; + Subscriber& operator=(Subscriber&&) noexcept = delete; + + // Destructor + ~Subscriber(); + +////////////////////////////////////// +/// Implementation +////////////////////////////////////// +private: + void callback(const char* topic_name_, const eCAL::SReceiveCallbackData* data_); + + void statisticsLoop(); + +////////////////////////////////////// +/// Member variables +////////////////////////////////////// +private: + eCAL::CSubscriber ecal_sub; + std::chrono::nanoseconds time_to_waste_; + bool busy_wait_; + + bool hickup_; + const std::chrono::steady_clock::duration wait_before_hickup_; + std::chrono::steady_clock::time_point hickup_time_; + const std::chrono::steady_clock::duration hickup_delay_; + + std::unique_ptr statistics_thread_; + mutable std::mutex mutex_; + mutable std::condition_variable condition_variable_; + std::atomic is_interrupted_; + SubscriberStatistics statistics_; + size_t statistics_size_; + + const bool log_print_verbose_times_; +}; diff --git a/samples/cpp/benchmarks/perftool/src/subscriber_statistics.h b/samples/cpp/benchmarks/perftool/src/subscriber_statistics.h new file mode 100644 index 0000000000..35e7ac9d40 --- /dev/null +++ b/samples/cpp/benchmarks/perftool/src/subscriber_statistics.h @@ -0,0 +1,113 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2023 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +struct SubscribedMessage +{ + std::chrono::steady_clock::time_point local_receive_time; + eCAL::Time::ecal_clock::time_point ecal_send_time; + eCAL::Time::ecal_clock::time_point ecal_receive_time; + unsigned long long size_bytes{}; + unsigned long long ecal_counter{}; +}; + +using SubscriberStatistics = std::vector; + +inline void printStatistics(const SubscriberStatistics& statistics, bool print_verbose_times) +{ + // Compute entire entire_duration from first to last message and the mean + const auto entire_duration = statistics.back().local_receive_time - statistics.front().local_receive_time; + + // The first message is from the previous loop run and only exists to count lost messages properly and to compute the delay of the actual first message. + const int received_msgs = static_cast(statistics.size()) - 1; + + // Check if the ecal_counter is continous. If not, we have lost messages. Count them. + bool ecal_counter_is_monotinc = true; + unsigned long long lost_msgs = 0; + for (size_t i = 1; i < statistics.size(); ++i) + { + if (statistics[i].ecal_counter <= statistics[i - 1].ecal_counter) + { + ecal_counter_is_monotinc = false; + break; + } + lost_msgs += statistics[i].ecal_counter - statistics[i - 1].ecal_counter - 1; + } + + // Get the minimum and maximum delay time between two messages + auto msg_dt_min = std::chrono::steady_clock::duration::max(); + auto msg_dt_max = std::chrono::steady_clock::duration::min(); + for (size_t i = 1; i < statistics.size(); ++i) + { + auto delay = statistics[i].local_receive_time - statistics[i - 1].local_receive_time; + if (delay < msg_dt_min) + msg_dt_min = delay; + if (delay > msg_dt_max) + msg_dt_max = delay; + } + auto msg_dt_mean = entire_duration / (statistics.size() - 1); + + auto msg_frequency = 1.0 / std::chrono::duration_cast>(msg_dt_mean).count(); + + // Print mean entire_duration and rmse in a single line in milliseconds + { + std::stringstream ss; + ss << std::right << std::fixed; + ss << "[" << std::setprecision(3) << std::setw(10) << std::chrono::duration_cast>(std::chrono::steady_clock::now().time_since_epoch()).count() << "]"; + ss << " | cnt:" << std::setprecision(3) << std::setw(5) << received_msgs; + ss << " | lost:" << std::setprecision(3) << std::setw(5) << (ecal_counter_is_monotinc ? std::to_string(lost_msgs) : "???"); + ss << " | msg_dt(ms) mean:" << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(msg_dt_mean).count(); + ss << " [" << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(msg_dt_min).count(); + ss << "," << std::setprecision(3) << std::setw(8) << std::chrono::duration_cast>(msg_dt_max).count(); + ss << "]"; + ss << " | msg_freq(Hz):" << std::setprecision(1) << std::setw(7) << msg_frequency; + + std::cerr << ss.str() << std::endl; + } + + // Print verbose times + if (print_verbose_times) + { + std::stringstream ss; + ss << std::right << std::fixed; + ss << " msg_dt(ms): "; + for (size_t i = 1; i < statistics.size(); ++i) + { + if (i > 1) + ss << " "; + + auto delay = statistics[i].local_receive_time - statistics[i - 1].local_receive_time; + ss << std::setprecision(1) << std::setw(5) << std::chrono::duration_cast>(delay).count(); + } + + std::cerr << ss.str() << std::endl; + } +}