diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bb5dd64fe..45cfaca7f 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -833,6 +833,9 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, ns->removeFromGroup(true); } #endif + // You won't be updating any EIDs anymore. + m_EPoll.wipe_usock(id, ns->core().m_sPollID); + m_Sockets.erase(id); m_ClosedSockets[id] = ns; } @@ -1557,6 +1560,9 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i targets[tii].errorcode = e.getErrorCode(); targets[tii].id = CUDT::INVALID_SOCK; + // You won't be updating any EIDs anymore. + m_EPoll.wipe_usock(ns->m_SocketID, ns->core().m_sPollID); + ScopedLock cl(m_GlobControlLock); ns->removeFromGroup(false); m_Sockets.erase(ns->m_SocketID); @@ -1571,6 +1577,8 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i targets[tii].id = CUDT::INVALID_SOCK; ScopedLock cl(m_GlobControlLock); ns->removeFromGroup(false); + // You won't be updating any EIDs anymore. + m_EPoll.wipe_usock(ns->m_SocketID, ns->core().m_sPollID); m_Sockets.erase(ns->m_SocketID); // Intercept to delete the socket on failure. delete ns; @@ -2063,6 +2071,9 @@ int srt::CUDTUnited::close(CUDTSocket* s) } #endif + // You won't be updating any EIDs anymore. + m_EPoll.wipe_usock(s->m_SocketID, s->core().m_sPollID); + m_Sockets.erase(s->m_SocketID); m_ClosedSockets[s->m_SocketID] = s; HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later."); @@ -2851,6 +2862,10 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) CUDTSocket* as = si->second; as->breakSocket_LOCKED(); + + // You won't be updating any EIDs anymore. + m_EPoll.wipe_usock(as->m_SocketID, as->core().m_sPollID); + m_ClosedSockets[q->first] = as; m_Sockets.erase(q->first); } @@ -2870,7 +2885,8 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) * remains forever causing epoll_wait to unblock continuously for inexistent * sockets. 
Get rid of all events for this socket. */ - m_EPoll.update_events(u, s->core().m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, false); + // (just in case, this should be wiped out already) + m_EPoll.wipe_usock(u, s->core().m_sPollID); // delete this one m_ClosedSockets.erase(i); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..4ef1454bc 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6282,42 +6282,7 @@ bool srt::CUDT::closeInternal() ATR_NOEXCEPT * it would remove the socket from the EPoll after close. */ - // Make a copy under a lock because other thread might access it - // at the same time. - enterCS(uglobal().m_EPoll.m_EPollLock); - set epollid = m_sPollID; - leaveCS(uglobal().m_EPoll.m_EPollLock); - - // trigger any pending IO events. - HLOGC(smlog.Debug, log << CONID() << "close: SETTING ERR readiness on E" << Printable(epollid)); - uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_ERR, true); - // then remove itself from all epoll monitoring - int no_events = 0; - for (set::iterator i = epollid.begin(); i != epollid.end(); ++i) - { - HLOGC(smlog.Debug, log << CONID() << "close: CLEARING subscription on E" << (*i)); - try - { - uglobal().m_EPoll.update_usock(*i, m_SocketID, &no_events); - } - catch (...) - { - // The goal of this loop is to remove all subscriptions in - // the epoll system to this socket. If it's unsubscribed already, - // that's even better. - } - HLOGC(smlog.Debug, log << CONID() << "close: removing E" << (*i) << " from back-subscribers"); - } - - // Not deleting elements from m_sPollID inside the loop because it invalidates - // the control iterator of the loop. Instead, all will be removed at once. - - // IMPORTANT: there's theoretically little time between setting ERR readiness - // and unsubscribing, however if there's an application waiting on this event, - // it should be informed before this below instruction locks the epoll mutex. 
- enterCS(uglobal().m_EPoll.m_EPollLock); - m_sPollID.clear(); - leaveCS(uglobal().m_EPoll.m_EPollLock); + uglobal().m_EPoll.wipe_usock(m_SocketID, m_sPollID); // XXX What's this, could any of the above actions make it !m_bOpened? if (!m_bOpened) diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 8cd8440c7..bc52ec10b 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -945,6 +945,33 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const return nupdated; } +/// Removes the socket @a uid from the epoll system while holding m_EPollLock. +/// The subscription list must be provided in the @a eids container; for each +/// listed EID still present in m_mPolls, removeSubscription(uid) is called +/// (EIDs deleted in the meantime are skipped), then @a eids is cleared. This +/// should be the socket's private container of EIDs it updates on events. +/// +/// @param uid Socket ID that has to be removed from the epoll system +/// @param eids [in,out] EIDs the socket believes it is subscribed to; emptied on return +void srt::CEPoll::wipe_usock(const SRTSOCKET uid, std::set& eids) +{ + ScopedLock pg (m_EPollLock); + for (set::iterator i = eids.begin(); i != eids.end(); ++ i) + { + map::iterator p = m_mPolls.find(*i); + if (p == m_mPolls.end()) + { + HLOGC(eilog.Note, log << "epoll/wipe: E" << *i << " was deleted in the meantime"); + continue; + } + + CEPollDesc& ed = p->second; + ed.removeSubscription(uid); + } + + eids.clear(); +} + // Debug use only. 
#if ENABLE_HEAVY_LOGGING namespace srt diff --git a/srtcore/epoll.h b/srtcore/epoll.h index 00d46ceb4..3ac190186 100644 --- a/srtcore/epoll.h +++ b/srtcore/epoll.h @@ -489,6 +489,8 @@ friend class srt::CRendezvousQueue; int update_events(const SRTSOCKET& uid, std::set& eids, int events, bool enable); + void wipe_usock(const SRTSOCKET uid, std::set& eids); + int setflags(const int eid, int32_t flags); private: diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 4ec4a84f5..207f44065 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include "gtest/gtest.h" @@ -62,6 +63,8 @@ TEST(Bonding, SRTConnectGroup) #define EXPECT_SRT_SUCCESS(callform) EXPECT_NE(callform, -1) << "SRT ERROR: " << srt_getlasterror_str() +static std::mutex g_listening_stopped; + void listening_thread(bool should_read) { const SRTSOCKET server_sock = srt_create_socket(); @@ -118,6 +121,9 @@ void listening_thread(bool should_read) } } + std::cout << "Listen: wait for green light from the caller...\n"; + std::unique_lock listen_lock (g_listening_stopped); + srt_close(acp); srt_close(server_sock); @@ -135,8 +141,8 @@ int g_nfailed = 0; void ConnectCallback(void* , SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) { std::cout << "Connect callback. Socket: " << sock - << ", error: " << error - << ", token: " << token << '\n'; + << ", error: " << error << " (" << srt_strerror(error, 0) + << "), token: " << token << '\n'; if (error == SRT_SUCCESS) ++g_nconnected; @@ -171,6 +177,10 @@ TEST(Bonding, NonBlockingGroupConnect) sockaddr_in safail = sa; safail.sin_port = htons(4201); // port where we have no listener + // We need to keep the listener with the socket without closing it + // until we are done. 
+ std::unique_lock listen_lock (g_listening_stopped); + std::future listen_promise = std::async(std::launch::async, std::bind(&listening_thread, false)); std::cout << "Connecting two sockets " << std::endl; @@ -203,7 +213,7 @@ TEST(Bonding, NonBlockingGroupConnect) write, &wlen, 5000, /* timeout */ 0, 0, 0, 0); - + std::cout << "Epoll result: " << epoll_res << '\n'; std::cout << "Epoll rlen: " << rlen << ", wlen: " << wlen << '\n'; for (int i = 0; i < rlen; ++i) @@ -212,10 +222,14 @@ TEST(Bonding, NonBlockingGroupConnect) } for (int i = 0; i < wlen; ++i) { - std::cout << "Epoll write[" << i << "]: " << write[i] << " (removed from epoll)\n"; + SRT_SOCKSTATUS st = srt_getsockstate(write[i]); + std::cout << "Epoll write[" << i << "]: " << write[i] + << " ST:" << srt_logging::SockStatusStr(st) + << " (removing from epoll)\n"; EXPECT_EQ(srt_epoll_remove_usock(poll_id, write[i]), 0); } } + listen_lock.unlock(); // give green light to the listener so that it closes sockets. listen_promise.wait();