Skip to content

Commit

Permalink
[core] create FrequencyCalculator class for sender / receiver frequen…
Browse files Browse the repository at this point in the history
…cy calculation.
  • Loading branch information
KerstinKeller committed Mar 12, 2024
1 parent 65c0a09 commit 28890f9
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 75 deletions.
1 change: 1 addition & 0 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ set(ecal_util_src
src/util/convert_utf.h
src/util/ecal_expmap.h
src/util/ecal_thread.h
src/util/frequency_calculator.h
src/util/getenvvar.h
src/util/sys_usage.cpp
src/util/sys_usage.h
Expand Down
52 changes: 18 additions & 34 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ namespace eCAL
m_receive_timeout(0),
m_receive_time(0),
m_clock(0),
m_clock_old(0),
m_freq(0),
m_frequency_calculator(3.0f),
m_message_drops(0),
m_loc_published(false),
m_ext_published(false),
Expand Down Expand Up @@ -94,9 +93,7 @@ namespace eCAL
m_topic_id.clear();
m_topic_info = topic_info_;
m_clock = 0;
m_clock_old = 0;
m_message_drops = 0;
m_rec_time = std::chrono::steady_clock::time_point();
m_created = false;
#ifndef NDEBUG
// log it
Expand Down Expand Up @@ -160,10 +157,7 @@ namespace eCAL
// reset defaults
m_created = false;
m_clock = 0;
m_clock_old = 0;
m_freq = 0;
m_message_drops = 0;
m_rec_time = std::chrono::steady_clock::time_point();

m_use_udp_mc_confirmed = false;
m_use_shm_confirmed = false;
Expand Down Expand Up @@ -281,7 +275,7 @@ namespace eCAL
ecal_reg_sample_mutable_topic->set_pname(m_pname);
ecal_reg_sample_mutable_topic->set_uname(Process::GetUnitName());
ecal_reg_sample_mutable_topic->set_dclock(m_clock);
ecal_reg_sample_mutable_topic->set_dfreq(m_freq);
ecal_reg_sample_mutable_topic->set_dfreq(GetFrequency());
ecal_reg_sample_mutable_topic->set_message_drops(google::protobuf::int32(m_message_drops));

size_t loc_connections(0);
Expand Down Expand Up @@ -490,6 +484,13 @@ namespace eCAL
// increase read clock
m_clock++;

// Update frequency calculation
{
const auto receive_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> freq_lock(m_frequency_calculator_mutex);
m_frequency_calculator.addTick(receive_time);
}

// reset timeout
m_receive_time = 0;

Expand Down Expand Up @@ -903,37 +904,20 @@ namespace eCAL
// should never be reached
return false;
}

int32_t CDataReader::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
return static_cast<int32_t>(m_frequency_calculator.getFrequency(frequency_time) * 1000);
}

void CDataReader::RefreshRegistration()
{
if(!m_created) return;

// ensure that registration is not called within zero nanoseconds
// normally it will be called from registration logic every second
auto curr_time = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_rec_time) > std::chrono::milliseconds(0))
{
// reset clock and time on first call
if (m_clock_old == 0)
{
m_clock_old = m_clock;
m_rec_time = curr_time;
}

// check for clock difference
else if ((m_clock - m_clock_old) > 0)
{
// calculate frequency in mHz
m_freq = static_cast<long>((1000 * 1000 * (m_clock - m_clock_old)) / std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_rec_time).count());
// reset clock and time
m_clock_old = m_clock;
m_rec_time = curr_time;
}
else
{
m_freq = 0;
}
}

