Skip to content

Commit

Permalink
datawriter/datareader pub/sub member private
Browse files Browse the repository at this point in the history
SetID renamed to SetFilterID(s)
  • Loading branch information
rex-schilasky committed Dec 6, 2024
1 parent bc53fdf commit c2b8a2c
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 60 deletions.
2 changes: 1 addition & 1 deletion app/play/play_core/src/measurement_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ bool MeasurementContainer::PublishFrame(long long index)
{
timestamp_usecs = std::chrono::duration_cast<std::chrono::microseconds>(frame_table_[index].send_timestamp_.time_since_epoch()).count();
}
frame_table_[index].publisher_info_->publisher_.SetID(frame_table_[index].send_id_);
frame_table_[index].publisher_info_->publisher_.SetFilterID(frame_table_[index].send_id_);
frame_table_[index].publisher_info_->publisher_.Send(send_buffer_, data_size, timestamp_usecs);
frame_table_[index].publisher_info_->message_counter_++;
return true;
Expand Down
13 changes: 6 additions & 7 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ namespace eCAL
bool ClearAttribute(const std::string& attr_name_);

/**
* @brief Set the specific topic id.
* @brief Set a topic filter id.
*
* @param id_ The topic id for subscriber side filtering (0 == no id).
* @param id_ The topic id for subscriber side filtering (0 == no id).
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API_EXPORTED_MEMBER
bool SetID(long long id_);
bool SetFilterID(long long filter_id_);

/**
* @brief Send a message to all subscribers.
Expand Down Expand Up @@ -260,7 +260,7 @@ namespace eCAL
* @return True if created, false if not.
**/
ECAL_API_EXPORTED_MEMBER
bool IsCreated() const {return(m_created);}
bool IsCreated() const {return(m_datawriter != nullptr);}

/**
* @brief Query if the publisher is subscribed.
Expand Down Expand Up @@ -312,10 +312,9 @@ namespace eCAL
ECAL_API_EXPORTED_MEMBER
std::string Dump(const std::string& indent_ = "") const;

protected:
private:
// class members
std::shared_ptr<CDataWriter> m_datawriter;
long long m_id;
bool m_created;
long long m_filter_id;
};
}
7 changes: 3 additions & 4 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ namespace eCAL
* @return True if it succeeds, false if it fails.
**/
ECAL_API_EXPORTED_MEMBER
bool SetID(const std::set<long long>& id_set_);
bool SetFilterIDs(const std::set<long long>& filter_ids_);

/**
* @brief Sets subscriber attribute.
Expand Down Expand Up @@ -267,7 +267,7 @@ namespace eCAL
* @return true if created, false if not.
**/
ECAL_API_EXPORTED_MEMBER
bool IsCreated() const {return(m_created);}
bool IsCreated() const {return(m_datareader != nullptr);}

/**
* @brief Query if the subscriber is published.
Expand Down Expand Up @@ -319,9 +319,8 @@ namespace eCAL
ECAL_API_EXPORTED_MEMBER
std::string Dump(const std::string& indent_ = "") const;

protected:
private:
// class members
std::shared_ptr<CDataReader> m_datareader;
bool m_created;
};
}
2 changes: 1 addition & 1 deletion ecal/core/src/cimpl/ecal_publisher_cimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ extern "C"
{
if (handle_ == nullptr) return(0);
auto* pub = static_cast<eCAL::CPublisher*>(handle_);
if (pub->SetID(id_)) return(1);
if (pub->SetFilterID(id_)) return(1);
return(0);
}

Expand Down
39 changes: 16 additions & 23 deletions ecal/core/src/pubsub/ecal_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ namespace eCAL
{
CPublisher::CPublisher() :
m_datawriter(nullptr),
m_id(0),
m_created(false)
m_filter_id(0)
{
}

Expand All @@ -65,10 +64,9 @@ namespace eCAL
**/
CPublisher::CPublisher(CPublisher&& rhs) noexcept :
m_datawriter(std::move(rhs.m_datawriter)),
m_id(rhs.m_id),
m_created(rhs.m_created)
m_filter_id(rhs.m_filter_id)
{
rhs.m_created = false;
rhs.m_datawriter = nullptr;
}

