From 7719e0245f4f4a2f63c469aff1265152e6be548e Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:12:14 +0100 Subject: [PATCH 1/2] service protocol v0 implementation removed (not the configuration) --- .../src/service/ecal_service_client_impl.cpp | 6 +- .../src/service/ecal_service_server_impl.cpp | 68 +- .../src/service/ecal_service_server_impl.h | 6 +- ecal/service/ecal_service/CMakeLists.txt | 6 - .../ecal_service/src/client_session.cpp | 15 +- .../src/client_session_impl_v0.cpp | 594 ------------------ .../ecal_service/src/client_session_impl_v0.h | 163 ----- ecal/service/ecal_service/src/protocol_v0.cpp | 141 ----- ecal/service/ecal_service/src/protocol_v0.h | 45 -- ecal/service/ecal_service/src/server_impl.cpp | 13 +- .../src/server_session_impl_v0.cpp | 270 -------- .../ecal_service/src/server_session_impl_v0.h | 107 ---- 12 files changed, 26 insertions(+), 1408 deletions(-) delete mode 100644 ecal/service/ecal_service/src/client_session_impl_v0.cpp delete mode 100644 ecal/service/ecal_service/src/client_session_impl_v0.h delete mode 100644 ecal/service/ecal_service/src/protocol_v0.cpp delete mode 100644 ecal/service/ecal_service/src/protocol_v0.h delete mode 100644 ecal/service/ecal_service/src/server_session_impl_v0.cpp delete mode 100644 ecal/service/ecal_service/src/server_session_impl_v0.h diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index bd601eba7a..5b4e7b457a 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -423,9 +423,9 @@ namespace eCAL // I have no idea why, but for some reason the event callbacks of the actual connetions are not even used. The connect / disconnect callbacks are executed whenever a new connection is found, and not when the client has actually connected or disconnected. I am preserving the previous behavior. }; - // Only connect via V0 protocol / V0 port, if V1 port is not available - const auto protocol_version = (service_.tcp_port_v1 != 0 ? service_.version : 0); - const auto port_to_use = (protocol_version == 0 ? service_.tcp_port_v0 : service_.tcp_port_v1); + // use protocol version 1 + const auto protocol_version = 1; + const auto port_to_use = service_.tcp_port_v1; // Create the client and add it to the map const std::vector> endpoint_list diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index ec1812af69..3f9939d251 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -21,8 +21,6 @@ * @brief eCAL service server implementation **/ -#include - #include "registration/ecal_registration_provider.h" #include "ecal_global_accessors.h" #include "ecal_service_server_impl.h" @@ -113,17 +111,8 @@ namespace eCAL return -1; }; - // start service protocol version 0 - if (Config::IsServiceProtocolV0Enabled()) - { - m_tcp_server_v0 = server_manager->create_server(0, 0, service_callback, true, event_callback); - } - // start service protocol version 1 - if (Config::IsServiceProtocolV1Enabled()) - { - m_tcp_server_v1 = server_manager->create_server(1, 0, service_callback, true, event_callback); - } + m_tcp_server = server_manager->create_server(1, 0, service_callback, true, event_callback); // mark as created m_created = true; @@ -147,11 +136,8 @@ namespace eCAL m_event_callback_map.clear(); } - if (m_tcp_server_v0) - m_tcp_server_v0->stop(); - - if (m_tcp_server_v1) - m_tcp_server_v1->stop(); + if (m_tcp_server) + m_tcp_server->stop(); // mark as no more created m_created = false; @@ -165,8 +151,7 @@ namespace eCAL { const std::lock_guard connected_lock(m_connected_mutex); - m_connected_v0 = false; - m_connected_v1 = false; + m_connected = false; } return(true); @@ -290,8 +275,7 @@ namespace eCAL { if (!m_created) return false; - return (m_tcp_server_v0 && m_tcp_server_v0->is_connected()) - || (m_tcp_server_v1 && m_tcp_server_v1->is_connected()); + return (m_tcp_server && m_tcp_server->is_connected()); } // called by the eCAL::CServiceGate to register a client @@ -314,11 +298,8 @@ namespace eCAL ecal_reg_sample.cmd_type = bct_reg_service; // might be zero in contruction phase - unsigned short const server_tcp_port_v0(m_tcp_server_v0 ? m_tcp_server_v0->get_port() : 0); - if ((Config::IsServiceProtocolV0Enabled()) && (server_tcp_port_v0 == 0)) return ecal_reg_sample; - - unsigned short const server_tcp_port_v1(m_tcp_server_v1 ? m_tcp_server_v1->get_port() : 0); - if ((Config::IsServiceProtocolV1Enabled()) && (server_tcp_port_v1 == 0)) return ecal_reg_sample; + unsigned short const server_tcp_port(m_tcp_server ? m_tcp_server->get_port() : 0); + if ((Config::IsServiceProtocolV1Enabled()) && (server_tcp_port == 0)) return ecal_reg_sample; auto& identifier = ecal_reg_sample.identifier; identifier.entity_id = m_service_id; @@ -331,8 +312,8 @@ namespace eCAL service.uname = Process::GetUnitName(); service.sname = m_service_name; - service.tcp_port_v0 = server_tcp_port_v0; - service.tcp_port_v1 = server_tcp_port_v1; + service.tcp_port_v0 = 0; + service.tcp_port_v1 = server_tcp_port; // add methods { @@ -486,42 +467,21 @@ namespace eCAL { const std::lock_guard connected_lock(m_connected_mutex); - // protocol version 0 - if (m_connected_v0) - { - if (m_tcp_server_v0 && !m_tcp_server_v0->is_connected()) - { - mode_changed = true; - m_connected_v0 = false; - Logging::Log(log_level_debug2, m_service_name + ": " + "client with protocol version 0 disconnected"); - } - } - else - { - if (m_tcp_server_v0 && m_tcp_server_v0->is_connected()) - { - mode_changed = true; - m_connected_v0 = true; - Logging::Log(log_level_debug2, m_service_name + ": " + "client with protocol version 0 connected"); - } - } - - // protocol version 1 - if (m_connected_v1) + if (m_connected) { - if (m_tcp_server_v1 && !m_tcp_server_v1->is_connected()) + if (m_tcp_server && !m_tcp_server->is_connected()) { mode_changed = true; - m_connected_v1 = false; + m_connected = false; Logging::Log(log_level_debug2, m_service_name + ": " + "client with protocol version 1 disconnected"); } } else { - if (m_tcp_server_v1 && m_tcp_server_v1->is_connected()) + if (m_tcp_server && m_tcp_server->is_connected()) { mode_changed = true; - m_connected_v1 = true; + m_connected = true; Logging::Log(log_level_debug2, m_service_name + ": " + "client with protocol version 1 connected"); } } diff --git a/ecal/core/src/service/ecal_service_server_impl.h b/ecal/core/src/service/ecal_service_server_impl.h index ccf741bef2..0a56ab6657 100644 --- a/ecal/core/src/service/ecal_service_server_impl.h +++ b/ecal/core/src/service/ecal_service_server_impl.h @@ -102,8 +102,7 @@ namespace eCAL int RequestCallback(const std::string& request_pb_, std::string& response_pb_); void EventCallback(eCAL_Server_Event event_, const std::string& message_); - std::shared_ptr m_tcp_server_v0; - std::shared_ptr m_tcp_server_v1; + std::shared_ptr m_tcp_server; static constexpr int m_server_version = 1; @@ -125,8 +124,7 @@ namespace eCAL EventCallbackMapT m_event_callback_map; mutable std::mutex m_connected_mutex; //!< mutex protecting the m_connected_v0 and m_connected_v1 variable, as those are modified by the event callbacks in another thread. - bool m_connected_v0 = false; - bool m_connected_v1 = false; + bool m_connected = false; std::atomic m_created; }; diff --git a/ecal/service/ecal_service/CMakeLists.txt b/ecal/service/ecal_service/CMakeLists.txt index ec6f734ba0..63d1ff833e 100644 --- a/ecal/service/ecal_service/CMakeLists.txt +++ b/ecal/service/ecal_service/CMakeLists.txt @@ -29,16 +29,12 @@ set(sources src/client_manager.cpp src/client_session.cpp src/client_session_impl_base.h - src/client_session_impl_v0.cpp - src/client_session_impl_v0.h src/client_session_impl_v1.cpp src/client_session_impl_v1.h src/condition_variable_signaler.h src/log_defs.h src/log_helpers.h src/protocol_layout.h - src/protocol_v0.cpp - src/protocol_v0.h src/protocol_v1.cpp src/protocol_v1.h src/server.cpp @@ -46,8 +42,6 @@ set(sources src/server_impl.h src/server_manager.cpp src/server_session_impl_base.h - src/server_session_impl_v0.cpp - src/server_session_impl_v0.h src/server_session_impl_v1.cpp src/server_session_impl_v1.h ) diff --git a/ecal/service/ecal_service/src/client_session.cpp b/ecal/service/ecal_service/src/client_session.cpp index 9a84ddbc50..83329848d9 100644 --- a/ecal/service/ecal_service/src/client_session.cpp +++ b/ecal/service/ecal_service/src/client_session.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ #include #include "client_session_impl_v1.h" -#include "client_session_impl_v0.h" #include "condition_variable_signaler.h" namespace eCAL @@ -76,19 +75,13 @@ namespace eCAL } ClientSession::ClientSession(const std::shared_ptr& io_context - , std::uint8_t protocol_version + , std::uint8_t /*protocol_version*/ , const std::vector>& server_list , const EventCallbackT& event_callback , const LoggerT& logger) { - if (protocol_version == 0) - { - impl_ = ClientSessionV0::create(io_context, server_list, event_callback, logger); - } - else - { - impl_ = ClientSessionV1::create(io_context, server_list, event_callback, logger); - } + // we support v1 protocol only + impl_ = ClientSessionV1::create(io_context, server_list, event_callback, logger); } ClientSession::~ClientSession() diff --git a/ecal/service/ecal_service/src/client_session_impl_v0.cpp b/ecal/service/ecal_service/src/client_session_impl_v0.cpp deleted file mode 100644 index 3f8044fe98..0000000000 --- a/ecal/service/ecal_service/src/client_session_impl_v0.cpp +++ /dev/null @@ -1,594 +0,0 @@ -/* ========================= eCAL LICENSE ================================= - * - * Copyright (C) 2016 - 2024 Continental Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ========================= eCAL LICENSE ================================= -*/ - -#include "client_session_impl_v0.h" -#include "client_session_impl_base.h" - -#include "protocol_v0.h" -#include "protocol_layout.h" -#include "log_helpers.h" -#include "log_defs.h" - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace eCAL -{ - namespace service - { - ///////////////////////////////////// - // Constructor, Destructor, Create - ///////////////////////////////////// - std::shared_ptr ClientSessionV0::create(const std::shared_ptr& io_context - , const std::vector>& server_list - , const EventCallbackT& event_callback - , const LoggerT& logger) - { - std::shared_ptr instance(new ClientSessionV0(io_context, server_list, event_callback, logger)); - - // Throw exception, if the server list is empty - if (server_list.empty()) - { - throw std::invalid_argument("Server list must not be empty"); - } - - instance->resolve_endpoint(0); - - return instance; - } - - ClientSessionV0::ClientSessionV0(const std::shared_ptr& io_context - , const std::vector>& server_list - , const EventCallbackT& event_callback - , const LoggerT& logger) - : ClientSessionBase(io_context, event_callback) - , server_list_ (server_list) - , service_call_queue_strand_(*io_context) - , resolver_ (*io_context) - , logger_ (logger) - , state_ (State::NOT_CONNECTED) - , stopped_by_user_ (false) - , service_call_in_progress_ (false) - { - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "Created"); - } - - ClientSessionV0::~ClientSessionV0() - { - ClientSessionV0::stop(); - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "Deleted"); - } - - ////////////////////////////////////// - // Connection establishement - ////////////////////////////////////// - void ClientSessionV0::resolve_endpoint(size_t server_list_index) - { - ECAL_SERVICE_LOG_DEBUG(logger_, "Resolving endpoint [" + server_list_[server_list_index].first + ":" + std::to_string(server_list_[server_list_index].second) + "]..."); - - const asio::ip::tcp::resolver::query query(server_list_[server_list_index].first, std::to_string(server_list_[server_list_index].second)); - - resolver_.async_resolve(query - , service_call_queue_strand_.wrap([me = enable_shared_from_this::shared_from_this(), server_list_index_copy = server_list_index] // gcc 7 forces us to assign the server_list_index to a new variable - (asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints) - { - if (ec) - { -#if ECAL_SERVICE_LOG_DEBUG_ENABLED - { - const std::string message = "Failed resolving endpoint [" + me->server_list_[server_list_index_copy].first + ":" + std::to_string(me->server_list_[server_list_index_copy].second) + "]: " + ec.message(); - ECAL_SERVICE_LOG_DEBUG(me->logger_, message); - } -#endif - - if (server_list_index_copy + 1 < me->server_list_.size()) - { - // Try next possible endpoint - me->resolve_endpoint(server_list_index_copy + 1); - } - else - { - std::string message = "Failed resolving any endpoint: "; - for (size_t j = 0; j < me->server_list_.size(); ++j) - { - message += me->server_list_[j].first + ":" + std::to_string(me->server_list_[j].second); - if (j + 1 < me->server_list_.size()) - { - message += ", "; - } - } - me->logger_(LogLevel::Error, message); - me->handle_connection_loss_error(message); - } - return; - } - else - { -#if ECAL_SERVICE_LOG_DEBUG_VERBOSE_ENABLED - // Verbose-debug log of all endpoints - { - std::string endpoints_str = "Resolved endpoints for " + me->server_list_[server_list_index_copy].first + ": "; - for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it) - { - endpoints_str += endpoint_to_string(*it) + ", "; - } - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, endpoints_str); - } -#endif //ECAL_SERVICE_LOG_DEBUG_VERBOSE_ENABLED - me->connect_to_endpoint(resolved_endpoints, server_list_index_copy); - } - })); - } - - void ClientSessionV0::connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t server_list_index) - { - // Convert the resolved_endpoints iterator to an endpoint sequence - // (i.e. a vector of endpoints) - auto endpoint_sequence = std::make_shared>(); - for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it) - { - endpoint_sequence->push_back(*it); - } - - const std::lock_guard socket_lock(socket_mutex_); - asio::async_connect(socket_ - , *endpoint_sequence - , service_call_queue_strand_.wrap([me = shared_from_this(), endpoint_sequence, server_list_index](asio::error_code ec, const asio::ip::tcp::endpoint& endpoint) - { - (void)endpoint; - if (ec) - { - { - // Log an error - const std::string message = "Failed to connect to endpoint [" + me->chosen_endpoint_.first + ":" + std::to_string(me->chosen_endpoint_.second) + "]: " + ec.message(); - me->logger_(LogLevel::Error, message); - } - - // If there are more servers available, try the next one - if (server_list_index + 1 < me->server_list_.size()) - { - me->resolve_endpoint(server_list_index + 1); - } - else - { - std::string message = "Failed to connect to any endpoint: "; - for (size_t j = 0; j < me->server_list_.size(); ++j) - { - message += me->server_list_[j].first + ":" + std::to_string(me->server_list_[j].second); - if (j + 1 < me->server_list_.size()) - { - message += ", "; - } - } - me->logger_(LogLevel::Error, message); - me->handle_connection_loss_error(message); - } - return; - } - else - { - ECAL_SERVICE_LOG_DEBUG(me->logger_, "Successfully connected to endpoint [" + endpoint.address().to_string() + ":" + std::to_string(endpoint.port()) + "]"); - - // Disable Nagle's algorithm. Nagles Algorithm will otherwise cause the - // Socket to wait for more data, if it encounters a frame that can still - // fit more data. Obviously, this is an awfull default behaviour, if we - // want to transmit our data in a timely fashion. - { - asio::error_code socket_option_ec; - { - const std::lock_guard socket_lock(me->socket_mutex_); - me->socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec); // NOLINT(bugprone-unused-return-value) -> We already get the value from the ec parameter - } - if (socket_option_ec) - { - me->logger_(LogLevel::Warning, "[" + get_connection_info_string(me->socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message()); - } - } - - { - // Set the chosen endpoint - const std::lock_guard chosen_endpoint_lock(me->chosen_endpoint_mutex_); - me->chosen_endpoint_ = me->server_list_[server_list_index]; - } - - const std::string message = "Connected to server. Using protocol version 0."; - me->logger_(LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message); - - { - const std::lock_guard lock(me->service_state_mutex_); - me->state_ = State::CONNECTED; - } - - // Call event callback - if(me->event_callback_) me->event_callback_(eCAL::service::ClientEventType::Connected, message); - - // Start sending service requests, if there are any - { - const std::lock_guard lock(me->service_state_mutex_); - if (!me->service_call_queue_.empty()) - { - // If there are service calls in the queue, we send the next one. - me->service_call_in_progress_ = true; - me->send_next_service_request(me->service_call_queue_.front().request, me->service_call_queue_.front().response_cb); - me->service_call_queue_.pop_front(); - } - else - { - // If there are no more service calls to send, we go to error-peeking. - // While error peeking we basically do nothing, except from non-destructively - // reading 1 byte from the socket (i.e. without removing it from the socket). - // This will cause asio / the OS to notify us, when the server closed the connection. - - me->service_call_in_progress_ = false; - me->peek_for_error(); - } - } - } - })); - } - - ////////////////////////////////////// - // Service calls - ////////////////////////////////////// - - bool ClientSessionV0::async_call_service(const std::shared_ptr& request, const ResponseCallbackT& response_callback) - { - // Lock mutex for stopped_by_user_ variable - const std::lock_guard service_state_lock(service_state_mutex_); - - if (stopped_by_user_) - { - return false; - } - else - { - service_call_queue_strand_.post([me = shared_from_this(), request, response_callback]() - { - // Variable that enables us to unlock the mutex before actually calling the callback - bool call_response_callback_with_error(false); - - { - const std::lock_guard lock(me->service_state_mutex_); - if (me->state_ != State::FAILED) - { - // If we are not in failed state, let's check - // whether we directly invoke the call of if we add it to the queue - - if (!me->service_call_in_progress_ && (me->state_ == State::CONNECTED)) - { - // Directly call the the service, iff - // - // - There is no call in progress - // - // and - // - // - We are connected - // - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + " No service call in progress. Directly starting next service call."); - me->service_call_in_progress_ = true; - me->send_next_service_request(request, response_callback); - } - else - { - // Add the call to the queue, iff: - // - // - A call is already in progress - // - // or - // - // - We are not connected, yet - // - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + "Queuing new service request"); - me->service_call_queue_.push_back(ServiceCall{request, response_callback}); - } - } - else - { - // If we are in FAILED state, we directly call the callback with an error. - call_response_callback_with_error = true; - } - } - - if(call_response_callback_with_error) - { - // If we are in FAILED state, we directly call the callback with an error. - // The mutex is unlocked at this point. That is important, as we have no - // influence on when the callback will return. - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + " Client is in FAILED state. Calling callback with error."); - response_callback(eCAL::service::Error::ErrorCode::CONNECTION_CLOSED, nullptr); - } - }); - return true; - } - } - - void ClientSessionV0::send_next_service_request(const std::shared_ptr& request, const ResponseCallbackT& response_cb) - { - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "Sending service request..."); - - // V0 writes payload with no header - - const std::lock_guard socket_lock(socket_mutex_); - asio::async_write(socket_ - , asio::buffer(*request) - , [me = shared_from_this(), request, response_cb](asio::error_code ec, std::size_t /*bytes_sent*/) - { - if (ec) - { - const std::string message = "Failed sending service request: " + ec.message(); - me->logger_(LogLevel::Error, "[" + get_connection_info_string(me->socket_) + "] " + message); - - // Call the callback with an error - response_cb(Error(Error::ErrorCode::CONNECTION_CLOSED, message), nullptr); - - // Further handle the error, e.g. unwinding pending service calls and calling the event callback - me->handle_connection_loss_error(message); - return; - } - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + "Successfully sent service request."); - me->receive_service_response(response_cb); - }); - - } - - void ClientSessionV0::receive_service_response(const ResponseCallbackT& response_cb) - { - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "[" + get_connection_info_string(socket_) + "] " + "Waiting for service response..."); - - eCAL::service::ProtocolV0::async_receive_payload_with_header(socket_, socket_mutex_ - , service_call_queue_strand_.wrap([me = shared_from_this(), response_cb](asio::error_code ec) - { - const std::string message = "Failed receiving service response: " + ec.message(); - me->logger_(LogLevel::Error, "[" + get_connection_info_string(me->socket_) + "] " + message); - - // Call the callback with an error - response_cb(Error(Error::ErrorCode::CONNECTION_CLOSED, message), nullptr); - - // Further handle the error, e.g. unwinding pending service calls and calling the event callback - me->handle_connection_loss_error(message); - }) - , service_call_queue_strand_.wrap([me = shared_from_this(), response_cb](const std::shared_ptr& /*header_buffer*/, const std::shared_ptr& payload_buffer) - { - // The response is a Service response - ECAL_SERVICE_LOG_DEBUG(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + "Successfully received service response of " + std::to_string(payload_buffer->size()) + " bytes"); - - // Call the user's callback - response_cb(Error::OK, payload_buffer); - - // Check if there are more items in the queue. If so, send the next request - // The mutex must be locket, as we access the queue. - { - const std::lock_guard lock(me->service_state_mutex_); - - if (!me->service_call_queue_.empty()) - { - // If there are more items, continue calling the service - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + " Service call queue contains " + std::to_string(me->service_call_queue_.size()) + " Entries. Starting next service call."); - me->service_call_in_progress_ = true; - me->send_next_service_request(me->service_call_queue_.front().request, me->service_call_queue_.front().response_cb); - me->service_call_queue_.pop_front(); - } - else - { - // If there are no more service calls to send, we go to error-peeking. - // While error peeking we basically do nothing, except from non-destructively - // reading 1 byte from the socket (i.e. without removing it from the socket). - // This will cause asio / the OS to notify us, when the server closed the connection. - - ECAL_SERVICE_LOG_DEBUG_VERBOSE(me->logger_, "[" + get_connection_info_string(me->socket_) + "] " + " No further servcice calls."); - me->service_call_in_progress_ = false; - me->peek_for_error(); - } - } - })); - - - } - - ////////////////////////////////////// - // Status API - ////////////////////////////////////// - - std::string ClientSessionV0::get_host() const - { - const std::lock_guard chosen_endpoint_lock(chosen_endpoint_mutex_); - return chosen_endpoint_.first; - } - - std::uint16_t ClientSessionV0::get_port() const - { - const std::lock_guard chosen_endpoint_lock(chosen_endpoint_mutex_); - return chosen_endpoint_.second; - } - - asio::ip::tcp::endpoint ClientSessionV0::get_remote_endpoint() const - { - // form remote endpoint string - { - asio::error_code ec; - auto endpoint = socket_.remote_endpoint(ec); - if (!ec) - return endpoint; - else - return asio::ip::tcp::endpoint(); - } - } - - State ClientSessionV0::get_state() const - { - const std::lock_guard lock(service_state_mutex_); - return state_; - } - - std::uint8_t ClientSessionV0::get_accepted_protocol_version() const - { - return 0; - } - - int ClientSessionV0::get_queue_size() const - { - const std::lock_guard lock(service_state_mutex_); - return static_cast(service_call_queue_.size()); - } - - ////////////////////////////////////// - // Shutdown - ////////////////////////////////////// - void ClientSessionV0::peek_for_error() - { - const std::shared_ptr> peek_buffer = std::make_shared>(1, '\0'); - - const std::lock_guard socket_lock(socket_mutex_); - socket_.async_receive(asio::buffer(*peek_buffer) - , asio::socket_base::message_peek - , service_call_queue_strand_.wrap([me = shared_from_this(), peek_buffer](const asio::error_code& ec, std::size_t /*bytes_transferred*/) { - if (ec) - { - const std::string message = "Connection loss while idling: " + ec.message(); - me->logger_(eCAL::service::LogLevel::Info, "[" + get_connection_info_string(me->socket_) + "] " + message); - me->handle_connection_loss_error("Connection loss while idling: " + ec.message()); - } - })); - } - - void ClientSessionV0::handle_connection_loss_error(const std::string& error_message) - { - bool call_event_callback (false); // Variable that enables us to unlock the mutex before we execute the event callback. - - // Close the socket, so all waiting async operations are actually woken - // up and fail with an error code. If we wouldn't do that, at least on - // Ubuntu only 1 waiting operations would wake up, while the others would - // indefinitively continue to wait. - // Having multiple async operations on the same socket actually does - // happen, as we are peeking for errors whenever we don't send or - // receive anything. - close_socket(); - - { - const std::lock_guard lock(service_state_mutex_); - - // cancel the connection loss handling, if we are already in FAILED state - if (state_ == State::FAILED) - return; - - if (state_ == State::CONNECTED) - { - // Event callback - call_event_callback = true; - } - - // Set the state to FAILED - state_ = State::FAILED; - - // call all callbacks from the queue with an error - if (!service_call_queue_.empty()) - { - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "Calling " + std::to_string(service_call_queue_.size()) + " service callbacks with error"); - call_all_callbacks_with_error(); - } - } - - if (call_event_callback && event_callback_) - { - event_callback_(eCAL::service::ClientEventType::Disconnected, error_message); - } - } - - void ClientSessionV0::call_all_callbacks_with_error() - { - service_call_queue_strand_.post([me = shared_from_this()]() - { - ServiceCall first_service_call; - bool more_service_calls(false); - - { - // Lock the mutex and manipulate the queue. We want the mutex unlocked for the event callback call. - const std::lock_guard lock(me->service_state_mutex_); - - if (me->service_call_queue_.empty()) - return; - - first_service_call = std::move(me->service_call_queue_.front()); - me->service_call_queue_.pop_front(); - - more_service_calls = (!me->service_call_queue_.empty()); - } - - // Execute the callback with an error - first_service_call.response_cb(eCAL::service::Error::ErrorCode::CONNECTION_CLOSED, nullptr); // TODO: I should probably store the error that lead to this somewhere and tell the actual error. - - // If there are more sevice calls, call those with an error, as well - if (more_service_calls) - me->call_all_callbacks_with_error(); - }); - } - - void ClientSessionV0::stop() - { - // This is a function that gets used both by the API and by potentially - // multiple failing async operations at once. - - { - // Set the stopped_by_user_ flag to true, so that the async operations stop enqueuing new service calls. - const std::lock_guard service_state_lock(service_state_mutex_); - stopped_by_user_ = true; - } - - { - close_socket(); - } - } - - void ClientSessionV0::close_socket() - { - // Close the socket, so all waiting async operations will fail with an - // error code. This will also cause the client to call all pending - // callbacks with an error. - const std::lock_guard socket_lock(socket_mutex_); - - if (socket_.is_open()) - { - { - asio::error_code ec; - socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) -> we already get the value by the ec parameter - } - - { - asio::error_code ec; - socket_.close(ec); // NOLINT(bugprone-unused-return-value) -> we already get the value by the ec parameter - } - } - } - - - } // namespace service -} // namespace eCAL diff --git a/ecal/service/ecal_service/src/client_session_impl_v0.h b/ecal/service/ecal_service/src/client_session_impl_v0.h deleted file mode 100644 index 326ad1b648..0000000000 --- a/ecal/service/ecal_service/src/client_session_impl_v0.h +++ /dev/null @@ -1,163 +0,0 @@ -/* ========================= eCAL LICENSE ===== ============================ - * - * Copyright (C) 2016 - 2019 Continental Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ========================= eCAL LICENSE ================================= -*/ - -#pragma once - -#include "client_session_impl_base.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include -#include - -namespace eCAL -{ - namespace service - { - class ClientSessionV0 - : public ClientSessionBase - , public std::enable_shared_from_this - { - ////////////////////////////////////// - // Internal types - ////////////////////////////////////// - private: - struct ServiceCall - { - std::shared_ptr request; - ResponseCallbackT response_cb; - }; - - ///////////////////////////////////// - // Constructor, Destructor, Create - ///////////////////////////////////// - public: - static std::shared_ptr create(const std::shared_ptr& io_context - , const std::vector>& server_list - , const EventCallbackT& event_callback - , const LoggerT& logger = default_logger("Service Client V1")); - - protected: - ClientSessionV0(const std::shared_ptr& io_context - , const std::vector>& server_list - , const EventCallbackT& event_callback - , const LoggerT& logger); - - public: - // Delete copy / move constructor and assignment operator - ClientSessionV0(const ClientSessionV0&) = delete; // Copy construct - ClientSessionV0(ClientSessionV0&&) = delete; // Move construct - - ClientSessionV0& operator=(const ClientSessionV0&) = delete; // Copy assign - ClientSessionV0& operator=(ClientSessionV0&&) = delete; // Move assign - - ~ClientSessionV0() override; - - ////////////////////////////////////// - // Connection establishement - ////////////////////////////////////// - private: - void resolve_endpoint(size_t server_list_index); - void connect_to_endpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t server_list_index); - - ////////////////////////////////////// - // Service calls - ////////////////////////////////////// - public: - bool async_call_service(const std::shared_ptr& request, const ResponseCallbackT& response_callback) override; - - private: - void send_next_service_request(const std::shared_ptr& request, const ResponseCallbackT& response_cb); - void receive_service_response(const ResponseCallbackT& response_cb); - - ////////////////////////////////////// - // Status API - ////////////////////////////////////// - public: - std::string get_host() const override; - std::uint16_t get_port() const override; - asio::ip::tcp::endpoint get_remote_endpoint() const override; - - State get_state() const override; - std::uint8_t get_accepted_protocol_version() const override; - int get_queue_size() const override; - - ////////////////////////////////////// - // Shutdown - ////////////////////////////////////// - public: - void peek_for_error(); - void handle_connection_loss_error(const std::string& message); - void call_all_callbacks_with_error(); - - /** - * @brief Stop the client. - * - * This will close the connection AND set the state to stopped. This - * means, that no further async service calls will be accepted. From now - * on, all async service calls will just return false and the callback - * will not be called any more in order to give the io_thread the chance - * of shutting down. - */ - void stop() override; - - private: - /** - * @brief Closes the internal socket - * - * This will close the connection without setting the state to stopped. - * This means, that further async service calls will still be accepted, - * but they will fail. The failure is still communicated by calling the - * callback. For a proper shutdown, where the plan is to stop the - * io_context thread as well, the stop() function must be used, or the - * client must be destroyed. - */ - void close_socket(); - - ////////////////////////////////////// - // Member variables - ////////////////////////////////////// - private: - - const std::vector> server_list_; //!< The list of servers that this client was created with. They will be tried in order. - - mutable std::mutex chosen_endpoint_mutex_; //!< Protects the chosen_endpoint_ variable. - std::pair chosen_endpoint_; //!< The endpoint that the client is currently connected to. Protected by chosen_endpoint_mutex_. - - asio::io_context::strand service_call_queue_strand_; - asio::ip::tcp::resolver resolver_; - const LoggerT logger_; - - mutable std::mutex service_state_mutex_; - State state_; //!< The connection state of this client. Protected by service_state_mutex_. - bool stopped_by_user_; //!< Telling whether we actively stopped the client. Protected by service_state_mutex_. When set, the client will not accept any more async service calls. - std::deque service_call_queue_; - bool service_call_in_progress_; - }; - } -} diff --git a/ecal/service/ecal_service/src/protocol_v0.cpp b/ecal/service/ecal_service/src/protocol_v0.cpp deleted file mode 100644 index c66d297e73..0000000000 --- a/ecal/service/ecal_service/src/protocol_v0.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* ========================= eCAL LICENSE ================================= - * - * Copyright (C) 2016 - 2019 Continental Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ========================= eCAL LICENSE ================================= -*/ - -#include "protocol_v0.h" - -#include "protocol_layout.h" -#include -#include -#include -#include -#include - -#include -#include - -#ifdef WIN32 - #include -#else - #include -#endif - -namespace eCAL -{ - namespace service - { - namespace ProtocolV0 - { - namespace - { - void read_header(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const ErrorCallbackT& error_cb, const ReceiveSuccessCallback& success_cb); - void read_payload(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const std::shared_ptr& header_buffer, const ErrorCallbackT& error_cb, const ReceiveSuccessCallback& success_cb); - - /////////////////////////////////////////////////// - // Read and write implementation - /////////////////////////////////////////////////// - void read_header(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const ErrorCallbackT& error_cb, const ReceiveSuccessCallback& success_cb) - { - // Allocate a buffer for the header as we know it. We can make it larger - // later, if necessary. We zero the buffer. - const auto header_buffer = std::make_shared(); - constexpr size_t header_buffer_size = sizeof(eCAL::service::TcpHeaderV0); - - // Receive data from the socket: - // - Maximum "bytes_to_read_now" - // - At least "bytes_to_read_now" - // => We read exactly the "bytes_to_read_now" amount of bytes - const std::lock_guard socket_lock(socket_mutex); - asio::async_read(socket - , asio::buffer(reinterpret_cast(header_buffer.get()), header_buffer_size) - , asio::transfer_at_least(header_buffer_size) - , [&socket, &socket_mutex, header_buffer, error_cb, success_cb](asio::error_code ec, std::size_t /*bytes_read*/) - { - if (ec) - { - // Call error callback - error_cb(ec); - return; - } - - // Read the payload that comes after the header - read_payload(socket, socket_mutex, header_buffer, error_cb, success_cb); - }); - } - - void read_payload(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const std::shared_ptr& header_buffer, const ErrorCallbackT& error_cb, const ReceiveSuccessCallback& success_cb) - { - // Read how many bytes we will get as payload - const uint32_t payload_size = ntohl(header_buffer->package_size_n); - - // Reserver enough memory for receiving the entire payload. The payload is - // represented as an std::string for legacy, reasons. It is not textual data. - const std::shared_ptr payload_buffer = std::make_shared(payload_size, '\0'); - - // Read all the payload data into the payload_buffer - const std::lock_guard socket_lock(socket_mutex); - asio::async_read(socket - , asio::buffer(const_cast(payload_buffer->data()), payload_buffer->size()) - , asio::transfer_at_least(payload_buffer->size()) - , [header_buffer, payload_buffer, error_cb, success_cb](asio::error_code ec, std::size_t /*bytes_read*/) - { - if (ec) - { - // Call error callback - error_cb(ec); - return; - } - - // Call success callback - success_cb(header_buffer, payload_buffer); - }); - - } - } - - /////////////////////////////////////////////////// - // Public API - /////////////////////////////////////////////////// - void async_send_payload_with_header(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const std::shared_ptr& header_buffer, const std::shared_ptr& payload_buffer, const ErrorCallbackT& error_cb, const SendSuccessCallback& success_cb) - { - const std::vector buffer_list { asio::buffer(reinterpret_cast(header_buffer.get()), sizeof(eCAL::service::TcpHeaderV0)) - , asio::buffer(*payload_buffer)}; - - const std::lock_guard socket_lock(socket_mutex); - asio::async_write(socket - , buffer_list - , [header_buffer, payload_buffer, error_cb, success_cb](asio::error_code ec, std::size_t /*bytes_sent*/) - { - if (ec) - { - // Call error callback - error_cb(ec); - return; - } - success_cb(); - }); - - } - - void async_receive_payload_with_header(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const ErrorCallbackT& error_cb, const ReceiveSuccessCallback& success_cb) - { - read_header(socket, socket_mutex, error_cb, success_cb); - } - } // namespace ProtocolV1 - } // namespace service -} // namespace eCAL diff --git a/ecal/service/ecal_service/src/protocol_v0.h b/ecal/service/ecal_service/src/protocol_v0.h deleted file mode 100644 index d5fb1aeb42..0000000000 --- a/ecal/service/ecal_service/src/protocol_v0.h +++ /dev/null @@ -1,45 +0,0 @@ -/* ========================= eCAL LICENSE ===== ============================ - * - * Copyright (C) 2016 - 2019 Continental Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ========================= eCAL LICENSE ================================= -*/ - -#pragma once - -#include -#include -#include -#include - -#include - -#include "protocol_layout.h" - -namespace eCAL -{ - namespace service - { - namespace ProtocolV0 - { - using ErrorCallbackT = std::function; - using SendSuccessCallback = std::function; - using ReceiveSuccessCallback = std::function& header_buffer, const std::shared_ptr& payload_buffer)>; - - void async_send_payload_with_header (asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const std::shared_ptr& header_buffer, const std::shared_ptr& payload_buffer, const ErrorCallbackT& error_cb, const SendSuccessCallback& success_cb); - void async_receive_payload_with_header(asio::ip::tcp::socket& socket, std::mutex& socket_mutex, const ErrorCallbackT& error_cb, const ReceiveSuccessCallback& success_cb); - } - } -} diff --git a/ecal/service/ecal_service/src/server_impl.cpp b/ecal/service/ecal_service/src/server_impl.cpp index f1f077baee..9566e4ec07 100644 --- a/ecal/service/ecal_service/src/server_impl.cpp +++ b/ecal/service/ecal_service/src/server_impl.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ #include "server_session_impl_base.h" #include "server_session_impl_v1.h" -#include "server_session_impl_v0.h" #include #include @@ -239,14 +238,8 @@ namespace eCAL service_callback_strand = service_callback_common_strand_; } - if (protocol_version == 0) - { - new_session = eCAL::service::ServerSessionV0::create(io_context_, service_callback_, service_callback_strand, event_callback_, shutdown_callback, logger_); - } - else - { - new_session = eCAL::service::ServerSessionV1::create(io_context_, service_callback_, service_callback_strand, event_callback_, shutdown_callback, logger_); - } + // we support v1 protocol only + new_session = eCAL::service::ServerSessionV1::create(io_context_, service_callback_, service_callback_strand, event_callback_, shutdown_callback, logger_); // Accept new session. // By only storing a weak_ptr to this, we assure that the user can still diff --git a/ecal/service/ecal_service/src/server_session_impl_v0.cpp b/ecal/service/ecal_service/src/server_session_impl_v0.cpp deleted file mode 100644 index a96f6ec8ac..0000000000 --- a/ecal/service/ecal_service/src/server_session_impl_v0.cpp +++ /dev/null @@ -1,270 +0,0 @@ -/* ========================= eCAL LICENSE ================================= - * - * Copyright (C) 2016 - 2019 Continental Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ========================= eCAL LICENSE ================================= -*/ - -#include "server_session_impl_v0.h" -#include "server_session_impl_base.h" - -#include "log_helpers.h" -#include "log_defs.h" -#include "protocol_layout.h" - -#include -#include -#include -#include -#include -#include - -#include - -#include -#include -#include - -#ifdef WIN32 - #include -#else - #include -#endif - -/////////////////////////////////////////////// -// Create, Constructor, Destructor -/////////////////////////////////////////////// - -namespace eCAL -{ - namespace service - { - - std::shared_ptr ServerSessionV0::create(const std::shared_ptr& io_context - , const ServerServiceCallbackT& service_callback - , const std::shared_ptr& service_callback_strand - , const ServerEventCallbackT& event_callback - , const ShutdownCallbackT& shutdown_callback - , const LoggerT& logger) - { - std::shared_ptr instance = std::shared_ptr(new ServerSessionV0(io_context, service_callback, service_callback_strand, event_callback, shutdown_callback, logger)); - return instance; - } - - ServerSessionV0::ServerSessionV0(const std::shared_ptr& io_context - , const ServerServiceCallbackT& service_callback - , const std::shared_ptr& service_callback_strand - , const ServerEventCallbackT& event_callback - , const ShutdownCallbackT& shutdown_callback - , const LoggerT& logger) - : ServerSessionBase(io_context, service_callback, service_callback_strand, event_callback, shutdown_callback) - , logger_ (logger) - , state_ (State::NOT_CONNECTED) - { - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "Server Session Created"); - } - - // Destructor - ServerSessionV0::~ServerSessionV0() - { - ServerSessionV0::stop(); - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "Server Session Deleted"); - } - - /////////////////////////////////////////////// - // Data receiving and sending - /////////////////////////////////////////////// - void ServerSessionV0::start() - { - // Call the handle_start with the io_service - // It is important to async call handle_start(), as it will call a - // user-defined callback. As we have no influence what that callback will - // be, we must call it from another thread to make sure to not double-lock - // mutexes from the server_impl, if the callback should itself call a - // server_impl api function. - - io_context_->post([me = shared_from_this()]() { me->handle_start(); }); - } - - void ServerSessionV0::handle_start() - { - // Go to handshake state - state_ = State::CONNECTED; - - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "Starting..."); - - const std::string message = "Client has connected. Using protocol version 0."; - event_callback_(eCAL::service::ServerEventType::Connected, message); - logger_(LogLevel::Info, "[" + get_connection_info_string(socket_) + "] " + message); - - // Disable Nagle's algorithm. Nagles Algorithm will otherwise cause the - // Socket to wait for more data, if it encounters a frame that can still - // fit more data. Obviously, this is an awfull default behaviour, if we - // want to transmit our data in a timely fashion. - { - asio::error_code socket_option_ec; - { - const std::lock_guard socket_lock(socket_mutex_); - socket_.set_option(asio::ip::tcp::no_delay(true), socket_option_ec); // NOLINT(bugprone-unused-return-value) -> We already get the return value rom the ec parameter - } - if (socket_option_ec) - { - logger_(LogLevel::Warning, "[" + get_connection_info_string(socket_) + "] " + "Failed setting tcp::no_delay option: " + socket_option_ec.message()); - } - } - - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "Waiting for service request..."); - const std::lock_guard socket_lock(socket_mutex_); - socket_.async_read_some(asio::buffer(data_, max_length) - , service_callback_strand_->wrap([me = shared_from_this()](asio::error_code ec, std::size_t bytes_read) - { - me->handle_read(ec, bytes_read, std::make_shared()); - })); - } - - void ServerSessionV0::stop() - { - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "Stopping..."); - - const std::lock_guard socket_lock(socket_mutex_); - if (socket_.is_open()) - { - { - // Shutdown the socket - asio::error_code ec; - socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) -> We already get the return value rom the ec parameter - } - - { - // Close the socket - asio::error_code ec; - socket_.close(ec); // NOLINT(bugprone-unused-return-value) -> We already get the return value rom the ec parameter - } - } - } - - eCAL::service::State ServerSessionV0::get_state() const - { - return state_; - } - - void ServerSessionV0::handle_read(const asio::error_code& ec, size_t bytes_transferred, const std::shared_ptr& request) - { - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "[" + get_connection_info_string(socket_) + "] " + "Received " + std::to_string(bytes_transferred) + " bytes."); - - if (!ec) - { - // collect request - *request += std::string(data_, bytes_transferred); - // are there some more data on the socket ? - - size_t bytes_available_on_socket(0); - { - asio::error_code socket_available_ec; - { - const std::lock_guard socket_lock(socket_mutex_); - bytes_available_on_socket = socket_.available(socket_available_ec); - } - - if (socket_available_ec) - { - // -- This code is a copy of the code in the else branch below. -- - state_ = State::FAILED; - const auto message = "Disconnected on read: " + socket_available_ec.message(); - logger_(eCAL::service::LogLevel::Info, "[" + get_connection_info_string(socket_) + "] " + message); - event_callback_(eCAL::service::ServerEventType::Disconnected, message); - shutdown_callback_(shared_from_this()); - return; - } - } - - if (bytes_available_on_socket != 0u) - { - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "[" + get_connection_info_string(socket_) + "] " + "More data is available on socket! Reading more data..."); - - { - const std::lock_guard socket_lock(socket_mutex_); - socket_.async_read_some(asio::buffer(data_, max_length) - , service_callback_strand_->wrap([me = shared_from_this(), request](asio::error_code ec, std::size_t bytes_read) - { - me->handle_read(ec, bytes_read, request); - })); - } - } - // no more data - else - { - // execute service callback - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "[" + get_connection_info_string(socket_) + "] " + "Socket currently doesn't hold any more data."); - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "handle_read final request size: " + std::to_string(request->size()) + ". Executing callback..."); - - auto response = std::make_shared(); - service_callback_(request, response); - - ECAL_SERVICE_LOG_DEBUG_VERBOSE(logger_, "[" + get_connection_info_string(socket_) + "] " + "Server callback executed. Reponse size: " + std::to_string(response->size()) + "."); - - const auto header = std::make_shared(); - header->package_size_n = htonl(static_cast(response->size())); - - const std::vector buffer_list { asio::buffer(reinterpret_cast(header.get()), sizeof(eCAL::service::TcpHeaderV0)) - , asio::buffer(*response)}; - - { - const std::lock_guard socket_lock(socket_mutex_); - asio::async_write(socket_ - , buffer_list - , [me = shared_from_this(), header, response](asio::error_code ec, std::size_t bytes_written) - { - me->handle_write(ec, bytes_written); - }); - } - } - } - else - { - state_ = State::FAILED; - const auto message = "Disconnected on read: " + ec.message(); - logger_(eCAL::service::LogLevel::Info, "[" + get_connection_info_string(socket_) + "] " + message); - event_callback_(eCAL::service::ServerEventType::Disconnected, message); - shutdown_callback_(shared_from_this()); - } - } - - void ServerSessionV0::handle_write(const asio::error_code& ec, std::size_t /*bytes_transferred*/) - { - if (!ec) - { - ECAL_SERVICE_LOG_DEBUG(logger_, "[" + get_connection_info_string(socket_) + "] " + "Waiting for service request..."); - { - const std::lock_guard socket_lock(socket_mutex_); - socket_.async_read_some(asio::buffer(data_, max_length) - , service_callback_strand_->wrap([me = shared_from_this()](asio::error_code ec, std::size_t bytes_read) - { - me->handle_read(ec, bytes_read, std::make_shared()); - })); - } - } - else - { - state_ = State::FAILED; - const auto message = "Disconnected on write: " + ec.message(); - logger_(eCAL::service::LogLevel::Error, "[" + get_connection_info_string(socket_) + "] " + message); - event_callback_(eCAL::service::ServerEventType::Disconnected, message); - shutdown_callback_(shared_from_this()); - } - } - - } // namespace service -} // namespace eCAL diff --git a/ecal/service/ecal_service/src/server_session_impl_v0.h b/ecal/service/ecal_service/src/server_session_impl_v0.h deleted file mode 100644 index af3c33ea54..0000000000 --- a/ecal/service/ecal_service/src/server_session_impl_v0.h +++ /dev/null @@ -1,107 +0,0 @@ -/* ========================= eCAL LICENSE ===== ============================ - * - * Copyright (C) 2016 - 2019 Continental Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ========================= eCAL LICENSE ================================= -*/ - -#pragma once - -#include "server_session_impl_base.h" -#include -#include -#include -#include - -#include - -#include -#include -#include - -namespace eCAL -{ - namespace service - { - class ServerSessionV0 - : public ServerSessionBase - , public std::enable_shared_from_this - { - - /////////////////////////////////////////////// - // Create, Constructor, Destructor - /////////////////////////////////////////////// - - public: - static std::shared_ptr create(const std::shared_ptr& io_context - , const ServerServiceCallbackT& service_callback - , const std::shared_ptr& service_callback_strand - , const ServerEventCallbackT& event_callback - , const ShutdownCallbackT& shutdown_callback - , const LoggerT& logger); - - protected: - ServerSessionV0(const std::shared_ptr& io_context - , const ServerServiceCallbackT& service_callback - , const std::shared_ptr& service_callback_strand - , const ServerEventCallbackT& event_callback - , const ShutdownCallbackT& shutdown_callback - , const LoggerT& logger); - - public: - // Copy - ServerSessionV0(const ServerSessionV0&) = delete; - ServerSessionV0& operator=(const ServerSessionV0&) = delete; - - // Move - ServerSessionV0(ServerSessionV0&&) noexcept = delete; - ServerSessionV0& operator=(ServerSessionV0&&) noexcept = delete; - - // Destructor - ~ServerSessionV0() override; - - /////////////////////////////////////////////// - // Data receiving and sending - /////////////////////////////////////////////// - public: - void start() override; - - private: - void handle_start(); - - public: - void stop() override; - - eCAL::service::State get_state() const override; - - private: - void handle_read(const asio::error_code& ec, size_t bytes_transferred, const std::shared_ptr& request); - - void handle_write(const asio::error_code& ec, std::size_t /*bytes_transferred*/); - - ///////////////////////////////////// - // Member variables - ///////////////////////////////////// - private: - const LoggerT logger_; - - std::atomic state_; - - enum { max_length = 64 * 1024 }; - char data_[max_length]{}; - }; - - } -} From 611d901d2449de070a235ef891560e4e86dad605 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:24:05 +0100 Subject: [PATCH 2/2] service test adapted --- ecal/service/test/src/ecal_tcp_service_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ecal/service/test/src/ecal_tcp_service_test.cpp b/ecal/service/test/src/ecal_tcp_service_test.cpp index cd5abf2fcb..bfdfb756db 100644 --- a/ecal/service/test/src/ecal_tcp_service_test.cpp +++ b/ecal/service/test/src/ecal_tcp_service_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,7 +57,7 @@ eCAL::service::LoggerT critical_logger(const std::string& node_name) }; } -constexpr std::uint8_t min_protocol_version = 0; +constexpr std::uint8_t min_protocol_version = 1; constexpr std::uint8_t max_protocol_version = 1; #if 1