diff --git a/implementation/endpoints/src/endpoint_manager_base.cpp b/implementation/endpoints/src/endpoint_manager_base.cpp index 22736bf64..81c741954 100644 --- a/implementation/endpoints/src/endpoint_manager_base.cpp +++ b/implementation/endpoints/src/endpoint_manager_base.cpp @@ -35,35 +35,36 @@ endpoint_manager_base::endpoint_manager_base( } std::shared_ptr endpoint_manager_base::create_local(client_t _client) { - std::lock_guard its_lock(local_endpoint_mutex_); + std::scoped_lock its_lock(local_endpoint_mutex_); return create_local_unlocked(_client); } void endpoint_manager_base::remove_local(const client_t _client) { - std::shared_ptr its_endpoint(find_local(_client)); + std::scoped_lock its_lock(local_endpoint_mutex_); + VSOMEIP_INFO << "emb::" << __func__ << ": client " << std::hex << _client; + std::shared_ptr its_endpoint(find_local_unlocked(_client)); if (its_endpoint) { its_endpoint->register_error_handler(nullptr); its_endpoint->stop(); - VSOMEIP_INFO << "Client [" << std::hex << rm_->get_client() << "] is closing connection to [" - << std::hex << _client << "]" << " endpoint > " << its_endpoint; - std::lock_guard its_lock(local_endpoint_mutex_); + VSOMEIP_INFO << "Client [" << std::hex << rm_->get_client() + << "] is closing connection to [" << std::hex << _client << "]" + << " endpoint > " << its_endpoint; local_endpoints_.erase(_client); } } std::shared_ptr endpoint_manager_base::find_or_create_local(client_t _client) { - std::shared_ptr its_endpoint {nullptr}; - { - std::scoped_lock its_lock {local_endpoint_mutex_}; - its_endpoint = find_local_unlocked(_client); - if (!its_endpoint) { - its_endpoint = create_local_unlocked(_client); - } + std::scoped_lock its_lock {local_endpoint_mutex_}; + std::shared_ptr its_endpoint {find_local_unlocked(_client)}; + if (!its_endpoint) { + VSOMEIP_INFO << "emb::" << __func__ << ": create_client " << std::hex << _client; + its_endpoint = create_local_unlocked(_client); } if (its_endpoint) { its_endpoint->start(); } else { - VSOMEIP_ERROR << __func__ << ": couldn't find or create endpoint for client " << _client; + VSOMEIP_ERROR << "emb::" << __func__ << ": couldn't find or create endpoint for client " + << std::hex << _client; } return its_endpoint; } diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp index 3f7597af8..30d87b413 100644 --- a/implementation/endpoints/src/endpoint_manager_impl.cpp +++ b/implementation/endpoints/src/endpoint_manager_impl.cpp @@ -71,7 +71,7 @@ std::shared_ptr endpoint_manager_impl::find_or_create_remote_client( } } if (start_endpoint && its_endpoint && configuration_->is_someip(_service, _instance) - && rm_->get_routing_state() != routing_state_e::RS_SUSPENDED) { + && !rm_->is_suspended()) { its_endpoint->start(); } return its_endpoint; @@ -97,7 +97,7 @@ void endpoint_manager_impl::find_or_create_remote_client( } } const bool is_someip {configuration_->is_someip(_service, _instance)}; - const bool is_suspended {rm_->get_routing_state() == routing_state_e::RS_SUSPENDED}; + const bool is_suspended {rm_->is_suspended()}; if (start_reliable_endpoint && its_reliable_endpoint && is_someip && !is_suspended) { its_reliable_endpoint->start(); @@ -289,7 +289,7 @@ endpoint_manager_impl::create_server_endpoint(uint16_t _port, bool _reliable, bo if (its_server_endpoint) { server_endpoints_[_port][_reliable] = its_server_endpoint; - if (rm_->get_routing_state() != routing_state_e::RS_SUSPENDED) { + if (!rm_->is_suspended()) { its_server_endpoint->start(); } } else { @@ -1395,140 +1395,27 @@ bool endpoint_manager_impl::is_used_endpoint(endpoint* const _endpoint) const { } void endpoint_manager_impl::suspend() { - client_endpoints_t its_client_endpoints; - server_endpoints_t its_server_endpoints; - - { - std::scoped_lock its_lock {endpoint_mutex_}; - its_client_endpoints = client_endpoints_; - its_server_endpoints = server_endpoints_; - } - - // stop client endpoints - std::set> its_suspended_client_endpoints; - for (const auto& [its_address, ports] : its_client_endpoints) { - for (const auto& [its_port, protocols] : ports) { - for (const auto& [its_protocol, partitions] : protocols) { - for (const auto& [its_partition, its_endpoint] : partitions) { - its_endpoint->stop(); - auto its_client_endpoint { - std::dynamic_pointer_cast(its_endpoint)}; - if (its_client_endpoint) { - its_suspended_client_endpoints.insert(its_client_endpoint); - } - } - } - } - } - - // start server endpoints - for (const auto& [its_port, protocols] : its_server_endpoints) { - for (const auto& [its_protocol, its_endpoint] : protocols) { - its_endpoint->stop(); - } - } - // check that the clients are established again - size_t its_interval {MIN_ENDPOINT_WAIT_INTERVAL}; - size_t its_sum {0}; - const size_t its_max {SUM_ENDPOINT_WAIT_INTERVAL}; - bool is_done; - do { - is_done = true; - std::this_thread::sleep_for(std::chrono::milliseconds(its_interval)); - for (auto& its_endpoint : its_suspended_client_endpoints) { - is_done = is_done && its_endpoint->is_closed(); - if (!is_done) - break; - } - if (its_interval < MAX_ENDPOINT_WAIT_INTERVAL) { - its_interval <<= 1; - } - its_sum += its_interval; - } while (!is_done && its_sum < its_max); - - if (!is_done) { - for (const auto& its_endpoint : its_suspended_client_endpoints) { - if (!its_endpoint->is_closed()) { - boost::asio::ip::address its_address; - (void)its_endpoint->get_remote_address(its_address); - - VSOMEIP_WARNING << "endpoint_manager_impl::" << __func__ - << ": Suspending client port [" << std::dec - << its_endpoint->get_local_port() << "] --> [" - << its_address.to_string() << ":" << its_endpoint->get_remote_port() - << "] failed."; - } - } - } + // do nothing } void endpoint_manager_impl::resume() { client_endpoints_t its_client_endpoints; - server_endpoints_t its_server_endpoints; - { std::scoped_lock its_lock {endpoint_mutex_}; its_client_endpoints = client_endpoints_; - its_server_endpoints = server_endpoints_; } - // start server endpoints - for (const auto& [its_port, protocols] : its_server_endpoints) { - for (const auto& [its_protocol, its_endpoint] : protocols) { - its_endpoint->restart(); - } - } - - // start client endpoints + // restart client endpoints std::set> its_resumed_client_endpoints; for (const auto& [its_address, ports] : its_client_endpoints) { for (const auto& [its_port, protocols] : ports) { for (const auto& [its_protocol, partitions] : protocols) { for (const auto& [its_partition, its_endpoint] : partitions) { its_endpoint->restart(); - auto its_client_endpoint { - std::dynamic_pointer_cast(its_endpoint)}; - if (its_client_endpoint) { - its_resumed_client_endpoints.insert(its_client_endpoint); - } } } } } - - // check that the clients are established again - size_t its_interval {MIN_ENDPOINT_WAIT_INTERVAL}; - size_t its_sum {0}; - const size_t its_max {SUM_ENDPOINT_WAIT_INTERVAL}; - bool is_done; - do { - is_done = true; - std::this_thread::sleep_for(std::chrono::milliseconds(its_interval)); - for (const auto& its_endpoint : its_resumed_client_endpoints) { - is_done = is_done && its_endpoint->is_established(); - if (!is_done) - break; - } - if (its_interval < MAX_ENDPOINT_WAIT_INTERVAL) { - its_interval <<= 1; - } - its_sum += its_interval; - } while (!is_done && its_sum < its_max); - - if (!is_done) { - for (const auto& its_endpoint : its_resumed_client_endpoints) { - if (!its_endpoint->is_established()) { - boost::asio::ip::address its_address; - (void)its_endpoint->get_remote_address(its_address); - - VSOMEIP_WARNING << "endpoint_manager_impl::" << __func__ - << ": Resuming client port [" << std::dec - << its_endpoint->get_local_port() << "] --> [" - << its_address.to_string() << ":" << its_endpoint->get_remote_port() - << "] failed."; - } - } - } } } // namespace vsomeip_v3 diff --git a/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp index 23e310ce8..f2745414a 100644 --- a/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp @@ -46,7 +46,6 @@ void local_tcp_client_endpoint_impl::restart(bool _force) { if (!_force && state_ == cei_state_e::CONNECTING) { return; } - state_ = cei_state_e::CONNECTING; { std::lock_guard its_lock(mutex_); sending_blocked_ = false; @@ -57,6 +56,7 @@ void local_tcp_client_endpoint_impl::restart(bool _force) { std::lock_guard its_lock(socket_mutex_); shutdown_and_close_socket_unlocked(true); } + state_ = cei_state_e::CONNECTING; was_not_connected_ = true; reconnect_counter_ = 0; start_connect_timer(); @@ -119,15 +119,15 @@ void local_tcp_client_endpoint_impl::connect() { if (its_error) { VSOMEIP_WARNING << "ltcei::connect: couldn't disable " << "Nagle algorithm: " << its_error.message() - << " remote:" << remote_.port() - << " endpoint > " << this << " state_ > " << static_cast(state_.load()); + << " remote: " << remote_.port() << " endpoint: " << this + << " state_: " << static_cast(state_.load()); } socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error); if (its_error) { VSOMEIP_WARNING << "ltcei::connect: couldn't enable " - << "keep_alive: " << its_error.message() - << " remote:" << remote_.port() - << " endpoint > " << this << " state_ > " << static_cast(state_.load()); + << "keep_alive: " << its_error.message() << " remote:" << remote_.port() + << " endpoint > " << this << " state_ > " + << static_cast(state_.load()); } // Setting the TIME_WAIT to 0 seconds forces RST to always be sent in reponse to a FIN // Since this is endpoint for internal communication, setting the TIME_WAIT to 5 seconds @@ -135,29 +135,29 @@ void local_tcp_client_endpoint_impl::connect() { socket_->set_option(boost::asio::socket_base::linger(true, 5), its_error); if (its_error) { VSOMEIP_WARNING << "ltcei::connect: couldn't enable " - << "SO_LINGER: " << its_error.message() - << " remote:" << remote_.port() - << " endpoint > " << this << " state_ > " << static_cast(state_.load()); + << "SO_LINGER: " << its_error.message() << " remote:" << remote_.port() + << " endpoint > " << this << " state_ > " + << static_cast(state_.load()); } socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error); if (its_error) { - VSOMEIP_WARNING << "ltcei::" << __func__ - << ": Cannot enable SO_REUSEADDR" << "(" << its_error.message() << ")" - << " endpoint > " << this << " state_ > " << static_cast(state_.load()); + VSOMEIP_WARNING << "ltcei::" << __func__ << ": Cannot enable SO_REUSEADDR" << "(" + << its_error.message() << ")" + << " endpoint > " << this << " state_ > " + << static_cast(state_.load()); } socket_->bind(local_, its_error); if (its_error) { - VSOMEIP_WARNING << "ltcei::" << __func__ - << ": Cannot bind to client port " << local_.port() << "(" - << its_error.message() << ")" - << " endpoint > " << this << " state_ > " << static_cast(state_.load()); + VSOMEIP_WARNING << "ltcei::" << __func__ << ": Cannot bind to client port " + << local_.port() << "(" << its_error.message() << ")" + << " endpoint > " << this << " state_ > " + << static_cast(state_.load()); try { - strand_.post( - std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(), - its_connect_error)); - } catch (const std::exception &e) { - VSOMEIP_ERROR << "ltcei::connect: " << e.what() - << " endpoint > " << this << " state_ > " << static_cast(state_.load()); + strand_.post(std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(), + its_connect_error)); + } catch (const std::exception& e) { + VSOMEIP_ERROR << "ltcei::connect: " << e.what() << " endpoint > " << this + << " state_ > " << static_cast(state_.load()); } return; } diff --git a/implementation/endpoints/src/local_tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/local_tcp_server_endpoint_impl.cpp index 3b2e175e7..f82081b6d 100644 --- a/implementation/endpoints/src/local_tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_tcp_server_endpoint_impl.cpp @@ -241,16 +241,24 @@ void local_tcp_server_endpoint_impl::accept_cbk( // Nagle algorithm off new_connection_socket.set_option(boost::asio::ip::tcp::no_delay(true), its_error); if (its_error) { - VSOMEIP_WARNING << "ltsei::accept_cbk: couldn't disable " + VSOMEIP_WARNING << "ltsei::" << __func__ << ": couldn't disable " << "Nagle algorithm: " << its_error.message() << " endpoint > " << this; } new_connection_socket.set_option(boost::asio::socket_base::keep_alive(true), its_error); if (its_error) { - VSOMEIP_WARNING << "ltsei::accept_cbk: couldn't enable " + VSOMEIP_WARNING << "ltsei::" << __func__ << ": couldn't enable " << "keep_alive: " << its_error.message() << " endpoint > " << this; } + // Setting the TIME_WAIT to 0 seconds forces RST to always be sent in reponse to a FIN + // Since this is endpoint for internal communication, setting the TIME_WAIT to 5 seconds + // should be enough to ensure the ACK to the FIN arrives to the server endpoint. + new_connection_socket.set_option(boost::asio::socket_base::linger(true, 5), its_error); + if (its_error) { + VSOMEIP_WARNING << "ltsei::" << __func__ << ": setting SO_LINGER failed (" + << its_error.message() << ") " << this; + } } } if (_error != boost::asio::error::bad_descriptor @@ -258,10 +266,9 @@ void local_tcp_server_endpoint_impl::accept_cbk( && _error != boost::asio::error::no_descriptors) { start(); } else if (_error == boost::asio::error::no_descriptors) { - VSOMEIP_ERROR << "ltsei::accept_cbk: " - << _error.message() << " (" << std::dec << _error.value() - << ") Will try to accept again in 1000ms" - << " endpoint > " << this; + VSOMEIP_ERROR << "ltsei::" << __func__ << ": " << _error.message() << " (" << std::dec + << _error.value() << ") Will try to accept again in 1000ms" + << " endpoint > " << this; auto its_timer = std::make_shared(io_, std::chrono::milliseconds(1000)); @@ -723,10 +730,8 @@ void local_tcp_server_endpoint_impl::connection::receive_cbk( } while (recv_buffer_size_ > 0 && found_message); } - if (is_stopped_ - || _error == boost::asio::error::eof - || _error == boost::asio::error::connection_reset - || is_error) { + if (is_stopped_ || _error == boost::asio::error::eof + || _error == boost::asio::error::connection_reset || is_error) { shutdown_and_close(); its_server->remove_connection(bound_client_); its_server->configuration_->get_policy_manager()->remove_client_to_sec_client_mapping(bound_client_); diff --git a/implementation/endpoints/src/local_uds_client_endpoint_impl.cpp b/implementation/endpoints/src/local_uds_client_endpoint_impl.cpp index f534a26ae..7791efa2a 100644 --- a/implementation/endpoints/src/local_uds_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_uds_client_endpoint_impl.cpp @@ -49,7 +49,6 @@ void local_uds_client_endpoint_impl::restart(bool _force) { if (!_force && state_ == cei_state_e::CONNECTING) { return; } - state_ = cei_state_e::CONNECTING; { std::lock_guard its_lock(mutex_); sending_blocked_ = false; @@ -60,6 +59,7 @@ void local_uds_client_endpoint_impl::restart(bool _force) { std::lock_guard its_lock(socket_mutex_); shutdown_and_close_socket_unlocked(true); } + state_ = cei_state_e::CONNECTING; was_not_connected_ = true; reconnect_counter_ = 0; start_connect_timer(); diff --git a/implementation/endpoints/src/local_uds_server_endpoint_impl.cpp b/implementation/endpoints/src/local_uds_server_endpoint_impl.cpp index 5b8560bdc..2d9e707c6 100644 --- a/implementation/endpoints/src/local_uds_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_uds_server_endpoint_impl.cpp @@ -826,10 +826,8 @@ void local_uds_server_endpoint_impl::connection::receive_cbk( } while (recv_buffer_size_ > 0 && found_message); } - if (is_stopped_ - || _error == boost::asio::error::eof - || _error == boost::asio::error::connection_reset - || is_error) { + if (is_stopped_ || _error == boost::asio::error::eof + || _error == boost::asio::error::connection_reset || is_error) { shutdown_and_close(); its_server->remove_connection(bound_client_); its_server->configuration_->get_policy_manager()->remove_client_to_sec_client_mapping(bound_client_); diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index b62926d84..6d3c3cb4a 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -87,7 +87,6 @@ void tcp_client_endpoint_impl::restart(bool _force) { << its_connect_duration; } } - self->state_ = cei_state_e::CONNECTING; std::string address_port_local; { std::lock_guard its_lock(self->socket_mutex_); @@ -95,6 +94,7 @@ void tcp_client_endpoint_impl::restart(bool _force) { self->shutdown_and_close_socket_unlocked(true); self->recv_buffer_ = std::make_shared(self->recv_buffer_size_initial_, 0); } + self->state_ = cei_state_e::CONNECTING; self->was_not_connected_ = true; self->reconnect_counter_ = 0; { @@ -677,8 +677,8 @@ void tcp_client_endpoint_impl::receive_cbk( } if (invalid_parameter_detected) { - state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); + state_ = cei_state_e::CONNECTING; its_lock.unlock(); // wait_until_sent interprets "no error" as timeout. @@ -708,8 +708,8 @@ void tcp_client_endpoint_impl::receive_cbk( << "Restarting connection. " << "local: " << get_address_port_local() << " remote: " << get_address_port_remote(); - state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); + state_ = cei_state_e::CONNECTING; its_lock.unlock(); // wait_until_sent interprets "no error" as timeout. @@ -746,8 +746,8 @@ void tcp_client_endpoint_impl::receive_cbk( << " local: " << get_address_port_local() << " remote: " << get_address_port_remote() << ". Restarting connection due to missing/broken data TCP stream."; - state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); + state_ = cei_state_e::CONNECTING; its_lock.unlock(); // wait_until_sent interprets "no error" as timeout. @@ -787,8 +787,8 @@ void tcp_client_endpoint_impl::receive_cbk( " restarting" << get_remote_information(); } else { VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk restarting."; - state_ = cei_state_e::CONNECTING; shutdown_and_close_socket_unlocked(false); + state_ = cei_state_e::CONNECTING; its_lock.unlock(); // wait_until_sent interprets "no error" as timeout. @@ -970,12 +970,12 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, VSOMEIP_WARNING << "tce::send_cbk endpoint is already restarting:" << get_remote_information(); } else { - state_ = cei_state_e::CONNECTING; shutdown_and_close_socket(false); std::shared_ptr its_host = endpoint_host_.lock(); if (its_host) { its_host->on_disconnect(shared_from_this()); } + state_ = cei_state_e::CONNECTING; restart(true); } service_t its_service(0); diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 45561b67c..dea7269fc 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -258,8 +258,16 @@ void tcp_server_endpoint_impl::accept_cbk(connection::ptr _connection, new_connection_socket.set_option(boost::asio::socket_base::keep_alive(true), its_error); if (its_error) { - VSOMEIP_WARNING << "tcp_server_endpoint::connect: couldn't enable " - << "keep_alive: " << its_error.message(); + VSOMEIP_WARNING << "tsei::" << __func__ + << ": couldn't enable keep_alive: " << its_error.message(); + } + // Setting the TIME_WAIT to 0 seconds forces RST to always be sent in reponse to a FIN + // The linger is needed to be set for suspend to RAM, since after resuming the + // connection could be lost and the socket could get stuck in TIME_WAIT for 120 seconds + new_connection_socket.set_option(boost::asio::socket_base::linger(true, 5), its_error); + if (its_error) { + VSOMEIP_WARNING << "tsei::" << __func__ + << ": setting SO_LINGER failed: " << its_error.message(); } } if (!its_error) { @@ -413,10 +421,12 @@ void tcp_server_endpoint_impl::connection::receive() { void tcp_server_endpoint_impl::connection::stop() { std::lock_guard its_lock(socket_mutex_); + if (socket_.is_open()) { boost::system::error_code its_error; auto its_server {server_.lock()}; + if (its_server && its_server->is_suspended()) { socket_.set_option(boost::asio::socket_base::linger(true, 0), its_error); if (its_error) { @@ -509,6 +519,7 @@ void tcp_server_endpoint_impl::connection::receive_cbk(boost::system::error_code " couldn't lock server_"; return; } + #if 0 std::stringstream msg; for (std::size_t i = 0; i < _bytes + recv_buffer_size_; ++i) @@ -979,6 +990,7 @@ void tcp_server_endpoint_impl::connection::wait_until_sent( return; std::lock_guard its_lock(its_server->mutex_); + auto it = its_server->targets_.find(remote_); if (it != its_server->targets_.end()) { auto& its_data = it->second; diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 890c2082d..1a130f0f0 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -195,7 +195,6 @@ void udp_client_endpoint_impl::restart(bool _force) { if (!_force && state_ == cei_state_e::CONNECTING) { return; } - state_ = cei_state_e::CONNECTING; { std::lock_guard its_lock(mutex_); queue_.clear(); @@ -205,11 +204,12 @@ void udp_client_endpoint_impl::restart(bool _force) { std::lock_guard its_lock(socket_mutex_); local = get_address_port_local(); } - shutdown_and_close_socket(false); was_not_connected_ = true; reconnect_counter_ = 0; VSOMEIP_WARNING << "uce::restart: local: " << local << " remote: " << get_address_port_remote(); + shutdown_and_close_socket(false); + state_ = cei_state_e::CONNECTING; start_connect_timer(); } @@ -615,12 +615,12 @@ void udp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, VSOMEIP_WARNING << "uce::send_cbk endpoint is already restarting:" << get_remote_information(); } else { - state_ = cei_state_e::CONNECTING; shutdown_and_close_socket(false); std::shared_ptr its_host = endpoint_host_.lock(); if (its_host) { its_host->on_disconnect(shared_from_this()); } + state_ = cei_state_e::CONNECTING; restart(true); } service_t its_service(0); diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index 567167e46..6101c5f04 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -146,6 +146,7 @@ void udp_server_endpoint_impl::start() { void udp_server_endpoint_impl::stop() { server_endpoint_impl::stop(); + is_stopped_ = true; { diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp index 9d6fbd03c..066aa18e9 100644 --- a/implementation/routing/include/routing_manager_base.hpp +++ b/implementation/routing/include/routing_manager_base.hpp @@ -147,6 +147,8 @@ class routing_manager_base : public routing_manager, virtual routing_state_e get_routing_state(); + virtual bool is_suspended() const; + virtual void register_client_error_handler(client_t _client, const std::shared_ptr &_endpoint) = 0; @@ -259,6 +261,7 @@ class routing_manager_base : public routing_manager, service_t _service, instance_t _instance, eventgroup_t _eventgroup); void add_known_client(client_t _client, const std::string &_client_host); + void remove_known_client(client_t _client); #ifdef VSOMEIP_ENABLE_COMPAT void set_incoming_subscription_state(client_t _client, service_t _service, instance_t _instance, @@ -338,6 +341,7 @@ class routing_manager_base : public routing_manager, } }; std::set pending_subscriptions_; + std::mutex pending_subscription_mutex_; services_t services_remote_; mutable std::mutex services_remote_mutex_; @@ -350,8 +354,7 @@ class routing_manager_base : public routing_manager, mutable std::mutex env_mutex_; std::string env_; - std::mutex routing_state_mutex_; - routing_state_e routing_state_; + std::atomic routing_state_; #ifdef USE_DLT std::shared_ptr tc_; diff --git a/implementation/routing/include/routing_manager_client.hpp b/implementation/routing/include/routing_manager_client.hpp index 28cf11ddb..5c19d3819 100644 --- a/implementation/routing/include/routing_manager_client.hpp +++ b/implementation/routing/include/routing_manager_client.hpp @@ -237,11 +237,17 @@ class routing_manager_client std::atomic_bool is_started_; std::atomic state_; + mutable std::mutex sender_mutex_; std::shared_ptr sender_; // --> stub + + mutable std::mutex receiver_mutex_; std::shared_ptr receiver_; // --> from everybody + std::mutex pending_offers_mutex_; std::set pending_offers_; + std::mutex requests_mutex_; std::set requests_; + std::mutex requests_to_debounce_mutex_; std::set requests_to_debounce_; struct event_data_t { @@ -262,22 +268,20 @@ class routing_manager_client _other.is_provided_, _other.is_cyclic_, _other.eventgroups_); } }; + std::mutex pending_event_registrations_mutex_; std::set pending_event_registrations_; - std::map> pending_incoming_subscriptions_; std::recursive_mutex incoming_subscriptions_mutex_; + std::map> pending_incoming_subscriptions_; - std::mutex state_mutex_; - std::mutex routing_stop_mutex_; + std::mutex state_condition_mutex_; std::condition_variable state_condition_; - std::map > > remote_subscriber_count_; std::mutex remote_subscriber_count_mutex_; + std::map>> + remote_subscriber_count_; - mutable std::mutex sender_mutex_; - mutable std::mutex receiver_mutex_; - + std::mutex register_application_timer_mutex_; boost::asio::steady_timer register_application_timer_; std::mutex request_timer_mutex_; diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index b62cc4bc6..598b2e21f 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -385,6 +385,8 @@ class routing_manager_impl: public routing_manager_base, void start_ip_routing(); + inline bool is_external_routing_ready() const; + void add_requested_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor); @@ -500,9 +502,9 @@ class routing_manager_impl: public routing_manager_base, std::mutex version_log_timer_mutex_; boost::asio::steady_timer version_log_timer_; - bool if_state_running_; - bool sd_route_set_; - bool routing_running_; + std::atomic_bool if_state_running_; + std::atomic_bool sd_route_set_; + std::atomic_bool routing_running_; std::mutex pending_sd_offers_mutex_; std::vector> pending_sd_offers_; #if defined(__linux__) || defined(ANDROID) @@ -518,8 +520,6 @@ class routing_manager_impl: public routing_manager_base, std::tuple>> pending_offers_; - std::mutex pending_subscription_mutex_; - std::mutex remote_subscription_state_mutex_; std::map, subscription_state_e> remote_subscription_state_; @@ -543,6 +543,7 @@ class routing_manager_impl: public routing_manager_base, pending_remote_offer_id_t pending_remote_offer_id_; std::map> pending_remote_offers_; + std::mutex last_resume_mutex_; std::chrono::steady_clock::time_point last_resume_; std::mutex offer_serialization_mutex_; diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index 229d7a223..7963a2aec 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -787,6 +787,11 @@ void routing_manager_base::add_known_client(client_t _client, const std::string known_clients_[_client] = _client_host; } +void routing_manager_base::remove_known_client(client_t _client) { + std::scoped_lock its_lock(known_clients_mutex_); + known_clients_.erase(_client); +} + void routing_manager_base::subscribe(client_t _client, const vsomeip_sec_client_t *_sec_client, service_t _service, instance_t _instance, @@ -1205,6 +1210,7 @@ void routing_manager_base::remove_local(client_t _client, std::get<1>(its_subscription), std::get<2>(its_subscription), ANY_EVENT); } ep_mgr_->remove_local(_client); + remove_known_client(_client); { std::lock_guard its_lock(local_services_mutex_); // Finally remove all services that are implemented by the client. @@ -1534,6 +1540,7 @@ void routing_manager_base::put_deserializer( void routing_manager_base::send_pending_subscriptions(service_t _service, instance_t _instance, major_version_t _major) { + std::scoped_lock its_lock(pending_subscription_mutex_); for (auto &ps : pending_subscriptions_) { if (ps.service_ == _service && ps.instance_ == _instance && ps.major_ == _major) { @@ -1545,6 +1552,7 @@ void routing_manager_base::send_pending_subscriptions(service_t _service, void routing_manager_base::remove_pending_subscription(service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) { + std::scoped_lock its_lock(pending_subscription_mutex_); if (_eventgroup == 0xFFFF) { for (auto it = pending_subscriptions_.begin(); it != pending_subscriptions_.end();) { @@ -1661,6 +1669,11 @@ routing_manager_base::get_routing_state() { return routing_state_; } +bool routing_manager_base::is_suspended() const { + return routing_state_ == routing_state_e::RS_SUSPENDED + || routing_state_ == routing_state_e::RS_DELAYED_RESUME; +} + #ifdef VSOMEIP_ENABLE_COMPAT void routing_manager_base::set_incoming_subscription_state(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event, subscription_state_e _state) { diff --git a/implementation/routing/src/routing_manager_client.cpp b/implementation/routing/src/routing_manager_client.cpp index b56aa62c9..5886251ba 100644 --- a/implementation/routing/src/routing_manager_client.cpp +++ b/implementation/routing/src/routing_manager_client.cpp @@ -168,14 +168,14 @@ void routing_manager_client::start() { } void routing_manager_client::stop() { - std::scoped_lock its_routing_lock {routing_stop_mutex_}; - std::unique_lock its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERING) { + std::scoped_lock its_register_application_lock {register_application_timer_mutex_}; register_application_timer_.cancel(); } const std::chrono::milliseconds its_timeout(configuration_->get_shutdown_timeout()); while (state_ == inner_state_type_e::ST_REGISTERING) { + std::unique_lock its_lock(state_condition_mutex_); std::cv_status status = state_condition_.wait_for(its_lock, its_timeout); if (status == std::cv_status::timeout) { VSOMEIP_WARNING << std::hex << std::setfill('0') << std::setw(4) << get_client() @@ -188,6 +188,7 @@ void routing_manager_client::stop() { deregister_application(); // Waiting de-register acknowledge to synchronize shutdown while (state_ == inner_state_type_e::ST_REGISTERED) { + std::unique_lock its_lock(state_condition_mutex_); std::cv_status status = state_condition_.wait_for(its_lock, its_timeout); if (status == std::cv_status::timeout) { VSOMEIP_WARNING << std::hex << std::setfill('0') << std::setw(4) << get_client() @@ -197,7 +198,6 @@ void routing_manager_client::stop() { } } is_started_ = false; - its_lock.unlock(); #if defined(__linux__) || defined(ANDROID) if (local_link_connector_) @@ -205,7 +205,7 @@ void routing_manager_client::stop() { #endif { - std::lock_guard its_lock(request_timer_mutex_); + std::scoped_lock its_lock(request_timer_mutex_); request_debounce_timer_.cancel(); } @@ -258,30 +258,35 @@ routing_manager_client::on_net_state_change( << _name << " " << std::boolalpha << _is_available; - std::scoped_lock its_routing_lock {routing_stop_mutex_}; if (_is_interface) { if (_is_available) { if (!is_local_link_available_) { is_local_link_available_ = true; + bool is_receiver {false}; { - std::scoped_lock its_receiver_sender_lock {receiver_mutex_, sender_mutex_}; + std::scoped_lock its_lock {receiver_mutex_}; if (!receiver_) receiver_ = ep_mgr_->create_local_server(shared_from_this()); if (receiver_) { receiver_->start(); is_started_ = true; - if (!sender_) - sender_ = ep_mgr_->create_local(VSOMEIP_ROUTING_CLIENT); - if (sender_) { - host_->set_sec_client_port(sender_->get_local_port()); - sender_->start(); - } + is_receiver = true; } } + if (is_receiver) { + std::scoped_lock its_lock {sender_mutex_}; + if (!sender_) + sender_ = ep_mgr_->create_local(VSOMEIP_ROUTING_CLIENT); + + if (sender_) { + host_->set_sec_client_port(sender_->get_local_port()); + sender_->start(); + } + } } } else { if (is_local_link_available_) { @@ -290,9 +295,11 @@ routing_manager_client::on_net_state_change( state_ = inner_state_type_e::ST_DEREGISTERED; { - std::scoped_lock its_sender_lock(sender_mutex_); + std::unique_lock its_sender_lock(sender_mutex_); if (sender_) { + its_sender_lock.unlock(); on_disconnect(sender_); + its_sender_lock.lock(); host_->set_sec_client_port(VSOMEIP_SEC_PORT_UNSET); sender_->stop(); } @@ -303,7 +310,7 @@ routing_manager_client::on_net_state_change( receiver_->stop(); } { - std::lock_guard its_lock(local_services_mutex_); + std::scoped_lock its_lock(local_services_mutex_); local_services_.clear(); } @@ -320,7 +327,7 @@ std::shared_ptr routing_manager_client::get_configuration() const std::string routing_manager_client::get_env(client_t _client) const { - std::lock_guard its_known_clients_lock(known_clients_mutex_); + std::scoped_lock its_known_clients_lock(known_clients_mutex_); return get_env_unlocked(_client); } @@ -342,11 +349,11 @@ bool routing_manager_client::offer_service(client_t _client, << "routing_manager_base::offer_service returned false"; } { - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED) { send_offer_service(_client, _service, _instance, _major, _minor); } protocol::service offer(_service, _instance, _major, _minor ); + std::scoped_lock its_lock(pending_offers_mutex_); pending_offers_.insert(offer); } return true; @@ -389,7 +396,7 @@ void routing_manager_client::stop_offer_service(client_t _client, { // Hold the mutex to ensure no placeholder event is created in between. - std::lock_guard its_lock(stop_mutex_); + std::scoped_lock its_lock(stop_mutex_); routing_manager_base::stop_offer_service(_client, _service, _instance, _major, _minor); clear_remote_subscriber_count(_service, _instance); @@ -400,7 +407,6 @@ void routing_manager_client::stop_offer_service(client_t _client, } { - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED) { protocol::stop_offer_service_command its_command; @@ -424,6 +430,7 @@ void routing_manager_client::stop_offer_service(client_t _client, << std::dec << static_cast(its_error) << ")"; } } + std::scoped_lock its_lock(pending_offers_mutex_); auto it = pending_offers_.begin(); while (it != pending_offers_.end()) { if (it->service_ == _service @@ -442,7 +449,6 @@ void routing_manager_client::request_service(client_t _client, routing_manager_base::request_service(_client, _service, _instance, _major, _minor); { - std::lock_guard its_lock(state_mutex_); size_t request_debouncing_time = configuration_->get_request_debouncing(host_->get_name()); protocol::service request = { _service, _instance, _major, _minor }; if (!request_debouncing_time) { @@ -451,10 +457,16 @@ void routing_manager_client::request_service(client_t _client, requests.insert(request); send_request_services(requests); } - requests_.insert(request); + { + std::scoped_lock its_lock(requests_mutex_); + requests_.insert(request); + } } else { - requests_to_debounce_.insert(request); - std::lock_guard its_lock(request_timer_mutex_); + { + std::scoped_lock its_lock(requests_to_debounce_mutex_); + requests_to_debounce_.insert(request); + } + std::scoped_lock its_lock(request_timer_mutex_); if (!request_debounce_timer_running_) { request_debounce_timer_running_ = true; request_debounce_timer_.expires_from_now(std::chrono::milliseconds(request_debouncing_time)); @@ -472,25 +484,28 @@ void routing_manager_client::release_service(client_t _client, service_t _service, instance_t _instance) { routing_manager_base::release_service(_client, _service, _instance); { - std::lock_guard its_lock(state_mutex_); remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT); bool pending(false); - auto it = requests_to_debounce_.begin(); - while (it != requests_to_debounce_.end()) { - if (it->service_ == _service - && it->instance_ == _instance) { - pending = true; + { + std::scoped_lock its_lock(requests_to_debounce_mutex_); + auto it = requests_to_debounce_.begin(); + while (it != requests_to_debounce_.end()) { + if (it->service_ == _service + && it->instance_ == _instance) { + pending = true; + } + it++; } - it++; + if (it != requests_to_debounce_.end()) requests_to_debounce_.erase(it); } - if (it != requests_to_debounce_.end()) requests_to_debounce_.erase(it); if (!pending && state_ == inner_state_type_e::ST_REGISTERED) { send_release_service(_client, _service, _instance); } { + std::scoped_lock its_lock(requests_mutex_); auto it = requests_.begin(); while (it != requests_.end()) { if (it->service_ == _service @@ -531,7 +546,7 @@ void routing_manager_client::register_event(client_t _client, }; bool is_first(false); { - std::lock_guard its_lock(state_mutex_); + std::scoped_lock its_lock(pending_event_registrations_mutex_); is_first = pending_event_registrations_.find(registration) == pending_event_registrations_.end(); #ifndef VSOMEIP_ENABLE_COMPAT @@ -574,7 +589,6 @@ void routing_manager_client::register_event(client_t _client, _is_provided); } { - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED && is_first) { send_register_event(get_client(), _service, _instance, _notifier, _eventgroups, _type, _reliability, @@ -591,7 +605,6 @@ void routing_manager_client::unregister_event(client_t _client, _notifier, _is_provided); { - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED) { protocol::unregister_event_command its_command; @@ -612,6 +625,7 @@ void routing_manager_client::unregister_event(client_t _client, } } + std::scoped_lock its_lock(pending_event_registrations_mutex_); for (auto iter = pending_event_registrations_.begin(); iter != pending_event_registrations_.end(); ) { if (iter->service_ == _service @@ -644,7 +658,6 @@ void routing_manager_client::subscribe( (void)_client; - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERED && is_available(_service, _instance, _major)) { send_subscribe(get_client(), _service, _instance, _eventgroup, _major, _event, _filter ); } @@ -654,6 +667,7 @@ void routing_manager_client::subscribe( _event, _filter, *_sec_client }; + std::scoped_lock its_lock(pending_subscription_mutex_); pending_subscriptions_.insert(subscription); } @@ -807,7 +821,6 @@ void routing_manager_client::unsubscribe(client_t _client, (void)_sec_client; { - std::lock_guard its_lock(state_mutex_); remove_pending_subscription(_service, _instance, _eventgroup, _event); if (state_ == inner_state_type_e::ST_REGISTERED) { @@ -858,7 +871,6 @@ bool routing_manager_client::send(client_t _client, const byte_t *_data, bool is_sent(false); bool has_remote_subscribers(false); { - std::lock_guard its_lock(state_mutex_); if (state_ != inner_state_type_e::ST_REGISTERED) { return false; } @@ -1271,7 +1283,6 @@ void routing_manager_client::on_message( case protocol::id_e::ASSIGN_CLIENT_ACK_ID: { - std::scoped_lock its_routing_lock {routing_stop_mutex_}; client_t its_assigned_client(VSOMEIP_CLIENT_UNSET); protocol::assign_client_ack_command its_ack_command; its_ack_command.deserialize(its_buffer, its_error); @@ -1879,14 +1890,17 @@ void routing_manager_client::on_routing_info( } #endif { - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_REGISTERING) { - boost::system::error_code ec; - register_application_timer_.cancel(ec); + { + std::scoped_lock its_register_application_lock {register_application_timer_mutex_}; + boost::system::error_code ec; + register_application_timer_.cancel(ec); + } send_registered_ack(); send_pending_commands(); state_ = inner_state_type_e::ST_REGISTERED; // Notify stop() call about clean deregistration + std::scoped_lock its_lock(state_condition_mutex_); state_condition_.notify_one(); } } @@ -1902,7 +1916,7 @@ void routing_manager_client::on_routing_info( case protocol::routing_info_entry_type_e::RIE_DELETE_CLIENT: { { - std::lock_guard its_lock(known_clients_mutex_); + std::scoped_lock its_lock(known_clients_mutex_); known_clients_.erase(its_client); } if (its_client == get_client()) { @@ -1914,9 +1928,9 @@ void routing_manager_client::on_routing_info( host_->on_state(static_cast(inner_state_type_e::ST_DEREGISTERED)); { - std::lock_guard its_lock(state_mutex_); state_ = inner_state_type_e::ST_DEREGISTERED; // Notify stop() call about clean deregistration + std::scoped_lock its_lock(state_condition_mutex_); state_condition_.notify_one(); } } else if (its_client != VSOMEIP_ROUTING_CLIENT) { @@ -1936,7 +1950,7 @@ void routing_manager_client::on_routing_info( // Add yet unknown clients that offer services. Otherwise, // the service cannot be used. The entry will be overwritten, // when the offering clients connects. - std::lock_guard its_lock(known_clients_mutex_); + std::scoped_lock its_lock(known_clients_mutex_); if (known_clients_.find(its_client) == known_clients_.end()) { known_clients_[its_client] = ""; } @@ -1950,7 +1964,7 @@ void routing_manager_client::on_routing_info( const auto its_minor(s.minor_); { - std::lock_guard its_lock(local_services_mutex_); + std::scoped_lock its_lock(local_services_mutex_); // Check whether the service instance is already known. If yes, // continue with the next service within the routing info. @@ -1964,7 +1978,6 @@ void routing_manager_client::on_routing_info( = std::make_tuple(its_major, its_minor, its_client); } { - std::lock_guard its_lock(state_mutex_); send_pending_subscriptions(its_service, its_instance, its_major); } host_->on_availability(its_service, its_instance, @@ -1988,7 +2001,7 @@ void routing_manager_client::on_routing_info( const auto its_minor(s.minor_); { - std::lock_guard its_lock(local_services_mutex_); + std::scoped_lock its_lock(local_services_mutex_); auto found_service = local_services_.find(its_service); if (found_service != local_services_.end()) { found_service->second.erase(its_instance); @@ -2033,11 +2046,11 @@ void routing_manager_client::on_routing_info( vsomeip_sec_client_t sec_client_; std::string env_; }; - std::lock_guard its_lock(incoming_subscriptions_mutex_); + std::scoped_lock its_lock(incoming_subscriptions_mutex_); std::forward_list subscription_actions; if (pending_incoming_subscriptions_.size()) { { - std::lock_guard its_lock(known_clients_mutex_); + std::scoped_lock its_known_clients_lock(known_clients_mutex_); for (const auto &k : known_clients_) { auto its_client = pending_incoming_subscriptions_.find(k.first); if (its_client != pending_incoming_subscriptions_.end()) { @@ -2083,7 +2096,7 @@ void routing_manager_client::on_routing_info( si.instance_id_, si.eventgroup_id_, si.event_); #endif { - std::lock_guard its_lock2(incoming_subscriptions_mutex_); + std::scoped_lock its_lk(incoming_subscriptions_mutex_); pending_incoming_subscriptions_.erase(si.client_id_); } }); @@ -2112,9 +2125,9 @@ void routing_manager_client::reconnect(const std::map &_c host_->on_state(static_cast(inner_state_type_e::ST_DEREGISTERED)); { - std::lock_guard its_lock(state_mutex_); state_ = inner_state_type_e::ST_DEREGISTERED; // Notify stop() call about clean deregistration + std::scoped_lock its_lock(state_condition_mutex_); state_condition_.notify_one(); } @@ -2169,7 +2182,6 @@ void routing_manager_client::assign_client() { return; } - std::lock_guard its_state_lock(state_mutex_); if (is_connected_) { std::scoped_lock its_sender_lock {sender_mutex_}; if (sender_) { @@ -2181,15 +2193,18 @@ void routing_manager_client::assign_client() { state_ = inner_state_type_e::ST_ASSIGNING; sender_->send(&its_buffer[0], static_cast(its_buffer.size())); - - boost::system::error_code ec; - register_application_timer_.cancel(ec); - register_application_timer_.expires_from_now(std::chrono::milliseconds(3000)); - register_application_timer_.async_wait( + + { + std::scoped_lock its_register_application_lock {register_application_timer_mutex_}; + boost::system::error_code ec; + register_application_timer_.cancel(ec); + register_application_timer_.expires_from_now(std::chrono::milliseconds(3000)); + register_application_timer_.async_wait( std::bind( &routing_manager_client::assign_client_timeout_cbk, std::dynamic_pointer_cast(shared_from_this()), std::placeholders::_1)); + } } else { VSOMEIP_WARNING << __func__ << ": (" << std::hex << std::setfill('0') << std::setw(4) << get_client() << ") sender not initialized. Ignoring client assignment"; @@ -2235,13 +2250,15 @@ void routing_manager_client::register_application() { state_ = inner_state_type_e::ST_REGISTERING; sender_->send(&its_buffer[0], uint32_t(its_buffer.size())); - register_application_timer_.cancel(); - register_application_timer_.expires_from_now(std::chrono::milliseconds(3000)); - register_application_timer_.async_wait( - std::bind( - &routing_manager_client::register_application_timeout_cbk, - std::dynamic_pointer_cast(shared_from_this()), - std::placeholders::_1)); + { + std::scoped_lock its_register_application_lock {register_application_timer_mutex_}; + register_application_timer_.cancel(); + register_application_timer_.expires_from_now(std::chrono::milliseconds(3000)); + register_application_timer_.async_wait(std::bind( + &routing_manager_client::register_application_timeout_cbk, + std::dynamic_pointer_cast(shared_from_this()), + std::placeholders::_1)); + } // Send a `config_command` to share our hostname with the other application. protocol::config_command its_command_config; @@ -2360,8 +2377,9 @@ void routing_manager_client::send_pending_event_registrations(client_t _client) protocol::register_events_command its_command; its_command.set_client(_client); - std::set::iterator it = pending_event_registrations_.begin(); + std::scoped_lock its_lock(pending_event_registrations_mutex_); + auto it = pending_event_registrations_.begin(); while(it != pending_event_registrations_.end()) { for(; it!=pending_event_registrations_.end(); it++) { @@ -2512,7 +2530,7 @@ void routing_manager_client::on_stop_offer_service(service_t _service, (void) _minor; std::map > events; { - std::lock_guard its_lock(events_mutex_); + std::scoped_lock its_lock(events_mutex_); auto its_events_service = events_.find(_service); if (its_events_service != events_.end()) { auto its_events_instance = its_events_service->second.find(_instance); @@ -2528,10 +2546,14 @@ void routing_manager_client::on_stop_offer_service(service_t _service, } void routing_manager_client::send_pending_commands() { - for (auto &po : pending_offers_) - send_offer_service(get_client(), + { + std::scoped_lock its_lock(pending_offers_mutex_); + for (auto &po : pending_offers_) { + send_offer_service(get_client(), po.service_, po.instance_, po.major_, po.minor_); + } + } send_pending_event_registrations(get_client()); @@ -2599,7 +2621,7 @@ void routing_manager_client::notify_remote_initially(service_t _service, instanc uint32_t routing_manager_client::get_remote_subscriber_count(service_t _service, instance_t _instance, eventgroup_t _eventgroup, bool _increment) { - std::lock_guard its_lock(remote_subscriber_count_mutex_); + std::scoped_lock its_lock(remote_subscriber_count_mutex_); uint32_t count (0); bool found(false); auto found_service = remote_subscriber_count_.find(_service); @@ -2631,7 +2653,7 @@ uint32_t routing_manager_client::get_remote_subscriber_count(service_t _service, void routing_manager_client::clear_remote_subscriber_count( service_t _service, instance_t _instance) { - std::lock_guard its_lock(remote_subscriber_count_mutex_); + std::scoped_lock its_lock(remote_subscriber_count_mutex_); auto found_service = remote_subscriber_count_.find(_service); if (found_service != remote_subscriber_count_.end()) { if (found_service->second.erase(_instance)) { @@ -2649,7 +2671,6 @@ routing_manager_client::assign_client_timeout_cbk( if (!_error) { bool register_again(false); { - std::lock_guard its_lock(state_mutex_); if (state_ != inner_state_type_e::ST_REGISTERED) { state_ = inner_state_type_e::ST_DEREGISTERED; register_again = true; @@ -2680,7 +2701,6 @@ void routing_manager_client::register_application_timeout_cbk( bool register_again(false); { - std::lock_guard its_lock(state_mutex_); if (!_error && state_ != inner_state_type_e::ST_REGISTERED) { state_ = inner_state_type_e::ST_DEREGISTERED; register_again = true; @@ -2720,7 +2740,7 @@ void routing_manager_client::send_registered_ack() { bool routing_manager_client::is_client_known(client_t _client) { - std::lock_guard its_lock(known_clients_mutex_); + std::scoped_lock its_lock(known_clients_mutex_); return (known_clients_.find(_client) != known_clients_.end()); } @@ -2729,7 +2749,7 @@ bool routing_manager_client::create_placeholder_event_and_subscribe( event_t _notifier, const std::shared_ptr &_filter, client_t _client) { - std::lock_guard its_lock(stop_mutex_); + std::scoped_lock its_lock(stop_mutex_); bool is_inserted(false); @@ -2758,32 +2778,30 @@ bool routing_manager_client::create_placeholder_event_and_subscribe( void routing_manager_client::request_debounce_timeout_cbk( boost::system::error_code const &_error) { - std::lock_guard its_lock(state_mutex_); if (!_error) { + std::scoped_lock its_lock(requests_to_debounce_mutex_, requests_mutex_); if (requests_to_debounce_.size()) { if (state_ == inner_state_type_e::ST_REGISTERED) { send_request_services(requests_to_debounce_); requests_.insert(requests_to_debounce_.begin(), - requests_to_debounce_.end()); + requests_to_debounce_.end()); requests_to_debounce_.clear(); } else { - { - std::lock_guard its_lock(request_timer_mutex_); - request_debounce_timer_running_ = true; - request_debounce_timer_.expires_from_now(std::chrono::milliseconds( - configuration_->get_request_debouncing(host_->get_name()))); - request_debounce_timer_.async_wait( - std::bind( - &routing_manager_client::request_debounce_timeout_cbk, - std::dynamic_pointer_cast(shared_from_this()), - std::placeholders::_1)); - return; - } + std::scoped_lock its_request_timer_lock(request_timer_mutex_); + request_debounce_timer_running_ = true; + request_debounce_timer_.expires_from_now(std::chrono::milliseconds( + configuration_->get_request_debouncing(host_->get_name()))); + request_debounce_timer_.async_wait( + std::bind( + &routing_manager_client::request_debounce_timeout_cbk, + std::dynamic_pointer_cast(shared_from_this()), + std::placeholders::_1)); + return; } } } { - std::lock_guard its_lock(request_timer_mutex_); + std::scoped_lock its_request_timer_lock(request_timer_mutex_); request_debounce_timer_running_ = false; } } @@ -2798,20 +2816,19 @@ void routing_manager_client::register_client_error_handler(client_t _client, void routing_manager_client::handle_client_error(client_t _client) { if (_client != VSOMEIP_ROUTING_CLIENT) { - VSOMEIP_INFO << "rmc::handle_client_error" << "Client 0x" << std::hex << std::setw(4) + VSOMEIP_INFO << "rmc::handle_client_error:" << " Client 0x" << std::hex << std::setw(4) << std::setfill('0') << get_client() << " handles a client error(" << std::hex << std::setw(4) << std::setfill('0') << _client << ") not reconnecting"; remove_local(_client, true); } else { - bool should_reconnect(true); - { - std::unique_lock its_lock(state_mutex_); - should_reconnect = is_started_; - } - if (should_reconnect) { + VSOMEIP_INFO << "rmc::handle_client_error:" << " Client 0x" << std::hex << std::setw(4) + << std::setfill('0') << get_client() << " handles a client error(" << std::hex + << std::setw(4) << std::setfill('0') << _client + << ") with host, will reconnect"; + if (is_started_) { std::map its_known_clients; { - std::lock_guard its_lock(known_clients_mutex_); + std::scoped_lock its_lock(known_clients_mutex_); its_known_clients = known_clients_; } reconnect(its_known_clients); @@ -2867,7 +2884,7 @@ void routing_manager_client::send_unsubscribe_ack( } void routing_manager_client::resend_provided_event_registrations() { - std::lock_guard its_lock(state_mutex_); + std::scoped_lock its_lock(pending_event_registrations_mutex_); for (const event_data_t& ed : pending_event_registrations_) { if (ed.is_provided_) { send_register_event(get_client(), ed.service_, ed.instance_, @@ -2970,26 +2987,34 @@ void routing_manager_client::on_update_security_credentials( void routing_manager_client::on_client_assign_ack(const client_t &_client) { - std::lock_guard its_lock(state_mutex_); if (state_ == inner_state_type_e::ST_ASSIGNING) { if (_client != VSOMEIP_CLIENT_UNSET) { state_ = inner_state_type_e::ST_ASSIGNED; - boost::system::error_code ec; - register_application_timer_.cancel(ec); + { + std::scoped_lock its_register_application_lock {register_application_timer_mutex_}; + boost::system::error_code ec; + register_application_timer_.cancel(ec); + } host_->set_client(_client); if (is_started_) { init_receiver(); - std::scoped_lock r_lock(receiver_mutex_); - if (receiver_) { - receiver_->start(); - VSOMEIP_INFO << "Client " - << std::hex << std::setfill('0') << std::setw(4) << get_client() + bool is_receiver {false}; + { + std::scoped_lock r_lock(receiver_mutex_); + if (receiver_) { + receiver_->start(); + VSOMEIP_INFO << "Client " + << std::hex << std::setw(4) << std::setfill('0') << get_client() << " (" << host_->get_name() << ") successfully connected to routing ~> registering.."; - register_application(); - } else { + register_application(); + + is_receiver = true; + } + } + if (!is_receiver) { VSOMEIP_WARNING << __func__ << ": (" << host_->get_name() << ":" << std::hex << std::setfill('0') << std::setw(4) << _client << ") Receiver not started. Restarting"; @@ -3024,7 +3049,7 @@ void routing_manager_client::on_suspend() { << std::hex << std::setfill('0') << std::setw(4) << host_->get_client(); - std::lock_guard its_lock(remote_subscriber_count_mutex_); + std::scoped_lock its_lock(remote_subscriber_count_mutex_); // Unsubscribe everything that is left over. for (const auto &s : remote_subscriber_count_) { diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index cea58ace1..0a6c95f70 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -235,10 +235,7 @@ void routing_manager_impl::start() { this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); netlink_connector_->start(); #else - { - std::lock_guard its_lock(pending_sd_offers_mutex_); - start_ip_routing(); - } + start_ip_routing(); #endif if (stub_) @@ -448,11 +445,13 @@ bool routing_manager_impl::offer_service(client_t _client, } { - std::lock_guard its_lock(pending_sd_offers_mutex_); - if (if_state_running_) { + if (is_external_routing_ready()) { init_service_info(_service, _instance, true); } else { + std::scoped_lock its_lock(pending_sd_offers_mutex_); pending_sd_offers_.push_back(std::make_pair(_service, _instance)); + VSOMEIP_INFO << "rmi::" << __func__ << " added service: " << std::hex << _service + << " to pending_sd_offers_.size = " << pending_sd_offers_.size(); } } @@ -464,22 +463,24 @@ bool routing_manager_impl::offer_service(client_t _client, } { - std::lock_guard ist_lock(pending_subscription_mutex_); std::set its_already_subscribed_events; - for (auto &ps : pending_subscriptions_) { - if (ps.service_ == _service - && ps.instance_ == _instance - && ps.major_ == _major) { - insert_subscription(ps.service_, ps.instance_, - ps.eventgroup_, ps.event_, nullptr, - get_client(), &its_already_subscribed_events); + { + std::scoped_lock ist_lock(pending_subscription_mutex_); + for (auto &ps : pending_subscriptions_) { + if (ps.service_ == _service + && ps.instance_ == _instance + && ps.major_ == _major) { + insert_subscription(ps.service_, ps.instance_, + ps.eventgroup_, ps.event_, nullptr, + get_client(), &its_already_subscribed_events); #if 0 - VSOMEIP_ERROR << __func__ - << ": event=" - << std::hex << ps.service_ << "." - << std::hex << ps.instance_ << "." - << std::hex << ps.event_; + VSOMEIP_ERROR << __func__ + << ": event=" + << std::hex << ps.service_ << "." + << std::hex << ps.instance_ << "." + << std::hex << ps.event_; #endif + } } } @@ -543,10 +544,13 @@ void routing_manager_impl::stop_offer_service(client_t _client, } if (is_local) { { - std::lock_guard its_lock(pending_sd_offers_mutex_); + std::scoped_lock its_lock(pending_sd_offers_mutex_); for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) { if (it->first == _service && it->second == _instance) { it = pending_sd_offers_.erase(it); + VSOMEIP_INFO << "rmi::" << __func__ << " removed service: " << std::hex + << _service + << " to pending_sd_offers_.size = " << pending_sd_offers_.size(); break; } else { ++it; @@ -642,7 +646,6 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, << std::setw(4) << _instance << "]"; if (host_->get_client() == _client) { - std::lock_guard its_lock(pending_subscription_mutex_); remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT); } routing_manager_base::release_service(_client, _service, _instance); @@ -789,13 +792,13 @@ void routing_manager_impl::subscribe( } } if (subscriber_is_rm_host) { - std::lock_guard ist_lock(pending_subscription_mutex_); subscription_data_t subscription = { _service, _instance, _eventgroup, _major, _event, _filter, *_sec_client }; + std::scoped_lock ist_lock(pending_subscription_mutex_); pending_subscriptions_.insert(subscription); } } else { @@ -840,7 +843,6 @@ void routing_manager_impl::unsubscribe( [](const bool _subscription_accepted){ (void)_subscription_accepted; }); if (0 == find_local_client(_service, _instance)) { if (get_client() == _client) { - std::lock_guard ist_lock(pending_subscription_mutex_); remove_pending_subscription(_service, _instance, _eventgroup, _event); } if (last_subscriber_removed) { @@ -860,7 +862,6 @@ void routing_manager_impl::unsubscribe( } } else { if (get_client() == _client) { - std::lock_guard ist_lock(pending_subscription_mutex_); remove_pending_subscription(_service, _instance, _eventgroup, _event); if (stub_) stub_->send_unsubscribe( @@ -1336,8 +1337,8 @@ void routing_manager_impl::on_availability(service_t _service, instance_t _insta // remote service if (VSOMEIP_ROUTING_CLIENT == its_local_client) { static const ttl_t configured_ttl(configuration_->get_sd_ttl()); - std::lock_guard its_subscribed_lock(discovery_->get_subscribed_mutex()); - std::lock_guard its_lock(pending_subscription_mutex_); + std::scoped_lock its_lock {discovery_->get_subscribed_mutex(), + pending_subscription_mutex_}; for (auto &ps : pending_subscriptions_) { if (ps.service_ == _service && ps.instance_ == _instance @@ -2377,8 +2378,7 @@ void routing_manager_impl::add_routing_info( const boost::asio::ip::address &_unreliable_address, uint16_t _unreliable_port) { - std::lock_guard its_lock(routing_state_mutex_); - if (routing_state_ == routing_state_e::RS_SUSPENDED) { + if (is_suspended()) { VSOMEIP_INFO << "rmi::" << __func__ << " We are suspended --> do nothing."; return; } @@ -3472,7 +3472,7 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const } std::stringstream its_last_resume; { - std::lock_guard its_lock(routing_state_mutex_); + std::scoped_lock its_lock(last_resume_mutex_); if (last_resume_ != std::chrono::steady_clock::time_point::min()) { its_last_resume << " | " << std::dec << std::chrono::duration_cast( @@ -3492,7 +3492,7 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const } { - std::lock_guard its_lock(version_log_timer_mutex_); + std::scoped_lock its_lock(version_log_timer_mutex_); version_log_timer_.expires_from_now(std::chrono::seconds(its_interval)); version_log_timer_.async_wait( std::bind(&routing_manager_impl::log_version_timer_cbk, @@ -3700,7 +3700,7 @@ void routing_manager_impl::handle_client_error(client_t _client) { if (stub_) stub_->update_registration(_client, registration_type_e::DEREGISTER_ON_ERROR, boost::asio::ip::address(), 0); - remove_local(_client, true); + std::forward_list> its_offers; { @@ -3773,15 +3773,11 @@ routing_state_e routing_manager_impl::get_routing_state() { } void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { - { - std::lock_guard its_lock(routing_state_mutex_); - if (routing_state_ == _routing_state) { - VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing."; - return; - } - - routing_state_ = _routing_state; + if (routing_state_ == _routing_state) { + VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing."; + return; } + routing_state_ = _routing_state; if (discovery_) { switch (_routing_state) { @@ -3874,9 +3870,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } - // suspend all endpoints - ep_mgr_->suspend(); - VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); @@ -3884,6 +3877,12 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } case routing_state_e::RS_RESUMED: { + if (!is_external_routing_ready()) { + VSOMEIP_INFO << "rmi::" << __func__ << " Network not running, delaying the resume of routing manager"; + routing_state_ = routing_state_e::RS_DELAYED_RESUME; + return; + } + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); @@ -3891,7 +3890,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { ep_mgr_->resume(); { - std::lock_guard its_lock(routing_state_mutex_); + std::scoped_lock its_lock(last_resume_mutex_); last_resume_ = std::chrono::steady_clock::now(); } @@ -3919,6 +3918,16 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } } + { + std::scoped_lock its_lock(pending_sd_offers_mutex_); + // Trigger pending offers + for (const auto& [its_service, its_instance] : pending_sd_offers_) { + init_service_info(its_service, its_instance, true); + } + pending_sd_offers_.clear(); + VSOMEIP_INFO << "rmi::" << __func__ << ": clear pending_sd_offers_"; + } + VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); break; @@ -3971,6 +3980,9 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was " << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive."); break; + case routing_state_e::RS_DELAYED_RESUME: + // Do nothing + break; default: break; } @@ -3979,52 +3991,39 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { void routing_manager_impl::on_net_interface_or_route_state_changed( bool _is_interface, const std::string &_if, bool _available) { - std::lock_guard its_lock(pending_sd_offers_mutex_); auto log_change_message = [&_if, _available, _is_interface](bool _warning) { std::stringstream ss; ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if - << "\" state changed: " << (_available ? "up" : "down"); + << "\" state changed: " << (_available ? "up" : "down"); if (_warning) { VSOMEIP_WARNING << ss.str(); } else { VSOMEIP_INFO << ss.str(); } }; + if (_is_interface) { - if (if_state_running_ - || (_available && !if_state_running_ && routing_running_)) { - log_change_message(true); - } else if (!if_state_running_) { - log_change_message(false); - } - if (_available && !if_state_running_) { - if_state_running_ = true; - if (!routing_running_) { - if(configuration_->is_sd_enabled()) { - if (sd_route_set_) { - start_ip_routing(); - } - } else { - // Static routing, don't wait for route! - start_ip_routing(); - } - } + if (_available != if_state_running_) { + log_change_message(_available); + } + if_state_running_ = _available; + // When the interface goes down the sd route is also lost + if (!if_state_running_) { + sd_route_set_ = false; } } else { - if (sd_route_set_ - || (_available && !sd_route_set_ && routing_running_)) { - log_change_message(true); - } else if (!sd_route_set_) { - log_change_message(false); - } - if (_available && !sd_route_set_) { - sd_route_set_ = true; - if (!routing_running_) { - if (if_state_running_) { - start_ip_routing(); - } - } + if (_available != sd_route_set_) { + log_change_message(_available); } + sd_route_set_ = _available; + } + + if (!routing_running_ && is_external_routing_ready()) { + start_ip_routing(); + } + + if (get_routing_state() == routing_state_e::RS_DELAYED_RESUME && is_external_routing_ready()) { + set_routing_state(routing_state_e::RS_RESUMED); } } @@ -4038,22 +4037,33 @@ void routing_manager_impl::start_ip_routing() { } if (discovery_) { - if (routing_state_ != routing_state_e::RS_SUSPENDED) { + if (!is_suspended()) { discovery_->start(); } } else { init_routing_info(); } - for (auto its_service : pending_sd_offers_) { - init_service_info(its_service.first, its_service.second, true); + { + std::scoped_lock its_lock(pending_sd_offers_mutex_); + for (auto its_service : pending_sd_offers_) { + init_service_info(its_service.first, its_service.second, true); + } + pending_sd_offers_.clear(); + VSOMEIP_INFO << "rmi::" << __func__ << ": clear pending_sd_offers_"; } - pending_sd_offers_.clear(); - + routing_running_ = true; VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE; } +inline bool routing_manager_impl::is_external_routing_ready() const { + return if_state_running_ + && (!configuration_->is_sd_enabled() + || (configuration_->is_sd_enabled() && sd_route_set_)); +} + + bool routing_manager_impl::is_available(service_t _service, instance_t _instance, major_version_t _major) const { return routing_manager_base::is_available(_service, _instance, _major); diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index 91e7274c5..72a2517a4 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -684,6 +684,10 @@ service_discovery_impl::is_reboot( && ((_is_multicast && !std::get<2>(its_received->second)) || (!_is_multicast && !std::get<3>(its_received->second)))) { result = true; + VSOMEIP_INFO << "sdi::" << __func__ << " Reboot detected, rbt flag: " << _reboot_flag + << " Sender: " << _sender.to_string() << " is multicast: " << _is_multicast + << " multicast old reboot flag: " << std::get<2>(its_received->second) + << " unicast old reboot flag " << std::get<3>(its_received->second); } else { session_t its_old_session; bool its_old_reboot_flag; @@ -699,6 +703,13 @@ service_discovery_impl::is_reboot( if (its_old_reboot_flag && _reboot_flag && its_old_session >= _session) { result = true; + VSOMEIP_INFO << "sdi::" << __func__ + << " Reboot detected, rbt flag: " << _reboot_flag + << " Sender: " << _sender.to_string() + << " is multicast: " << _is_multicast + << " old reboot flag: " << its_old_reboot_flag + << " current session: " << _session + << " old session: " << its_old_session; } } diff --git a/interface/vsomeip/enumeration_types.hpp b/interface/vsomeip/enumeration_types.hpp index b56690de3..666a30ab6 100644 --- a/interface/vsomeip/enumeration_types.hpp +++ b/interface/vsomeip/enumeration_types.hpp @@ -52,6 +52,7 @@ enum class routing_state_e : uint8_t { RS_RESUMED = 0x02, RS_SHUTDOWN = 0x03, RS_DIAGNOSIS = 0x04, + RS_DELAYED_RESUME = 0x05, RS_UNKNOWN = 0xFF };