Skip to content

Commit

Permalink
-fix RegSyncClient so that it detects breaks in the socket connection…
Browse files Browse the repository at this point in the history
… to the server - by using keepalives

 -if we try to send on the socket periodically then we will detect connection errors
-made EGAIN and EWOULDBLOCK processing the same in a few additional spots 
-add WinLeakCheck header to some new Websocket class files to aid in leak detection on windows


git-svn-id: https://svn.resiprocate.org/rep/resiprocate/main@10968 ddefafc4-47db-0310-ae44-fa13212b10f2
  • Loading branch information
sgodin committed Feb 14, 2014
1 parent d5cd4c7 commit d2dcdaf
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 24 deletions.
2 changes: 0 additions & 2 deletions reflow/ErrorCode.hxx
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef FM_ERRORCODE_HXX
#define FM_ERRORCODE_HXX

#include <asio/error_code.hpp>

namespace flowmanager {

typedef int ErrorType;
Expand Down
5 changes: 4 additions & 1 deletion repro/HttpBase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ HttpBase::process(FdSet& fdset)
int e = getErrno();
switch (e)
{
case EWOULDBLOCK:
case EAGAIN:
#if EAGAIN != EWOULDBLOCK
case EWOULDBLOCK: // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
#endif
// !jf! this can not be ready in some cases
return;
default:
Expand Down
84 changes: 70 additions & 14 deletions repro/RegSyncClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ RegSyncClient::thread()
mSocketDesc = (int)socket(servAddr.ipVersion() == V6 ? PF_INET6 : PF_INET , SOCK_STREAM, 0);
if(mSocketDesc < 0)
{
ErrLog(<< "RegSyncClient: cannot open socket");
int e = getErrno();
ErrLog(<< "RegSyncClient: cannot open socket, err=" << e);
mSocketDesc = 0;
return;
}
Expand All @@ -102,7 +103,8 @@ RegSyncClient::thread()
rc = ::bind(mSocketDesc, &localAddr.getMutableSockaddr(), localAddr.length());
if(rc < 0)
{
ErrLog(<<"RegSyncClient: error binding locally");
int e = getErrno();
ErrLog(<<"RegSyncClient: error binding locally, err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
return;
Expand All @@ -112,7 +114,8 @@ RegSyncClient::thread()
rc = ::connect(mSocketDesc, &servAddr.getMutableSockaddr(), servAddr.length());
if(rc < 0)
{
if(!mShutdown) ErrLog(<< "RegSyncClient: error connecting to " << mAddress << ":" << mPort);
int e = getErrno();
if(!mShutdown) ErrLog(<< "RegSyncClient: error connecting to " << mAddress << ":" << mPort << ", err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
delaySeconds(30);
Expand All @@ -128,27 +131,78 @@ RegSyncClient::thread()
rc = ::send(mSocketDesc, request.c_str(), (int)request.size(), 0);
if(rc < 0)
{
if(!mShutdown) ErrLog(<< "RegSyncClient: error sending");
int e = getErrno();
if(!mShutdown) ErrLog(<< "RegSyncClient: error sending, err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
continue;
}

// Make socket non blocking
bool ok = makeSocketNonBlocking(mSocketDesc);
if (!ok)
{
int e = getErrno();
ErrLog(<< "RegSyncClient: Could not make HTTP socket non-blocking, err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
continue;
}

while(rc > 0)
{
rc = ::recv(mSocketDesc, (char*)&mRxBuffer, sizeof(mRxBuffer), 0);
if(rc < 0)
FdSet fdset;
fdset.setRead(mSocketDesc);
fdset.setExcept(mSocketDesc);
timeval tm;
tm.tv_sec = 30; // TODO - make a setting?
tm.tv_usec = 0;
rc = fdset.select(tm);
if(rc > 0)
{
rc = ::recv(mSocketDesc, (char*)&mRxBuffer, sizeof(mRxBuffer), 0);
if(rc < 0)
{
int e = getErrno();
if(!mShutdown) ErrLog(<< "RegSyncClient: error receiving, err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
break;
}

if(rc > 0)
{
mRxDataBuffer += Data(Data::Borrow, (const char*)&mRxBuffer, rc);
while(tryParse());
}
}
else if(rc == 0) // timeout - send keepalive
{
if(!mShutdown) ErrLog(<< "RegSyncClient: error receiving");
closeSocket(mSocketDesc);
mSocketDesc = 0;
break;
rc = ::send(mSocketDesc, Symbols::CRLFCRLF, (int)request.size(), 0);
if(rc < 0)
{
int e = getErrno();
// If send is blocking then we must have pending send data - so just ignore error - no need to keepalive
if ( e != EAGAIN && e != EWOULDBLOCK ) // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
{
if(!mShutdown) ErrLog(<< "RegSyncClient: error sending keepalive, err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
continue;
}
}
//else
//{
// InfoLog( << "RegSyncClient: keepalive sent!");
//}
}

if(rc > 0)
else
{
mRxDataBuffer += Data(Data::Borrow, (const char*)&mRxBuffer, rc);
while(tryParse());
int e = getErrno();
if(!mShutdown) ErrLog(<< "RegSyncClient: error calling select, err=" << e);
closeSocket(mSocketDesc);
mSocketDesc = 0;
break;
}
}
} // end while
Expand Down Expand Up @@ -369,6 +423,8 @@ RegSyncClient::processModify(const resip::Uri& aor, ContactList& syncContacts)
bool found;
for(; itSync != syncContacts.end(); itSync++)
{
InfoLog(<< " RegSyncClient::processModify: contact=" << itSync->mContact << ", instance=" << itSync->mInstance << ", regid=" << itSync->mRegId);

// See if contact already exists in currentContacts
found = false;
for(itCurrent = currentContacts.begin(); itCurrent != currentContacts.end(); itCurrent++)
Expand Down
1 change: 0 additions & 1 deletion repro/RegSyncServerThread.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include <rutil/Logger.hxx>

#include "repro/RegSyncServer.hxx"
#include "repro/RegSyncClient.hxx"
#include "repro/RegSyncServerThread.hxx"

#define RESIPROCATE_SUBSYSTEM Subsystem::REPRO
Expand Down
5 changes: 4 additions & 1 deletion repro/XmlRpcServerBase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ XmlRpcServerBase::process(FdSet& fdset)
int e = getErrno();
switch (e)
{
case EWOULDBLOCK:
case EAGAIN:
#if EAGAIN != EWOULDBLOCK
case EWOULDBLOCK: // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
#endif
return;
default:
logSocketError(e);
Expand Down
10 changes: 8 additions & 2 deletions resip/stack/TcpBaseTransport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ TcpBaseTransport::processListen()
int e = getErrno();
switch (e)
{
case EWOULDBLOCK:
case EAGAIN:
#if EAGAIN != EWOULDBLOCK
case EWOULDBLOCK: // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
#endif
// !jf! this can not be ready in some cases
// !kw! this will happen every epoll cycle
return 0;
Expand Down Expand Up @@ -245,7 +248,10 @@ TcpBaseTransport::makeOutgoingConnection(const Tuple &dest,
switch (err)
{
case EINPROGRESS:
case EWOULDBLOCK:
case EAGAIN:
#if EAGAIN != EWOULDBLOCK
case EWOULDBLOCK: // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
#endif
break;
default:
{
Expand Down
2 changes: 1 addition & 1 deletion resip/stack/WsDecorator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "SipMessage.hxx"
#include "Tuple.hxx"
#include "Transport.hxx"

#include "rutil/WinLeakCheck.hxx"

using namespace std;
using namespace resip;
Expand Down
1 change: 1 addition & 0 deletions resip/stack/WsFrameExtractor.cxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include "rutil/Logger.hxx"
#include "resip/stack/WsFrameExtractor.hxx"
#include "rutil/WinLeakCheck.hxx"

using namespace resip;

Expand Down
7 changes: 5 additions & 2 deletions rutil/SelectInterruptor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,16 @@ SelectInterruptor::interrupt()
assert(count == sizeof(wakeUp));
#else
ssize_t res = write(mPipe[1], wakeUp, sizeof(wakeUp));
if ( res == -1 && errno==EAGAIN )

if ( res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK) ) // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
{
; // this can happen when SipStack thread gets behind.
// no need to block since our only purpose is to wake up the thread
// also, this write can occur within the SipStack thread, in which
// case we get dead-lock if this blocks
} else {
}
else
{
assert(res == sizeof(wakeUp));
}
#endif
Expand Down

0 comments on commit d2dcdaf

Please sign in to comment.