From ed2ffd3f03f2bba11f8af68c69ec96ab40a5c4b7 Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Mon, 19 Sep 2016 10:19:06 +0200 Subject: [PATCH] added method tcpconnection::flush() --- include/tcpconnection.h | 8 +++++++ src/tcpclosed.h | 2 +- src/tcpconnected.h | 46 +++++++++++++++++++++++++++++++++++++++++ src/tcpconnection.cpp | 27 +++++++++++++++++++++++- src/tcpresolver.h | 27 ++++++++++++++++++++---- src/tcpstate.h | 8 ++++++- 6 files changed, 111 insertions(+), 7 deletions(-) diff --git a/include/tcpconnection.h b/include/tcpconnection.h index 76147ee0..632063d7 100644 --- a/include/tcpconnection.h +++ b/include/tcpconnection.h @@ -130,6 +130,14 @@ class TcpConnection : */ void process(int fd, int flags); + /** + * Flush the connection - all unsent bytes are sent to the socket rigth away + * This is a blocking operation. The connection object normally only sends data + * when the socket is known to be writable, but with this method you can force + * the outgoing buffer to be fushed + */ + void flush(); + /** * Close the connection * This closes all channels and the TCP connection diff --git a/src/tcpclosed.h b/src/tcpclosed.h index cd980cda..71ca4ee9 100644 --- a/src/tcpclosed.h +++ b/src/tcpclosed.h @@ -4,7 +4,7 @@ * Class that is used when the TCP connection ends up in a closed state * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2016 Copernica BV */ /** diff --git a/src/tcpconnected.h b/src/tcpconnected.h index ded777a1..df5e5b90 100644 --- a/src/tcpconnected.h +++ b/src/tcpconnected.h @@ -83,6 +83,29 @@ class TcpConnected : public TcpState, private Watchable return monitor.valid() ? new TcpClosed(this) : nullptr; } + /** + * Wait until the socket is writable + * @return bool + */ + bool wait4writable() + { + // we need the fd-sets + fd_set readables, writables, exceptions; + + // initialize all the sets + FD_ZERO(&readables); + FD_ZERO(&writables); + FD_ZERO(&exceptions); + + // add the one socket + FD_SET(_socket, &writables); + + // wait for the socket + auto result = select(_socket + 1, &readables, &writables, &exceptions, nullptr); + + // check for success + return result == 0; + } public: /** @@ -203,6 +226,29 @@ class TcpConnected : public TcpState, private Watchable // start monitoring the socket to find out when it is writable _handler->monitor(_connection, _socket, readable | writable); } + + /** + * Flush the connection, sent all buffered data to the socket + * @return TcpState new tcp state + */ + virtual TcpState *flush() override + { + // keep running until the out buffer is empty + while (_out) + { + // poll the socket, is it already writable? + if (!wait4writable()) return this; + + // socket is writable, send as much data as possible + auto *newstate = process(_socket, writable); + + // are we done + if (newstate != this) return newstate; + } + + // all has been sent + return this; + } /** * Report that heartbeat negotiation is going on diff --git a/src/tcpconnection.cpp b/src/tcpconnection.cpp index 943d08bd..cf31fe7c 100644 --- a/src/tcpconnection.cpp +++ b/src/tcpconnection.cpp @@ -4,7 +4,7 @@ * Implementation file for the TCP connection * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2016 Copernica BV */ /** @@ -56,6 +56,31 @@ void TcpConnection::process(int fd, int flags) _state.reset(result); } +/** + * Flush the tcp connection + */ +void TcpConnection::flush() +{ + // monitor the object for destruction + Monitor monitor(this); + + // keep looping + while (true) + { + // flush the object + auto *newstate = _state->flush(); + + // done if object no longer exists + if (!monitor.valid()) return; + + // also done if the object is still in the same state + if (newstate == _state.get()) return; + + // replace the new state + _state.reset(newstate); + } +} + /** * Method that is called when the heartbeat frequency is negotiated. * @param connection The connection that suggested a heartbeat interval diff --git a/src/tcpresolver.h b/src/tcpresolver.h index fb157816..de543b44 100644 --- a/src/tcpresolver.h +++ b/src/tcpresolver.h @@ -5,7 +5,7 @@ * server, and to make the initial connection * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2016 Copernica BV */ /** @@ -173,9 +173,9 @@ class TcpResolver : public TcpState /** * Wait for the resolver to be ready - * @param fd The filedescriptor that is active - * @param flags Flags to indicate that fd is readable and/or writable - * @return New implementation object + * @param fd The filedescriptor that is active + * @param flags Flags to indicate that fd is readable and/or writable + * @return New implementation object */ virtual TcpState *process(int fd, int flags) override { @@ -191,6 +191,25 @@ class TcpResolver : public TcpState // create dummy implementation return new TcpClosed(_connection, _handler); } + + /** + * Flush state / wait for the connection to complete + * @return New implementation object + */ + virtual TcpState *flush() override + { + // just wait for the other thread to be ready + _thread.join(); + + // do we have a valid socket? + if (_socket >= 0) return new TcpConnected(_connection, _socket, std::move(_buffer), _handler); + + // report error + _handler->onError(_connection, _error.data()); + + // create dummy implementation + return new TcpClosed(_connection, _handler); + } /** * Send data over the connection diff --git a/src/tcpstate.h b/src/tcpstate.h index bb817590..11aa75d1 100644 --- a/src/tcpstate.h +++ b/src/tcpstate.h @@ -4,7 +4,7 @@ * Base class / interface of the various states of the TCP connection * * @author Emiel Bruijntjes - * @copyright 2015 Copernica BV + * @copyright 2015 - 2016 Copernica BV */ /** @@ -90,6 +90,12 @@ class TcpState return _handler->onNegotiate(_connection, heartbeat); } + /** + * Flush the connection + * @return TcpState new implementation object + */ + virtual TcpState *flush() { return this; } + /** * Report to the handler that the object is in an error state * @param error