Skip to content

Commit

Permalink
CThread class extented with trigger function
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed Jul 29, 2024
1 parent a2d054b commit 0fccaa2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 49 deletions.
29 changes: 5 additions & 24 deletions ecal/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ namespace eCAL

// start cyclic registration thread
m_reg_sample_snd_thread = std::make_shared<CCallbackThread>(std::bind(&CRegistrationProvider::RegisterSendThread, this));
m_reg_sample_snd_thread->start(std::chrono::milliseconds(0));
m_reg_sample_snd_thread->start(std::chrono::milliseconds(Config::GetRegistrationRefreshMs()));

m_created = true;
}
Expand All @@ -92,17 +92,14 @@ namespace eCAL
{
if(!m_created) return;

// stop cyclic registration thread
m_reg_sample_snd_thread->stop();

// add unregistration sample to registration loop
AddSingleSample(Registration::GetProcessUnregisterSample());

// wake up registration thread the last time
TriggerRegisterSendThread();
m_reg_sample_snd_thread->trigger();

// stop registration thread
m_reg_sample_snd_thread.reset();
// stop cyclic registration thread
m_reg_sample_snd_thread->stop();

// delete registration sender
m_reg_sender.reset();
Expand All @@ -119,7 +116,7 @@ namespace eCAL
AddSingleSample(sample_);

// wake up registration thread
TriggerRegisterSendThread();
m_reg_sample_snd_thread->trigger();

return(true);
}
Expand All @@ -141,15 +138,6 @@ namespace eCAL
m_applied_sample_list.samples.push_back(sample_);
}

void CRegistrationProvider::TriggerRegisterSendThread()
{
{
const std::lock_guard<std::mutex> lock(m_reg_sample_snd_thread_cv_mtx);
m_reg_sample_snd_thread_trigger = true;
}
m_reg_sample_snd_thread_cv.notify_one();
}

void CRegistrationProvider::RegisterSendThread()
{
// collect all registrations and send them out cyclic
Expand Down Expand Up @@ -187,13 +175,6 @@ namespace eCAL
m_reg_sender->SendSampleList(m_applied_sample_list);
m_applied_sample_list.samples.clear();
}

// wait for external trigger or until registration refresh timeout
{
std::unique_lock<std::mutex> lock(m_reg_sample_snd_thread_cv_mtx);
m_reg_sample_snd_thread_cv.wait_for(lock, std::chrono::milliseconds(Config::GetRegistrationRefreshMs()), [this] { return m_reg_sample_snd_thread_trigger; });
m_reg_sample_snd_thread_trigger = false;
}
}
}
}
30 changes: 10 additions & 20 deletions ecal/core/src/registration/ecal_registration_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,12 @@

#pragma once

#include "io/udp/ecal_udp_sample_sender.h"

#include <atomic>
#include <condition_variable>
#include "registration/ecal_registration_sender.h"
#include "util/ecal_thread.h"

#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <registration/ecal_registration_sender.h>
#include "util/ecal_thread.h"

namespace eCAL
{
Expand All @@ -56,22 +51,17 @@ namespace eCAL

protected:
void AddSingleSample(const Registration::Sample& sample_);
void TriggerRegisterSendThread();
void RegisterSendThread();

static std::atomic<bool> m_created;

std::unique_ptr<CRegistrationSender> m_reg_sender;
std::shared_ptr<CCallbackThread> m_reg_sample_snd_thread;
static std::atomic<bool> m_created;

std::condition_variable m_reg_sample_snd_thread_cv;
std::mutex m_reg_sample_snd_thread_cv_mtx;
bool m_reg_sample_snd_thread_trigger;
std::unique_ptr<CRegistrationSender> m_reg_sender;
std::shared_ptr<CCallbackThread> m_reg_sample_snd_thread;

std::mutex m_applied_sample_list_mtx;
Registration::SampleList m_applied_sample_list;
std::mutex m_applied_sample_list_mtx;
Registration::SampleList m_applied_sample_list;

bool m_use_registration_udp;
bool m_use_registration_shm;
bool m_use_registration_udp;
bool m_use_registration_shm;
};
}
33 changes: 28 additions & 5 deletions ecal/core/src/util/ecal_thread.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,12 +84,28 @@ namespace eCAL
}
}

/**
* @brief Trigger the callback thread to interrupt the current sleep without stopping it.
* The callback function will be executed immediately.
*/
void trigger()
{
{
const std::unique_lock<std::mutex> lock(mtx_);
// Set the flag to signal the callback thread to trigger
triggerThread_ = true;
// Notify the callback thread to wake up and check the flag
cv_.notify_one();
}
}

private:
std::thread callbackThread_; /**< The callback thread object. */
std::function<void()> callback_; /**< The callback function to be executed in the callback thread. */
std::mutex mtx_; /**< Mutex for thread synchronization. */
std::condition_variable cv_; /**< Condition variable for signaling between threads. */
bool stopThread_{false}; /**< Flag to indicate whether the callback thread should stop. */
bool stopThread_{ false }; /**< Flag to indicate whether the callback thread should stop. */
bool triggerThread_{ false }; /**< Flag to indicate whether the callback thread should be triggered. */

/**
* @brief Callback function that runs in the callback thread.
Expand All @@ -105,10 +121,17 @@ namespace eCAL
{
std::unique_lock<std::mutex> lock(mtx_);
// Wait for a signal or a timeout
if (cv_.wait_for(lock, timeout, [this] { return stopThread_; }))
if (cv_.wait_for(lock, timeout, [this] { return stopThread_ || triggerThread_; }))
{
// If the stopThread flag is true, break out of the loop
break;
if (stopThread_) {
// If the stopThread flag is true, break out of the loop
break;
}

if (triggerThread_) {
// If the triggerThread flag is true, reset it and proceed
triggerThread_ = false;
}
}
}

Expand Down

0 comments on commit 0fccaa2

Please sign in to comment.