Skip to content

Commit

Permalink
[config] cleanup (#1655)
Browse files Browse the repository at this point in the history
* Removed unused methods from ecal_config.h
* Removed unused member from transport_layer configuration.
* Moved num_executor_reader/writer to publisher/subscriber config.
* Added new num_execs to API and uses subscriber options for previous implementation calls.
  • Loading branch information
Peguen authored Jul 15, 2024
1 parent 6dd1718 commit 5a835aa
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 65 deletions.
3 changes: 3 additions & 0 deletions ecal/core/include/ecal/config/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ namespace eCAL
struct Configuration
{
bool enable; //!< enable layer

size_t num_executor_reader{}; //!< reader amount of threads that shall execute workload (Default: 4)
size_t num_executor_writer{}; //!< writer amount of threads that shall execute workload (Default: 4)
};
}

Expand Down
5 changes: 5 additions & 0 deletions ecal/core/include/ecal/config/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ namespace eCAL
struct Configuration
{
bool enable; //!< enable layer

size_t num_executor_reader{}; //!< reader amount of threads that shall execute workload (Default: 4)
size_t num_executor_writer{}; //!< writer amount of threads that shall execute workload (Default: 4)

size_t max_reconnections{}; //!< reconnection attemps the session will try to reconnect in (Default: 5)
};
}

Expand Down
20 changes: 1 addition & 19 deletions ecal/core/include/ecal/config/transport_layer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,12 @@ namespace eCAL
{
namespace TransportLayer
{
namespace TCPPubSub
{
struct Configuration
{
size_t num_executor_reader{}; //!< Tcp_pubsub reader amount of threads that shall execute workload (Default: 4)
size_t num_executor_writer{}; //!< Tcp_pubsub writer amount of threads that shall execute workload (Default: 4)
size_t max_reconnections{}; //!< Tcp_pubsub reconnection attemps the session will try to reconnect in (Default: 5)
};
}

namespace SHM
{
struct Configuration
{
std::string host_group_name{}; /*!< Common host group name that enables interprocess mechanisms across
(virtual) host borders (e.g, Docker); by default equivalent to local host name (Default: "")*/
Types::ConstrainedInteger<4096, 4096> memfile_minsize{}; //!< Default memory file size for new publisher (Default: 4096)
Types::ConstrainedInteger<50, 1, 100> memfile_reserve{}; //!< Dynamic file size reserve before recreating memory file if topic size changes in % (Default: 50)
unsigned int memfile_ack_timeout{}; //!< Publisher timeout for ack event from subscriber that memory file content is processed (Default: 0)
Types::ConstrainedInteger<0, 1> memfile_buffer_count{}; //!< Number of parallel used memory file buffers for 1:n publish/subscribe ipc connections (Default = 1)
bool drop_out_of_order_messages{}; //!< (Default: )
bool memfile_zero_copy{}; //!< Allow matching subscriber to access memory file without copying its content in advance (Default: false)
};
(virtual) host borders (e.g, Docker); by default equivalent to local host name (Default: "")*/ };
}

namespace UDPMC
Expand Down Expand Up @@ -85,7 +68,6 @@ namespace eCAL
{
bool drop_out_of_order_messages{}; //!< Enable dropping of payload messages that arrive out of order (Default: false)
UDPMC::Configuration mc_options{};
TCPPubSub::Configuration tcp_options{};
SHM::Configuration shm_options{};
};
}
Expand Down
19 changes: 7 additions & 12 deletions ecal/core/include/ecal/ecal_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ namespace eCAL
ECAL_API int GetTcpPubsubWriterThreadpoolSize ();
ECAL_API int GetTcpPubsubMaxReconnectionAttemps ();

ECAL_API int GetTcpPubReaderThreadpoolSize ();
ECAL_API int GetTcpPubWriterThreadpoolSize ();

ECAL_API int GetTcpSubReaderThreadpoolSize ();
ECAL_API int GetTcpSubWriterThreadpoolSize ();
ECAL_API int GetTcpSubMaxReconnectionAttemps ();

ECAL_API std::string GetHostGroupName ();

/////////////////////////////////////
Expand Down Expand Up @@ -104,16 +111,6 @@ namespace eCAL
/////////////////////////////////////
// publisher
/////////////////////////////////////
ECAL_API bool GetPublisherShmMode ();
ECAL_API bool GetPublisherTcpMode ();
ECAL_API bool GetPublisherUdpMulticastMode ();

ECAL_API size_t GetMemfileMinsizeBytes ();
ECAL_API size_t GetMemfileOverprovisioningPercentage ();
ECAL_API int GetMemfileAckTimeoutMs ();
ECAL_API bool IsMemfileZerocopyEnabled ();
ECAL_API size_t GetMemfileBufferCount ();

