Skip to content

Commit

Permalink
Tcp transport fixes
Browse files Browse the repository at this point in the history
o Flip do-while loop for epoll return processing for both transmit
  and receive path.
o Handle activateEndpoint failures.
o Remove writeLock() in addEndpoint since all endpoint list
  management is alreay locked by global tcpTransport write lock.

Signed-off-by: Bernard Metzler <[email protected]>
  • Loading branch information
BernardMetzler committed Oct 4, 2023
1 parent 20d24b4 commit fd73208
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
33 changes: 20 additions & 13 deletions src/libgeds/TcpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ void TcpTransport::stop() {
if (eventFd > 0) {
u_int64_t buf = 1;
LOG_DEBUG("TCP Transport: write CTL fd");
write(eventFd, &buf, 8);
if (write(eventFd, &buf, 8) <= 0)
perror("TCP Transport writing control socket failed: ");
}

std::vector<std::shared_ptr<TcpPeer>> tcpPeerV;
Expand Down Expand Up @@ -334,7 +335,7 @@ void TcpTransport::tcpTxThread(unsigned int id) {
perror("epoll_ctl: ");
}
}
do {
while (isServing) {
int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1);

for (int i = 0; i < cnt; i++) {
Expand Down Expand Up @@ -383,7 +384,7 @@ void TcpTransport::tcpTxThread(unsigned int id) {
tcpPeers.remove(tcpPeer->Id);
}
}
} while (isServing);
};
if (eventFd > 0)
epoll_ctl(poll_fd, EPOLL_CTL_DEL, eventFd, NULL);

Expand Down Expand Up @@ -702,7 +703,7 @@ void TcpTransport::tcpRxThread(unsigned int id) {
}
}

do {
while (isServing) {
int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1);

for (int i = 0; i < cnt; i++) {
Expand Down Expand Up @@ -751,7 +752,7 @@ void TcpTransport::tcpRxThread(unsigned int id) {
}
}
}
} while (isServing);
};
if (eventFd > 0)
epoll_ctl(poll_fd, EPOLL_CTL_DEL, eventFd, NULL);

Expand Down Expand Up @@ -843,13 +844,16 @@ bool TcpTransport::addEndpointPassive(int sock) {

tep->sock = sock;
tcpPeer->addEndpoint(tep);
activateEndpoint(tep, tcpPeer);
LOG_DEBUG("Server with ", tcpPeer->endpoints.size(),
" connections to ", hostname, "::", in_peer->sin_port);
return true;
if (activateEndpoint(tep, tcpPeer) == true) {
LOG_DEBUG("Server with ", tcpPeer->endpoints.size(),
" connections to ", hostname, "::", in_peer->sin_port);
return true;
}
tcpPeer->delEndpoint(tep);
LOG_ERROR("Server failed adding connection to ", hostname, "::", in_peer->sin_port);
}
/*
* Will kill this extra connection probably created during cross-connect
* Drop this extra connection probably created during cross-connect
* from both sides
*/
LOG_DEBUG("Server with ", tcpPeer->endpoints.size() + 1,
Expand Down Expand Up @@ -915,10 +919,13 @@ std::shared_ptr<TcpPeer> TcpTransport::getPeer(sockaddr *peer) {
std::shared_ptr<TcpEndpoint> tep = std::make_shared<TcpEndpoint>();
tep->sock = sock;
tcpPeer->addEndpoint(tep);
activateEndpoint(tep, tcpPeer);

if (activateEndpoint(tep, tcpPeer) == false) {
tcpPeer->delEndpoint(tep);
::close(sock);
LOG_ERROR("Client failed adding connection to ", hostname, "::", inaddr->sin_port);
break;
}
num_ep++;

LOG_DEBUG("Client with ", num_ep, " connections to ", hostname, "::", inaddr->sin_port);
}
if (num_ep)
Expand Down
6 changes: 5 additions & 1 deletion src/libgeds/TcpTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,13 @@ class TcpPeer : public std::enable_shared_from_this<TcpPeer>, utility::RWConcurr
sendRpcRequest(uint64_t dest, std::string name, size_t src_off, size_t len);
int sendRpcReply(uint64_t reqId, int in_fd, uint64_t start, size_t len, int status);
void addEndpoint(std::shared_ptr<TcpEndpoint> tep) {
auto lock = getWriteLock();
// Caller must hold TcpTransports write lock
endpoints.emplace(tep->sock, tep);
};
void delEndpoint(std::shared_ptr<TcpEndpoint> tep) {
// Caller must hold TcpTransport write lock
endpoints.erase(tep->sock);
};
TcpPeer(std::string name, std::shared_ptr<GEDS> geds, TcpTransport &tcpTransport)
: Id(SStringHash(name)), _geds(std::move(geds)), _tcpTransport(tcpTransport),
hostname(std::move(name)){};
Expand Down

0 comments on commit fd73208

Please sign in to comment.