Skip to content

Commit

Permalink
added method tcpconnection::flush()
Browse files Browse the repository at this point in the history
  • Loading branch information
EmielBruijntjes committed Sep 19, 2016
1 parent 5bb7b1a commit ed2ffd3
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 7 deletions.
8 changes: 8 additions & 0 deletions include/tcpconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ class TcpConnection :
*/
void process(int fd, int flags);

/**
* Flush the connection - all unsent bytes are sent to the socket rigth away
* This is a blocking operation. The connection object normally only sends data
* when the socket is known to be writable, but with this method you can force
* the outgoing buffer to be fushed
*/
void flush();

/**
* Close the connection
* This closes all channels and the TCP connection
Expand Down
2 changes: 1 addition & 1 deletion src/tcpclosed.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Class that is used when the TCP connection ends up in a closed state
*
* @author Emiel Bruijntjes <[email protected]>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/

/**
Expand Down
46 changes: 46 additions & 0 deletions src/tcpconnected.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,29 @@ class TcpConnected : public TcpState, private Watchable
return monitor.valid() ? new TcpClosed(this) : nullptr;
}

/**
* Wait until the socket is writable
* @return bool
*/
bool wait4writable()
{
// we need the fd-sets
fd_set readables, writables, exceptions;

// initialize all the sets
FD_ZERO(&readables);
FD_ZERO(&writables);
FD_ZERO(&exceptions);

// add the one socket
FD_SET(_socket, &writables);

// wait for the socket
auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr);

// check for success
return result == 0;
}

public:
/**
Expand Down Expand Up @@ -203,6 +226,29 @@ class TcpConnected : public TcpState, private Watchable
// start monitoring the socket to find out when it is writable
_handler->monitor(_connection, _socket, readable | writable);
}

/**
* Flush the connection, sent all buffered data to the socket
* @return TcpState new tcp state
*/
virtual TcpState *flush() override
{
// keep running until the out buffer is empty
while (_out)
{
// poll the socket, is it already writable?
if (!wait4writable()) return this;

// socket is writable, send as much data as possible
auto *newstate = process(_socket, writable);

// are we done
if (newstate != this) return newstate;
}

// all has been sent
return this;
}

/**
* Report that heartbeat negotiation is going on
Expand Down
27 changes: 26 additions & 1 deletion src/tcpconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Implementation file for the TCP connection
*
* @author Emiel Bruijntjes <[email protected]>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/

/**
Expand Down Expand Up @@ -56,6 +56,31 @@ void TcpConnection::process(int fd, int flags)
_state.reset(result);
}

/**
* Flush the tcp connection
*/
void TcpConnection::flush()
{
// monitor the object for destruction
Monitor monitor(this);

// keep looping
while (true)
{
// flush the object
auto *newstate = _state->flush();

// done if object no longer exists
if (!monitor.valid()) return;

// also done if the object is still in the same state
if (newstate == _state.get()) return;

// replace the new state
_state.reset(newstate);
}
}

/**
* Method that is called when the heartbeat frequency is negotiated.
* @param connection The connection that suggested a heartbeat interval
Expand Down
27 changes: 23 additions & 4 deletions src/tcpresolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* server, and to make the initial connection
*
* @author Emiel Bruijntjes <[email protected]>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/

/**
Expand Down Expand Up @@ -173,9 +173,9 @@ class TcpResolver : public TcpState

/**
* Wait for the resolver to be ready
* @param fd The filedescriptor that is active
* @param flags Flags to indicate that fd is readable and/or writable
* @return New implementation object
* @param fd The filedescriptor that is active
* @param flags Flags to indicate that fd is readable and/or writable
* @return New implementation object
*/
virtual TcpState *process(int fd, int flags) override
{
Expand All @@ -191,6 +191,25 @@ class TcpResolver : public TcpState
// create dummy implementation
return new TcpClosed(_connection, _handler);
}

/**
* Flush state / wait for the connection to complete
* @return New implementation object
*/
virtual TcpState *flush() override
{
// just wait for the other thread to be ready
_thread.join();

// do we have a valid socket?
if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler);

// report error
_handler->onError(_connection, _error.data());

// create dummy implementation
return new TcpClosed(_connection, _handler);
}

/**
* Send data over the connection
Expand Down
8 changes: 7 additions & 1 deletion src/tcpstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Base class / interface of the various states of the TCP connection
*
* @author Emiel Bruijntjes <[email protected]>
* @copyright 2015 Copernica BV
* @copyright 2015 - 2016 Copernica BV
*/

/**
Expand Down Expand Up @@ -90,6 +90,12 @@ class TcpState
return _handler->onNegotiate(_connection, heartbeat);
}

/**
* Flush the connection
* @return TcpState new implementation object
*/
virtual TcpState *flush() { return this; }

/**
* Report to the handler that the object is in an error state
* @param error
Expand Down

0 comments on commit ed2ffd3

Please sign in to comment.