/**
Expand All @@ -79,19 +77,18 @@ namespace eCAL
// Call destroy, to clean up the current state, then afterwards move all elements
Destroy();

m_datawriter = std::move(rhs.m_datawriter);
m_id = rhs.m_id;
m_created = rhs.m_created;

rhs.m_created = false;
m_datawriter = std::move(rhs.m_datawriter);
m_filter_id = rhs.m_filter_id;

rhs.m_datawriter = nullptr;

return *this;
}

bool CPublisher::Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_)
{
if (m_created) return(false);
if (topic_name_.empty()) return(false);
if (m_datawriter != nullptr) return(false);
if (topic_name_.empty()) return(false);

// create datawriter
m_datawriter = std::make_shared<CDataWriter>(data_type_info_, BuildWriterAttributes(topic_name_, config_, GetTransportLayerConfiguration(), GetRegistrationConfiguration()));
Expand All @@ -100,8 +97,7 @@ namespace eCAL
g_pubgate()->Register(topic_name_, m_datawriter);

// we made it :-)
m_created = true;
return(m_created);
return(true);
}

bool CPublisher::Create(const std::string& topic_name_)
Expand All @@ -111,7 +107,7 @@ namespace eCAL

bool CPublisher::Destroy()
{
if(!m_created) return(false);
if (m_datawriter == nullptr) return(false);

// unregister datawriter
if(g_pubgate() != nullptr) g_pubgate()->Unregister(m_datawriter->GetTopicName(), m_datawriter);
Expand All @@ -125,8 +121,6 @@ namespace eCAL
m_datawriter.reset();

// we made it :-)
m_created = false;

return(true);
}

Expand All @@ -148,10 +142,10 @@ namespace eCAL
return m_datawriter->ClearAttribute(attr_name_);
}

bool CPublisher::SetID(long long id_)
bool CPublisher::SetFilterID(long long filter_id_)
{
m_id = id_;
return(true);
m_filter_id = filter_id_;
return true;
}

size_t CPublisher::Send(const void* const buf_, const size_t len_, const long long time_ /* = DEFAULT_TIME_ARGUMENT */)
Expand All @@ -162,7 +156,7 @@ namespace eCAL

size_t CPublisher::Send(CPayloadWriter& payload_, long long time_)
{
if (!m_created) return(0);
if (m_datawriter == nullptr) return 0;

// in an optimization case the
// publisher can send an empty package
Expand All @@ -177,7 +171,7 @@ namespace eCAL

// send content via data writer layer
const long long write_time = (time_ == DEFAULT_TIME_ARGUMENT) ? eCAL::Time::GetMicroSeconds() : time_;
const size_t written_bytes = m_datawriter->Write(payload_, write_time, m_id);
const size_t written_bytes = m_datawriter->Write(payload_, write_time, m_filter_id);

// return number of bytes written
return written_bytes;
Expand Down Expand Up @@ -242,7 +236,6 @@ namespace eCAL
out << indent_ << "----------------------" << '\n';
out << indent_ << " class CPublisher" << '\n';
out << indent_ << "----------------------" << '\n';
out << indent_ << "m_created: " << m_created << '\n';
if((m_datawriter != nullptr) && m_datawriter->IsCreated()) out << indent_ << m_datawriter->Dump(" ");
out << '\n';

Expand Down
29 changes: 11 additions & 18 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
namespace eCAL
{
CSubscriber::CSubscriber() :
m_datareader(nullptr),
m_created(false)
m_datareader(nullptr)
{
}

Expand All @@ -57,10 +56,9 @@ namespace eCAL
}

CSubscriber::CSubscriber(CSubscriber&& rhs) noexcept :
m_datareader(std::move(rhs.m_datareader)),
m_created(rhs.m_created)
m_datareader(std::move(rhs.m_datareader))
{
rhs.m_created = false;
rhs.m_datareader = nullptr;
}

CSubscriber& CSubscriber::operator=(CSubscriber&& rhs) noexcept
Expand All @@ -69,17 +67,16 @@ namespace eCAL
Destroy();

m_datareader = std::move(rhs.m_datareader);
m_created = rhs.m_created;

rhs.m_created = false;
rhs.m_datareader = nullptr;

return *this;
}