// register without send
Register(false);
Expand Down Expand Up @@ -996,6 +980,7 @@ namespace eCAL
{
std::stringstream out;


out << std::endl;
out << indent_ << "------------------------------------" << std::endl;
out << indent_ << " class CDataReader " << std::endl;
Expand All @@ -1012,8 +997,7 @@ namespace eCAL
out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << std::endl;
out << indent_ << "m_read_time: " << m_read_time << std::endl;
out << indent_ << "m_clock: " << m_clock << std::endl;
out << indent_ << "m_rec_time: " << std::chrono::duration_cast<std::chrono::milliseconds>(m_rec_time.time_since_epoch()).count() << std::endl;
out << indent_ << "m_freq: " << m_freq << std::endl;
out << indent_ << "frequency [mHz]: " << GetFrequency() << std::endl;
out << indent_ << "m_created: " << m_created << std::endl;
out << std::endl;

Expand Down
10 changes: 7 additions & 3 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
#include <string>
#include <unordered_map>

#include <util/frequency_calculator.h>

namespace eCAL
{
class CDataReader
Expand Down Expand Up @@ -119,6 +121,8 @@ namespace eCAL
void Disconnect();
bool CheckMessageClock(const std::string& tid_, long long current_clock_);

int32_t GetFrequency();

std::string m_host_name;
std::string m_host_group_name;
int m_host_id;
Expand Down Expand Up @@ -156,9 +160,9 @@ namespace eCAL
EventCallbackMapT m_event_callback_map;

std::atomic<long long> m_clock;
long long m_clock_old;
std::chrono::steady_clock::time_point m_rec_time;
long m_freq;

std::mutex m_frequency_calculator_mutex;
ResettableFrequencyCalculator<std::chrono::steady_clock> m_frequency_calculator;

std::set<long long> m_id_set;

Expand Down
51 changes: 16 additions & 35 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ namespace eCAL
m_connected(false),
m_id(0),
m_clock(0),
m_clock_old(0),
m_freq(0),
m_frequency_calculator(3.0f),
m_bandwidth_max_udp(NET_BANDWIDTH_MAX_UDP),
m_loc_subscribed(false),
m_ext_subscribed(false),
Expand Down Expand Up @@ -114,9 +113,6 @@ namespace eCAL
m_topic_info = topic_info_;
m_id = 0;
m_clock = 0;
m_clock_old = 0;
m_snd_time = std::chrono::steady_clock::time_point();
m_freq = 0;
m_bandwidth_max_udp = Config::GetMaxUdpBandwidthBytesPerSecond();
m_buffering_shm = Config::GetMemfileBufferCount();
m_zero_copy = Config::IsMemfileZerocopyEnabled();
Expand Down Expand Up @@ -191,9 +187,6 @@ namespace eCAL
// reset defaults
m_id = 0;
m_clock = 0;
m_clock_old = 0;
m_snd_time = std::chrono::steady_clock::time_point();
m_freq = 0;
m_bandwidth_max_udp = Config::GetMaxUdpBandwidthBytesPerSecond();
m_buffering_shm = Config::GetMemfileBufferCount();
m_zero_copy = Config::IsMemfileZerocopyEnabled();
Expand Down Expand Up @@ -412,6 +405,13 @@ namespace eCAL

size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long id_)
{
{
// we should think about if we would like to potentially use the `time_` variable to tick with (but we would need the same base for checking incoming samples then....
const auto send_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
m_frequency_calculator.addTick(send_time);
}

// check writer modes
if (!CheckWriterModes())
{
Expand Down Expand Up @@ -741,32 +741,6 @@ namespace eCAL
{
if (!m_created) return;

// force to register every second to refresh data clock information
auto curr_time = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_snd_time) > std::chrono::milliseconds(0))
{
// reset clock and time on first call
if (m_clock_old == 0)
{
m_clock_old = m_clock;
m_snd_time = curr_time;
}

// check for clock difference
if ((m_clock - m_clock_old) > 0)
{
// calculate frequency in mHz
m_freq = static_cast<long>((1000 * 1000 * (m_clock - m_clock_old)) / std::chrono::duration_cast<std::chrono::milliseconds>(curr_time - m_snd_time).count());
// reset clock and time
m_clock_old = m_clock;
m_snd_time = curr_time;
}
else
{
m_freq = 0;
}
}

// register without send
Register(false);

Expand Down Expand Up @@ -813,6 +787,7 @@ namespace eCAL
out << indent_ << "m_topic_info.descriptor: " << m_topic_info.descriptor << std::endl;
out << indent_ << "m_id: " << m_id << std::endl;
out << indent_ << "m_clock: " << m_clock << std::endl;
out << indent_ << "frequency [mHz]: " << GetFrequency() << std::endl;
out << indent_ << "m_created: " << m_created << std::endl;
out << indent_ << "m_loc_subscribed: " << m_loc_subscribed << std::endl;
out << indent_ << "m_ext_subscribed: " << m_ext_subscribed << std::endl;
Expand Down Expand Up @@ -921,7 +896,7 @@ namespace eCAL
ecal_reg_sample_mutable_topic->set_uname(Process::GetUnitName());
ecal_reg_sample_mutable_topic->set_did(m_id);
ecal_reg_sample_mutable_topic->set_dclock(m_clock);
ecal_reg_sample_mutable_topic->set_dfreq(m_freq);
ecal_reg_sample_mutable_topic->set_dfreq(GetFrequency());

size_t loc_connections(0);
size_t ext_connections(0);
Expand Down Expand Up @@ -1337,4 +1312,10 @@ namespace eCAL
(void)base_msg_;
#endif
}
int32_t CDataWriter::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
const std::lock_guard<std::mutex> lock(m_frequency_calculator_mutex);
return static_cast<int32_t>(m_frequency_calculator.getFrequency(frequency_time) * 1000);
}
}
10 changes: 7 additions & 3 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

#include "ecal_def.h"
#include "util/ecal_expmap.h"
#include <util/frequency_calculator.h>


#include "udp/ecal_writer_udp_mc.h"
#include "shm/ecal_writer_shm.h"
Expand Down Expand Up @@ -144,6 +146,8 @@ namespace eCAL
bool IsInternalSubscribedOnly();
void LogSendMode(TLayer::eSendMode smode_, const std::string & base_msg_);

int32_t GetFrequency();

std::string m_host_name;
std::string m_host_group_name;
int m_host_id;
Expand Down Expand Up @@ -177,9 +181,9 @@ namespace eCAL

long long m_id;
long long m_clock;
long long m_clock_old;
std::chrono::steady_clock::time_point m_snd_time;
long m_freq;

std::mutex m_frequency_calculator_mutex;
ResettableFrequencyCalculator<std::chrono::steady_clock> m_frequency_calculator;

long m_bandwidth_max_udp;

Expand Down
Loading

0 comments on commit 28890f9

Please sign in to comment.