diff --git a/ecal/core/include/ecal/config/publisher.h b/ecal/core/include/ecal/config/publisher.h index 1af155c9b3..63e45d1411 100644 --- a/ecal/core/include/ecal/config/publisher.h +++ b/ecal/core/include/ecal/config/publisher.h @@ -126,6 +126,9 @@ namespace eCAL struct Configuration { bool enable; //!< enable layer + + size_t num_executor_reader{}; //!< reader amount of threads that shall execute workload (Default: 4) + size_t num_executor_writer{}; //!< writer amount of threads that shall execute workload (Default: 4) }; } diff --git a/ecal/core/include/ecal/config/subscriber.h b/ecal/core/include/ecal/config/subscriber.h index d306250d43..9b283a4bb6 100644 --- a/ecal/core/include/ecal/config/subscriber.h +++ b/ecal/core/include/ecal/config/subscriber.h @@ -53,6 +53,11 @@ namespace eCAL struct Configuration { bool enable; //!< enable layer + + size_t num_executor_reader{}; //!< reader amount of threads that shall execute workload (Default: 4) + size_t num_executor_writer{}; //!< writer amount of threads that shall execute workload (Default: 4) + + size_t max_reconnections{}; //!< reconnection attemps the session will try to reconnect in (Default: 5) }; } diff --git a/ecal/core/include/ecal/config/transport_layer.h b/ecal/core/include/ecal/config/transport_layer.h index 97e7f8ce19..fac354b5cc 100644 --- a/ecal/core/include/ecal/config/transport_layer.h +++ b/ecal/core/include/ecal/config/transport_layer.h @@ -31,29 +31,12 @@ namespace eCAL { namespace TransportLayer { - namespace TCPPubSub - { - struct Configuration - { - size_t num_executor_reader{}; //!< Tcp_pubsub reader amount of threads that shall execute workload (Default: 4) - size_t num_executor_writer{}; //!< Tcp_pubsub writer amount of threads that shall execute workload (Default: 4) - size_t max_reconnections{}; //!< Tcp_pubsub reconnection attemps the session will try to reconnect in (Default: 5) - }; - } - namespace SHM { struct Configuration { std::string host_group_name{}; /*!< Common host group name that enables interprocess mechanisms across - (virtual) host borders (e.g, Docker); by default equivalent to local host name (Default: "")*/ - Types::ConstrainedInteger<4096, 4096> memfile_minsize{}; //!< Default memory file size for new publisher (Default: 4096) - Types::ConstrainedInteger<50, 1, 100> memfile_reserve{}; //!< Dynamic file size reserve before recreating memory file if topic size changes in % (Default: 50) - unsigned int memfile_ack_timeout{}; //!< Publisher timeout for ack event from subscriber that memory file content is processed (Default: 0) - Types::ConstrainedInteger<0, 1> memfile_buffer_count{}; //!< Number of parallel used memory file buffers for 1:n publish/subscribe ipc connections (Default = 1) - bool drop_out_of_order_messages{}; //!< (Default: ) - bool memfile_zero_copy{}; //!< Allow matching subscriber to access memory file without copying its content in advance (Default: false) - }; + (virtual) host borders (e.g, Docker); by default equivalent to local host name (Default: "")*/ }; } namespace UDPMC @@ -85,7 +68,6 @@ namespace eCAL { bool drop_out_of_order_messages{}; //!< Enable dropping of payload messages that arrive out of order (Default: false) UDPMC::Configuration mc_options{}; - TCPPubSub::Configuration tcp_options{}; SHM::Configuration shm_options{}; }; } diff --git a/ecal/core/include/ecal/ecal_config.h b/ecal/core/include/ecal/ecal_config.h index acc5caa5cb..7d0cc9cc92 100644 --- a/ecal/core/include/ecal/ecal_config.h +++ b/ecal/core/include/ecal/ecal_config.h @@ -69,6 +69,13 @@ namespace eCAL ECAL_API int GetTcpPubsubWriterThreadpoolSize (); ECAL_API int GetTcpPubsubMaxReconnectionAttemps (); + ECAL_API int GetTcpPubReaderThreadpoolSize (); + ECAL_API int GetTcpPubWriterThreadpoolSize (); + + ECAL_API int GetTcpSubReaderThreadpoolSize (); + ECAL_API int GetTcpSubWriterThreadpoolSize (); + ECAL_API int GetTcpSubMaxReconnectionAttemps (); + ECAL_API std::string GetHostGroupName (); ///////////////////////////////////// @@ -104,16 +111,6 @@ namespace eCAL ///////////////////////////////////// // publisher ///////////////////////////////////// - ECAL_API bool GetPublisherShmMode (); - ECAL_API bool GetPublisherTcpMode (); - ECAL_API bool GetPublisherUdpMulticastMode (); - - ECAL_API size_t GetMemfileMinsizeBytes (); - ECAL_API size_t GetMemfileOverprovisioningPercentage (); - ECAL_API int GetMemfileAckTimeoutMs (); - ECAL_API bool IsMemfileZerocopyEnabled (); - ECAL_API size_t GetMemfileBufferCount (); - ECAL_API bool IsTopicTypeSharingEnabled (); ECAL_API bool IsTopicDescriptionSharingEnabled (); @@ -128,8 +125,6 @@ namespace eCAL ///////////////////////////////////// namespace Experimental { - ECAL_API bool IsShmMonitoringEnabled (); - ECAL_API bool IsNetworkMonitoringDisabled (); ECAL_API size_t GetShmMonitoringQueueSize (); ECAL_API std::string GetShmMonitoringDomain (); ECAL_API bool GetDropOutOfOrderMessages (); diff --git a/ecal/core/src/config/ecal_config.cpp b/ecal/core/src/config/ecal_config.cpp index 25328e4efa..61a28ae3d2 100644 --- a/ecal/core/src/config/ecal_config.cpp +++ b/ecal/core/src/config/ecal_config.cpp @@ -117,9 +117,17 @@ namespace eCAL ECAL_API bool IsNpcapEnabled () { return GetConfiguration().transport_layer.mc_options.npcap_enabled; } - ECAL_API int GetTcpPubsubReaderThreadpoolSize () { return static_cast(GetConfiguration().transport_layer.tcp_options.num_executor_reader); } - ECAL_API int GetTcpPubsubWriterThreadpoolSize () { return static_cast(GetConfiguration().transport_layer.tcp_options.num_executor_writer); } - ECAL_API int GetTcpPubsubMaxReconnectionAttemps () { return static_cast(GetConfiguration().transport_layer.tcp_options.max_reconnections); } + ECAL_API int GetTcpPubReaderThreadpoolSize () { return static_cast(GetConfiguration().publisher.tcp.num_executor_reader); } + ECAL_API int GetTcpPubWriterThreadpoolSize () { return static_cast(GetConfiguration().publisher.tcp.num_executor_writer); } + + ECAL_API int GetTcpSubReaderThreadpoolSize () { return static_cast(GetConfiguration().subscriber.tcp.num_executor_reader); } + ECAL_API int GetTcpSubWriterThreadpoolSize () { return static_cast(GetConfiguration().subscriber.tcp.num_executor_writer); } + ECAL_API int GetTcpSubMaxReconnectionAttemps () { return static_cast(GetConfiguration().subscriber.tcp.max_reconnections); } + + // Keep this until new logic is implemented + ECAL_API int GetTcpPubsubReaderThreadpoolSize () { return GetTcpSubReaderThreadpoolSize(); }; + ECAL_API int GetTcpPubsubWriterThreadpoolSize () { return GetTcpSubWriterThreadpoolSize(); }; + ECAL_API int GetTcpPubsubMaxReconnectionAttemps () { return GetTcpSubMaxReconnectionAttemps();}; ECAL_API std::string GetHostGroupName () { return GetConfiguration().transport_layer.shm_options.host_group_name; } @@ -156,17 +164,6 @@ namespace eCAL ///////////////////////////////////// // publisher ///////////////////////////////////// - - ECAL_API bool GetPublisherUdpMulticastMode () { return GetConfiguration().publisher.udp.enable; } - ECAL_API bool GetPublisherShmMode () { return GetConfiguration().publisher.shm.enable; } - ECAL_API bool GetPublisherTcpMode () { return GetConfiguration().publisher.tcp.enable; } - - ECAL_API size_t GetMemfileMinsizeBytes () { return GetConfiguration().transport_layer.shm_options.memfile_minsize; } - ECAL_API size_t GetMemfileOverprovisioningPercentage () { return GetConfiguration().transport_layer.shm_options.memfile_reserve; } - ECAL_API int GetMemfileAckTimeoutMs () { return GetConfiguration().transport_layer.shm_options.memfile_ack_timeout; } - ECAL_API bool IsMemfileZerocopyEnabled () { return GetConfiguration().transport_layer.shm_options.memfile_zero_copy; } - ECAL_API size_t GetMemfileBufferCount () { return GetConfiguration().transport_layer.shm_options.memfile_buffer_count; } - ECAL_API bool IsTopicTypeSharingEnabled () { return GetConfiguration().registration.share_ttype; } ECAL_API bool IsTopicDescriptionSharingEnabled () { return GetConfiguration().registration.share_tdesc; } diff --git a/ecal/core/src/config/ecal_config_initializer.cpp b/ecal/core/src/config/ecal_config_initializer.cpp index 0606164ceb..6c8386208e 100644 --- a/ecal/core/src/config/ecal_config_initializer.cpp +++ b/ecal/core/src/config/ecal_config_initializer.cpp @@ -130,19 +130,8 @@ namespace eCAL multicastOptions.join_all_interfaces = iniConfig.get(NETWORK, "multicast_join_all_if", NET_UDP_MULTICAST_JOIN_ALL_IF_ENABLED); multicastOptions.npcap_enabled = iniConfig.get(NETWORK, "npcap_enabled", NET_NPCAP_ENABLED); - auto& tcpPubSubOptions = transportLayerOptions.tcp_options; - tcpPubSubOptions.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER); - tcpPubSubOptions.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER); - tcpPubSubOptions.max_reconnections = iniConfig.get(NETWORK, "tcp_pubsup_max_reconnections", NET_TCP_PUBSUB_MAX_RECONNECTIONS); - auto& shmOptions = transportLayerOptions.shm_options; shmOptions.host_group_name = iniConfig.get(NETWORK, "host_group_name", NET_HOST_GROUP_NAME); - shmOptions.memfile_minsize = iniConfig.get(PUBLISHER, "memfile_minsize", PUB_MEMFILE_MINSIZE); - shmOptions.memfile_reserve = iniConfig.get(PUBLISHER, "memfile_reserve", PUB_MEMFILE_RESERVE); - shmOptions.memfile_ack_timeout = iniConfig.get(PUBLISHER, "memfile_ack_timeout", PUB_MEMFILE_ACK_TO); - shmOptions.memfile_buffer_count = iniConfig.get(PUBLISHER, "memfile_buffer_count", PUB_MEMFILE_BUF_COUNT); - shmOptions.drop_out_of_order_messages = iniConfig.get(EXPERIMENTAL, "drop_out_of_order_messages", EXP_DROP_OUT_OF_ORDER_MESSAGES); - shmOptions.memfile_zero_copy = iniConfig.get(PUBLISHER, "memfile_zero_copy", PUB_MEMFILE_ZERO_COPY); // registration options auto registrationTimeout = iniConfig.get(COMMON, "registration_timeout", CMN_REGISTRATION_TO); @@ -170,17 +159,19 @@ namespace eCAL // subscriber options auto& subscriberOptions = subscriber; subscriberOptions.shm.enable = iniConfig.get(NETWORK, "shm_rec_enabled", NET_SHM_REC_ENABLED) != 0; - subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0; + + subscriberOptions.tcp.enable = iniConfig.get(NETWORK, "tcp_rec_enabled", NET_TCP_REC_ENABLED) != 0; + subscriberOptions.tcp.max_reconnections = iniConfig.get(NETWORK, "tcp_pubsup_max_reconnections", NET_TCP_PUBSUB_MAX_RECONNECTIONS); + subscriberOptions.tcp.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER); + subscriberOptions.tcp.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER); + subscriberOptions.udp.enable = iniConfig.get(NETWORK, "udp_mc_rec_enabled", NET_UDP_MC_REC_ENABLED) != 0; // publisher options auto& publisherOptions = publisher; publisherOptions.shm.enable = iniConfig.get(PUBLISHER, "use_shm", static_cast(PUB_USE_SHM)) != 0; - publisherOptions.shm.zero_copy_mode = iniConfig.get(PUBLISHER, "memfile_zero_copy", PUB_MEMFILE_ZERO_COPY); - publisherOptions.shm.acknowledge_timeout_ms = iniConfig.get(PUBLISHER, "memfile_ack_timeout", PUB_MEMFILE_ACK_TO); publisherOptions.shm.memfile_min_size_bytes = iniConfig.get(PUBLISHER, "memfile_minsize", PUB_MEMFILE_MINSIZE); publisherOptions.shm.memfile_reserve_percent = iniConfig.get(PUBLISHER, "memfile_reserve", PUB_MEMFILE_RESERVE); - publisherOptions.shm.memfile_buffer_count = iniConfig.get(PUBLISHER, "memfile_buffer_count", PUB_MEMFILE_BUF_COUNT); publisherOptions.udp.enable = iniConfig.get(PUBLISHER, "use_udp_mc", static_cast(PUB_USE_UDP_MC)) != 0; // TODO PG: Add here when its available in config file @@ -191,6 +182,8 @@ namespace eCAL publisherOptions.share_topic_type = iniConfig.get(PUBLISHER, "share_ttype", PUB_SHARE_TTYPE); publisherOptions.tcp.enable = iniConfig.get(PUBLISHER, "use_tcp", static_cast(PUB_USE_TCP)) != 0; + publisherOptions.tcp.num_executor_reader = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_reader", NET_TCP_PUBSUB_NUM_EXECUTOR_READER); + publisherOptions.tcp.num_executor_writer = iniConfig.get(NETWORK, "tcp_pubsup_num_executor_writer", NET_TCP_PUBSUB_NUM_EXECUTOR_WRITER); // timesync options auto& timesyncOptions = timesync; diff --git a/ecal/tests/cpp/config_test/src/config_test.cpp b/ecal/tests/cpp/config_test/src/config_test.cpp index 7e5a4f6400..93c2c75bb1 100644 --- a/ecal/tests/cpp/config_test/src/config_test.cpp +++ b/ecal/tests/cpp/config_test/src/config_test.cpp @@ -166,11 +166,6 @@ TEST(ConfigDeathTest, user_config_death_test) SetValue(custom_config.transport_layer.mc_options.sndbuf, (5242880 + 512)), std::invalid_argument); - // Value exceeds MAX. Default MAX = 100 - ASSERT_THROW( - SetValue(custom_config.transport_layer.shm_options.memfile_reserve, 150), - std::invalid_argument); - // Test the registration option limits // Refresh timeout > registration timeout ASSERT_THROW(