bool CSubscriber::Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Subscriber::Configuration& config_)
{
if (m_created) return(false);
if (topic_name_.empty()) return(false);
if (m_datareader != nullptr) return(false);
if (topic_name_.empty()) return(false);

// create datareader
m_datareader = std::make_shared<CDataReader>(data_type_info_, BuildReaderAttributes(topic_name_, config_, GetPublisherConfiguration(), GetTransportLayerConfiguration(), GetRegistrationConfiguration()));
Expand All @@ -88,8 +85,7 @@ namespace eCAL
g_subgate()->Register(topic_name_, m_datareader);

// we made it :-)
m_created = true;
return(m_created);
return(true);
}

bool CSubscriber::Create(const std::string& topic_name_)
Expand All @@ -99,7 +95,7 @@ namespace eCAL

bool CSubscriber::Destroy()
{
if(!m_created) return(false);
if (m_datareader == nullptr) return(false);

// remove receive callback
RemReceiveCallback();
Expand All @@ -117,15 +113,13 @@ namespace eCAL
m_datareader.reset();

// we made it :-)
m_created = false;

return(true);
}

bool CSubscriber::SetID(const std::set<long long>& id_set_)
bool CSubscriber::SetFilterIDs(const std::set<long long>& filter_ids_)
{
if (m_datareader == nullptr) return(false);
m_datareader->SetID(id_set_);
m_datareader->SetFilterIDs(filter_ids_);
return(true);
}

Expand All @@ -143,7 +137,7 @@ namespace eCAL

bool CSubscriber::ReceiveBuffer(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ /* = 0 */) const
{
if (!m_created) return(false);
if (m_datareader == nullptr) return(false);
return(m_datareader->Read(buf_, time_, rcv_timeout_));
}

Expand Down Expand Up @@ -219,7 +213,6 @@ namespace eCAL
out << indent_ << "----------------------" << '\n';
out << indent_ << " class CSubscriber " << '\n';
out << indent_ << "----------------------" << '\n';
out << indent_ << "m_created: " << m_created << '\n';
if((m_datareader != nullptr) && m_datareader->IsCreated()) out << indent_ << m_datareader->Dump(" ");
out << '\n';

Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ namespace eCAL
return(true);
}

void CDataReader::SetID(const std::set<long long>& id_set_)
void CDataReader::SetFilterIDs(const std::set<long long>& filter_ids_)
{
m_id_set = id_set_;
m_id_set = filter_ids_;
}

void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& pub_layer_states_)
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ namespace eCAL
bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_);
bool ClearAttribute(const std::string& attr_name_);

void SetID(const std::set<long long>& id_set_);
void SetFilterIDs(const std::set<long long>& filter_ids_);

void ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& pub_layer_states_);
void RemovePublication(const SPublicationInfo& publication_info_);
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ namespace eCAL
return true;
}

size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long id_)
size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long filter_id_)
{
// get payload buffer size (one time, to avoid multiple computations)
const size_t payload_buf_size(payload_.GetSize());
Expand All @@ -186,7 +186,7 @@ namespace eCAL
}

// prepare counter and internal states
const size_t snd_hash = PrepareWrite(id_, payload_buf_size);
const size_t snd_hash = PrepareWrite(filter_id_, payload_buf_size);

// did we write anything
bool written(false);
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ namespace eCAL

bool Stop();

size_t Write(CPayloadWriter& payload_, long long time_, long long id_);
size_t Write(CPayloadWriter& payload_, long long time_, long long filter_id_);

bool SetDataTypeInformation(const SDataTypeInformation& topic_info_);

Expand Down

0 comments on commit c2b8a2c

Please sign in to comment.