From fd73208b8d73f07c3d3483a4a8a21d77de846154 Mon Sep 17 00:00:00 2001 From: Bernard Metzler Date: Tue, 12 Sep 2023 10:51:35 +0200 Subject: [PATCH] Tcp transport fixes o Flip do-while loop for epoll return processing for both transmit and receive path. o Handle activateEndpoint failures. o Remove writeLock() in addEndpoint since all endpoint list management is alreay locked by global tcpTransport write lock. Signed-off-by: Bernard Metzler --- src/libgeds/TcpTransport.cpp | 33 ++++++++++++++++++++------------- src/libgeds/TcpTransport.h | 6 +++++- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index 20563692..98b58340 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -90,7 +90,8 @@ void TcpTransport::stop() { if (eventFd > 0) { u_int64_t buf = 1; LOG_DEBUG("TCP Transport: write CTL fd"); - write(eventFd, &buf, 8); + if (write(eventFd, &buf, 8) <= 0) + perror("TCP Transport writing control socket failed: "); } std::vector> tcpPeerV; @@ -334,7 +335,7 @@ void TcpTransport::tcpTxThread(unsigned int id) { perror("epoll_ctl: "); } } - do { + while (isServing) { int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); for (int i = 0; i < cnt; i++) { @@ -383,7 +384,7 @@ void TcpTransport::tcpTxThread(unsigned int id) { tcpPeers.remove(tcpPeer->Id); } } - } while (isServing); + }; if (eventFd > 0) epoll_ctl(poll_fd, EPOLL_CTL_DEL, eventFd, NULL); @@ -702,7 +703,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { } } - do { + while (isServing) { int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); for (int i = 0; i < cnt; i++) { @@ -751,7 +752,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { } } } - } while (isServing); + }; if (eventFd > 0) epoll_ctl(poll_fd, EPOLL_CTL_DEL, eventFd, NULL); @@ -843,13 +844,16 @@ bool TcpTransport::addEndpointPassive(int sock) { tep->sock = sock; tcpPeer->addEndpoint(tep); - activateEndpoint(tep, tcpPeer); - LOG_DEBUG("Server with ", tcpPeer->endpoints.size(), - " connections to ", hostname, "::", in_peer->sin_port); - return true; + if (activateEndpoint(tep, tcpPeer) == true) { + LOG_DEBUG("Server with ", tcpPeer->endpoints.size(), + " connections to ", hostname, "::", in_peer->sin_port); + return true; + } + tcpPeer->delEndpoint(tep); + LOG_ERROR("Server failed adding connection to ", hostname, "::", in_peer->sin_port); } /* - * Will kill this extra connection probably created during cross-connect + * Drop this extra connection probably created during cross-connect * from both sides */ LOG_DEBUG("Server with ", tcpPeer->endpoints.size() + 1, @@ -915,10 +919,13 @@ std::shared_ptr TcpTransport::getPeer(sockaddr *peer) { std::shared_ptr tep = std::make_shared(); tep->sock = sock; tcpPeer->addEndpoint(tep); - activateEndpoint(tep, tcpPeer); - + if (activateEndpoint(tep, tcpPeer) == false) { + tcpPeer->delEndpoint(tep); + ::close(sock); + LOG_ERROR("Client failed adding connection to ", hostname, "::", inaddr->sin_port); + break; + } num_ep++; - LOG_DEBUG("Client with ", num_ep, " connections to ", hostname, "::", inaddr->sin_port); } if (num_ep) diff --git a/src/libgeds/TcpTransport.h b/src/libgeds/TcpTransport.h index 93d30047..61c9cdd2 100644 --- a/src/libgeds/TcpTransport.h +++ b/src/libgeds/TcpTransport.h @@ -182,9 +182,13 @@ class TcpPeer : public std::enable_shared_from_this, utility::RWConcurr sendRpcRequest(uint64_t dest, std::string name, size_t src_off, size_t len); int sendRpcReply(uint64_t reqId, int in_fd, uint64_t start, size_t len, int status); void addEndpoint(std::shared_ptr tep) { - auto lock = getWriteLock(); + // Caller must hold TcpTransports write lock endpoints.emplace(tep->sock, tep); }; + void delEndpoint(std::shared_ptr tep) { + // Caller must hold TcpTransport write lock + endpoints.erase(tep->sock); + }; TcpPeer(std::string name, std::shared_ptr geds, TcpTransport &tcpTransport) : Id(SStringHash(name)), _geds(std::move(geds)), _tcpTransport(tcpTransport), hostname(std::move(name)){};