ECAL_API bool IsTopicTypeSharingEnabled ();
ECAL_API bool IsTopicDescriptionSharingEnabled ();

Expand All @@ -128,8 +125,6 @@ namespace eCAL
/////////////////////////////////////
namespace Experimental
{
ECAL_API bool IsShmMonitoringEnabled ();
ECAL_API bool IsNetworkMonitoringDisabled ();
ECAL_API size_t GetShmMonitoringQueueSize ();
ECAL_API std::string GetShmMonitoringDomain ();
ECAL_API bool GetDropOutOfOrderMessages ();
Expand Down
25 changes: 11 additions & 14 deletions ecal/core/src/config/ecal_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,17 @@ namespace eCAL

ECAL_API bool IsNpcapEnabled () { return GetConfiguration().transport_layer.mc_options.npcap_enabled; }

ECAL_API int GetTcpPubsubReaderThreadpoolSize () { return static_cast<int>(GetConfiguration().transport_layer.tcp_options.num_executor_reader); }
ECAL_API int GetTcpPubsubWriterThreadpoolSize () { return static_cast<int>(GetConfiguration().transport_layer.tcp_options.num_executor_writer); }
ECAL_API int GetTcpPubsubMaxReconnectionAttemps () { return static_cast<int>(GetConfiguration().transport_layer.tcp_options.max_reconnections); }
ECAL_API int GetTcpPubReaderThreadpoolSize () { return static_cast<int>(GetConfiguration().publisher.tcp.num_executor_reader); }
ECAL_API int GetTcpPubWriterThreadpoolSize () { return static_cast<int>(GetConfiguration().publisher.tcp.num_executor_writer); }

ECAL_API int GetTcpSubReaderThreadpoolSize () { return static_cast<int>(GetConfiguration().subscriber.tcp.num_executor_reader); }
ECAL_API int GetTcpSubWriterThreadpoolSize () { return static_cast<int>(GetConfiguration().subscriber.tcp.num_executor_writer); }
ECAL_API int GetTcpSubMaxReconnectionAttemps () { return static_cast<int>(GetConfiguration().subscriber.tcp.max_reconnections); }

// Keep this until new logic is implemented
ECAL_API int GetTcpPubsubReaderThreadpoolSize () { return GetTcpSubReaderThreadpoolSize(); };
ECAL_API int GetTcpPubsubWriterThreadpoolSize () { return GetTcpSubWriterThreadpoolSize(); };
ECAL_API int GetTcpPubsubMaxReconnectionAttemps () { return GetTcpSubMaxReconnectionAttemps();};

ECAL_API std::string GetHostGroupName () { return GetConfiguration().transport_layer.shm_options.host_group_name; }

Expand Down Expand Up @@ -156,17 +164,6 @@ namespace eCAL
/////////////////////////////////////
// publisher
/////////////////////////////////////

ECAL_API bool GetPublisherUdpMulticastMode () { return GetConfiguration().publisher.udp.enable; }
ECAL_API bool GetPublisherShmMode () { return GetConfiguration().publisher.shm.enable; }
ECAL_API bool GetPublisherTcpMode () { return GetConfiguration().publisher.tcp.enable; }

ECAL_API size_t GetMemfileMinsizeBytes () { return GetConfiguration().transport_layer.shm_options.memfile_minsize; }
ECAL_API size_t GetMemfileOverprovisioningPercentage () { return GetConfiguration().transport_layer.shm_options.memfile_reserve; }
ECAL_API int GetMemfileAckTimeoutMs () { return GetConfiguration().transport_layer.shm_options.memfile_ack_timeout; }
ECAL_API bool IsMemfileZerocopyEnabled () { return GetConfiguration().transport_layer.shm_options.memfile_zero_copy; }
ECAL_API size_t GetMemfileBufferCount () { return GetConfiguration().transport_layer.shm_options.memfile_buffer_count; }

ECAL_API bool IsTopicTypeSharingEnabled () { return GetConfiguration().registration.share_ttype; }
ECAL_API bool IsTopicDescriptionSharingEnabled () { return GetConfiguration().registration.share_tdesc; }

Expand Down
23 changes: 8 additions & 15 deletions ecal/core/src/config/ecal_config_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,8 @@ namespace eCAL
multicastOptions.join_all_interfaces = iniConfig.get(NETWORK, "multicast_join_all_if", NET_UDP_MULTICAST_JOIN_ALL_IF_ENABLED);
multicastOptions.npcap_enabled = iniConfig.get(NETWORK, "npcap_enabled", NET_NPCAP_ENABLED);

auto& tcpPubSubOptions = transportLayerOptions.tcp_options;
tcpPubSubOptions.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER);
tcpPubSubOptions.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER);
tcpPubSubOptions.max_reconnections = iniConfig.get(NETWORK, "tcp_pubsup_max_reconnections", NET_TCP_PUBSUB_MAX_RECONNECTIONS);

auto& shmOptions = transportLayerOptions.shm_options;
shmOptions.host_group_name = iniConfig.get(NETWORK, "host_group_name", NET_HOST_GROUP_NAME);
shmOptions.memfile_minsize = iniConfig.get(PUBLISHER, "memfile_minsize", PUB_MEMFILE_MINSIZE);
shmOptions.memfile_reserve = iniConfig.get(PUBLISHER, "memfile_reserve", PUB_MEMFILE_RESERVE);
shmOptions.memfile_ack_timeout = iniConfig.get(PUBLISHER, "memfile_ack_timeout", PUB_MEMFILE_ACK_TO);
shmOptions.memfile_buffer_count = iniConfig.get(PUBLISHER, "memfile_buffer_count", PUB_MEMFILE_BUF_COUNT);
shmOptions.drop_out_of_order_messages = iniConfig.get(EXPERIMENTAL, "drop_out_of_order_messages", EXP_DROP_OUT_OF_ORDER_MESSAGES);
shmOptions.memfile_zero_copy = iniConfig.get(PUBLISHER, "memfile_zero_copy", PUB_MEMFILE_ZERO_COPY);

// registration options
auto registrationTimeout = iniConfig.get(COMMON, "registration_timeout", CMN_REGISTRATION_TO);
Expand Down Expand Up @@ -170,17 +159,19 @@ namespace eCAL
// subscriber options
auto& subscriberOptions = subscriber;
subscriberOptions.shm.enable = iniConfig.get(NETWORK, "shm_rec_enabled", NET_SHM_REC_ENABLED) != 0;
subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0;

subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0;
subscriberOptions.tcp.max_reconnections = iniConfig.get(NETWORK, "tcp_pubsup_max_reconnections", NET_TCP_PUBSUB_MAX_RECONNECTIONS);
subscriberOptions.tcp.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER);
subscriberOptions.tcp.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER);

subscriberOptions.udp.enable = iniConfig.get(NETWORK, "udp_mc_rec_enabled", NET_UDP_MC_REC_ENABLED) != 0;

// publisher options
auto& publisherOptions = publisher;
publisherOptions.shm.enable = iniConfig.get(PUBLISHER, "use_shm", static_cast<int>(PUB_USE_SHM)) != 0;
publisherOptions.shm.zero_copy_mode = iniConfig.get(PUBLISHER, "memfile_zero_copy", PUB_MEMFILE_ZERO_COPY);
publisherOptions.shm.acknowledge_timeout_ms = iniConfig.get(PUBLISHER, "memfile_ack_timeout", PUB_MEMFILE_ACK_TO);
publisherOptions.shm.memfile_min_size_bytes = iniConfig.get(PUBLISHER, "memfile_minsize", PUB_MEMFILE_MINSIZE);
publisherOptions.shm.memfile_reserve_percent = iniConfig.get(PUBLISHER, "memfile_reserve", PUB_MEMFILE_RESERVE);
publisherOptions.shm.memfile_buffer_count = iniConfig.get(PUBLISHER, "memfile_buffer_count", PUB_MEMFILE_BUF_COUNT);

publisherOptions.udp.enable = iniConfig.get(PUBLISHER, "use_udp_mc", static_cast<int>(PUB_USE_UDP_MC)) != 0;
// TODO PG: Add here when its available in config file
Expand All @@ -191,6 +182,8 @@ namespace eCAL
publisherOptions.share_topic_type = iniConfig.get(PUBLISHER, "share_ttype", PUB_SHARE_TTYPE);

publisherOptions.tcp.enable = iniConfig.get(PUBLISHER, "use_tcp", static_cast<int>(PUB_USE_TCP)) != 0;
publisherOptions.tcp.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER);
publisherOptions.tcp.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER);

// timesync options
auto& timesyncOptions = timesync;
Expand Down
5 changes: 0 additions & 5 deletions ecal/tests/cpp/config_test/src/config_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ TEST(ConfigDeathTest, user_config_death_test)
SetValue(custom_config.transport_layer.mc_options.sndbuf, (5242880 + 512)),
std::invalid_argument);

// Value exceeds MAX. Default MAX = 100
ASSERT_THROW(
SetValue(custom_config.transport_layer.shm_options.memfile_reserve, 150),
std::invalid_argument);

// Test the registration option limits
// Refresh timeout > registration timeout
ASSERT_THROW(
Expand Down

0 comments on commit 5a835aa

Please sign in to comment.