From f87b289b3dac9fac5a68c7119bf154c75f1ce935 Mon Sep 17 00:00:00 2001 From: M Hightower <27247790+mhightower83@users.noreply.github.com> Date: Thu, 4 Jul 2019 19:50:44 -0700 Subject: [PATCH 1/2] Fixes the handling tcp_err() callback and addresses build issues with different core releases. Summary: The operator "new ..." was changed to "new (std::nothrow) ...", which will return NULL when the heap is out of memory. Without the change "soft WDT" was the result, starting with Arduino ESP8266 Core 2.5.0. (Note, RE:"soft WDT" - the error reporting may improve with core 2.6.) With proir core versions the library appears to work fine. ref: https://github.com/esp8266/Arduino/issues/6269#issue-464978944 To support newer lwIP versions and buffer models. All references to 1460 were replaced with TCP_MSS. If TCP_MSS is not defined (exp. 1.4v lwIP) 1460 is assumed. The ESPAsyncTCP library should build for Arduino ESP8266 Core releases: 2.3.0, 2.4.1, 2.4.2, 2.5.1, 2.5.2. It may still build with core versions 2.4.0 and 2.5.0. I did not do any regression testing with these, since they had too many issues and were quickly superseded. lwIP tcp_err() callback often resulted in crashes. The problem was a tcp_err() would come in, while processing a send or receive in the forground. The tcp_err() callback would be passed down to a client's registered disconnect CB. A common problem with SyncClient and other modules as well as some client code was: the freeing of ESPAsyncTCP AsyncClient objects via disconnect CB handlers while the library was waiting for an operstion to finished. Attempts to access bad pointers followed. For SyncClient this commonly occured during a call to delay(). On return to SyncClient _client was invalid. Also the problem described by issue #94 also surfaced Use of tcp_abort() required some very special handling and was very challenging to make work without changing client API. ERR_ABRT can only be used once on a return to lwIP for a given connection and since the AsyncClient structure was sometimes deleted before returning to lwIP, the state tracking became tricky. While ugly, a global variable for this seemed to work; however, I abanded it when I saw a possible reentrancy/concurrency issue. After several approaches I settled the problem by creating "class ACErrorTracker" to manage the issue. Additional Async Client considerations: The client sketch must always test if the connection is still up at loop() entry and after the return of any function call, that may have done a delay() or yield() or any ESPAsyncTCP library family call. For example, the connection could be lost during a call to _client->write(...). Client sketches that delete _client as part of their onDisconnect() handler must be very careful as _client will become invalid after calls to delay(), yield(), etc. --- DebugPrintMacros.h | 96 ++++ library.json | 4 +- library.properties | 2 +- src/AsyncPrinter.cpp | 55 +- src/AsyncPrinter.h | 2 +- src/ESPAsyncTCP.cpp | 525 ++++++++++++++---- src/ESPAsyncTCP.h | 90 ++- src/ESPAsyncTCPbuffer.cpp | 1096 +++++++++++++++++++------------------ src/ESPAsyncTCPbuffer.h | 236 ++++---- src/SyncClient.cpp | 178 +++++- src/SyncClient.h | 41 +- src/async_config.h | 31 +- src/tcp_axtls.c | 1 + src/tcp_axtls.h | 0 14 files changed, 1517 insertions(+), 840 deletions(-) create mode 100644 DebugPrintMacros.h mode change 100755 => 100644 src/tcp_axtls.c mode change 100755 => 100644 src/tcp_axtls.h diff --git a/DebugPrintMacros.h b/DebugPrintMacros.h new file mode 100644 index 0000000..29accaf --- /dev/null +++ b/DebugPrintMacros.h @@ -0,0 +1,96 @@ +#ifndef _DEBUG_PRINT_MACROS_H +#define _DEBUG_PRINT_MACROS_H +// Some customizable print macros to suite the debug needs de jour. + +// Debug macros +// #include +// https://stackoverflow.com/questions/8487986/file-macro-shows-full-path +// This value is resolved at compile time. +#define _FILENAME_ strrchr("/" __FILE__, '/') + +// #define DEBUG_ESP_ASYNC_TCP 1 +// #define DEBUG_ESP_TCP_SSL 1 +// #define DEBUG_ESP_PORT Serial + +#if defined(DEBUG_ESP_PORT) && !defined(DEBUG_TIME_STAMP_FMT) +#define DEBUG_TIME_STAMP_FMT "%06u.%03u " +struct _DEBUG_TIME_STAMP { + unsigned dec; + unsigned whole; +}; +inline struct _DEBUG_TIME_STAMP debugTimeStamp(void) { + struct _DEBUG_TIME_STAMP st; + unsigned now = millis() % 1000000000; + st.dec = now % 1000; + st.whole = now / 1000; + return st; +} +#endif + +#if defined(DEBUG_ESP_PORT) && !defined(DEBUG_GENERIC) + #define DEBUG_GENERIC( module, format, ... ) \ + do { \ + struct _DEBUG_TIME_STAMP st = debugTimeStamp(); \ + DEBUG_ESP_PORT.printf( DEBUG_TIME_STAMP_FMT module " " format, st.whole, st.dec, ##__VA_ARGS__ ); \ + } while(false) +#endif +#if defined(DEBUG_ESP_PORT) && !defined(DEBUG_GENERIC_P) + #define DEBUG_GENERIC_P( module, format, ... ) \ + do { \ + struct _DEBUG_TIME_STAMP st = debugTimeStamp(); \ + DEBUG_ESP_PORT.printf_P(PSTR( DEBUG_TIME_STAMP_FMT module " " format ), st.whole, st.dec, ##__VA_ARGS__ ); \ + } while(false) +#endif + +#if defined(DEBUG_GENERIC) && !defined(ASSERT_GENERIC) +#define ASSERT_GENERIC( a, module ) \ + do { \ + if ( !(a) ) { \ + DEBUG_GENERIC( module, "%s:%s:%u: ASSERT("#a") failed!\n", __FILE__, __func__, __LINE__); \ + DEBUG_ESP_PORT.flush(); \ + } \ + } while(false) +#endif +#if defined(DEBUG_GENERIC_P) && !defined(ASSERT_GENERIC_P) +#define ASSERT_GENERIC_P( a, module ) \ + do { \ + if ( !(a) ) { \ + DEBUG_GENERIC_P( module, "%s:%s:%u: ASSERT("#a") failed!\n", __FILE__, __func__, __LINE__); \ + DEBUG_ESP_PORT.flush(); \ + } \ + } while(false) +#endif + +#ifndef DEBUG_GENERIC +#define DEBUG_GENERIC(...) do { (void)0;} while(false) +#endif + +#ifndef DEBUG_GENERIC_P +#define DEBUG_GENERIC_P(...) do { (void)0;} while(false) +#endif + +#ifndef ASSERT_GENERIC +#define ASSERT_GENERIC(...) do { (void)0;} while(false) +#endif + +#ifndef ASSERT_GENERIC_P +#define ASSERT_GENERIC_P(...) do { (void)0;} while(false) +#endif + +#ifndef DEBUG_ESP_PRINTF +#define DEBUG_ESP_PRINTF( format, ...) DEBUG_GENERIC_P("[%s]", format, &_FILENAME_[1], ##__VA_ARGS__) +#endif + +#if defined(DEBUG_ESP_ASYNC_TCP) && !defined(ASYNC_TCP_DEBUG) +#define ASYNC_TCP_DEBUG( format, ...) DEBUG_GENERIC_P("[ASYNC_TCP]", format, ##__VA_ARGS__) +#endif + +#ifndef ASYNC_TCP_ASSERT +#define ASYNC_TCP_ASSERT( a ) ASSERT_GENERIC_P( (a), "[ASYNC_TCP]") +#endif + +#if defined(DEBUG_ESP_TCP_SSL) && !defined(TCP_SSL_DEBUG) +#define TCP_SSL_DEBUG( format, ...) DEBUG_GENERIC_P("[TCP_SSL]", format, ##__VA_ARGS__) +#endif + +#endif //_DEBUG_PRINT_MACROS_H diff --git a/library.json b/library.json index 8f39000..d71d3fc 100644 --- a/library.json +++ b/library.json @@ -12,11 +12,11 @@ "type": "git", "url": "https://github.com/me-no-dev/ESPAsyncTCP.git" }, - "version": "1.2.0", + "version": "1.2.1", "license": "LGPL-3.0", "frameworks": "arduino", "platforms": "espressif8266", "build": { "libCompatMode": 2 - } + } } diff --git a/library.properties b/library.properties index d8e1f9c..2059535 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=ESP AsyncTCP -version=1.2.0 +version=1.2.1 author=Me-No-Dev maintainer=Me-No-Dev sentence=Async TCP Library for ESP8266 and ESP31B diff --git a/src/AsyncPrinter.cpp b/src/AsyncPrinter.cpp index dd61e76..8a63f20 100644 --- a/src/AsyncPrinter.cpp +++ b/src/AsyncPrinter.cpp @@ -28,7 +28,7 @@ AsyncPrinter::AsyncPrinter() , _close_cb(NULL) , _close_arg(NULL) , _tx_buffer(NULL) - , _tx_buffer_size(1460) + , _tx_buffer_size(TCP_MSS) , next(NULL) {} @@ -43,7 +43,10 @@ AsyncPrinter::AsyncPrinter(AsyncClient *client, size_t txBufLen) , next(NULL) { _attachCallbacks(); - _tx_buffer = new cbuf(_tx_buffer_size); + _tx_buffer = new (std::nothrow) cbuf(_tx_buffer_size); + if(_tx_buffer == NULL) { + panic(); //What should we do? + } } AsyncPrinter::~AsyncPrinter(){ @@ -63,10 +66,14 @@ void AsyncPrinter::onClose(ApCloseHandler cb, void *arg){ int AsyncPrinter::connect(IPAddress ip, uint16_t port){ if(_client != NULL && connected()) return 0; - _client = new AsyncClient(); + _client = new (std::nothrow) AsyncClient(); + if (_client == NULL) { + panic(); + } + _client->onConnect([](void *obj, AsyncClient *c){ ((AsyncPrinter*)(obj))->_onConnect(c); }, this); if(_client->connect(ip, port)){ - while(_client->state() < 4) + while(_client && _client->state() < 4) delay(1); return connected(); } @@ -76,10 +83,14 @@ int AsyncPrinter::connect(IPAddress ip, uint16_t port){ int AsyncPrinter::connect(const char *host, uint16_t port){ if(_client != NULL && connected()) return 0; - _client = new AsyncClient(); + _client = new (std::nothrow) AsyncClient(); + if (_client == NULL) { + panic(); + } + _client->onConnect([](void *obj, AsyncClient *c){ ((AsyncPrinter*)(obj))->_onConnect(c); }, this); if(_client->connect(host, port)){ - while(_client->state() < 4) + while(_client && _client->state() < 4) delay(1); return connected(); } @@ -87,12 +98,17 @@ int AsyncPrinter::connect(const char *host, uint16_t port){ } void AsyncPrinter::_onConnect(AsyncClient *c){ + (void)c; if(_tx_buffer != NULL){ cbuf *b = _tx_buffer; _tx_buffer = NULL; delete b; } - _tx_buffer = new cbuf(_tx_buffer_size); + _tx_buffer = new (std::nothrow) cbuf(_tx_buffer_size); + if(_tx_buffer) { + panic(); + } + _attachCallbacks(); } @@ -109,7 +125,11 @@ AsyncPrinter & AsyncPrinter::operator=(const AsyncPrinter &other){ _tx_buffer = NULL; delete b; } - _tx_buffer = new cbuf(other._tx_buffer_size); + _tx_buffer = new (std::nothrow) cbuf(other._tx_buffer_size); + if(_tx_buffer == NULL) { + panic(); + } + _client = other._client; _attachCallbacks(); return *this; @@ -127,13 +147,16 @@ size_t AsyncPrinter::write(const uint8_t *data, size_t len){ while(_tx_buffer->room() < toSend){ toWrite = _tx_buffer->room(); _tx_buffer->write((const char*)data, toWrite); - while(!_client->canSend()) + while(connected() && !_client->canSend()) delay(0); + if(!connected()) + return 0; // or len - toSend; _sendBuffer(); toSend -= toWrite; } _tx_buffer->write((const char*)(data+(len - toSend)), toSend); - while(!_client->canSend()) delay(0); + while(connected() && !_client->canSend()) delay(0); + if(!connected()) return 0; // or len - toSend; _sendBuffer(); return len; } @@ -154,7 +177,11 @@ size_t AsyncPrinter::_sendBuffer(){ size_t sendable = _client->space(); if(sendable < available) available= sendable; - char *out = new char[available]; + char *out = new (std::nothrow) char[available]; + if (out == NULL) { + panic(); // Connection should be aborted instead + } + _tx_buffer->read(out, available); size_t sent = _client->write(out, available); delete out; @@ -180,8 +207,8 @@ void AsyncPrinter::_on_close(){ } void AsyncPrinter::_attachCallbacks(){ - _client->onPoll([](void *obj, AsyncClient* c){ ((AsyncPrinter*)(obj))->_sendBuffer(); }, this); - _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time){ ((AsyncPrinter*)(obj))->_sendBuffer(); }, this); + _client->onPoll([](void *obj, AsyncClient* c){ (void)c; ((AsyncPrinter*)(obj))->_sendBuffer(); }, this); + _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time){ (void)c; (void)len; (void)time; ((AsyncPrinter*)(obj))->_sendBuffer(); }, this); _client->onDisconnect([](void *obj, AsyncClient* c){ ((AsyncPrinter*)(obj))->_on_close(); delete c; }, this); - _client->onData([](void *obj, AsyncClient* c, void *data, size_t len){ ((AsyncPrinter*)(obj))->_onData(data, len); }, this); + _client->onData([](void *obj, AsyncClient* c, void *data, size_t len){ (void)c; ((AsyncPrinter*)(obj))->_onData(data, len); }, this); } diff --git a/src/AsyncPrinter.h b/src/AsyncPrinter.h index 1fa0f8b..c3ebe3a 100644 --- a/src/AsyncPrinter.h +++ b/src/AsyncPrinter.h @@ -46,7 +46,7 @@ class AsyncPrinter: public Print { AsyncPrinter *next; AsyncPrinter(); - AsyncPrinter(AsyncClient *client, size_t txBufLen = 1460); + AsyncPrinter(AsyncClient *client, size_t txBufLen = TCP_MSS); virtual ~AsyncPrinter(); int connect(IPAddress ip, uint16_t port); diff --git a/src/ESPAsyncTCP.cpp b/src/ESPAsyncTCP.cpp index 7c3fd97..7a9fdc7 100644 --- a/src/ESPAsyncTCP.cpp +++ b/src/ESPAsyncTCP.cpp @@ -18,7 +18,58 @@ License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - +/* +Changes for July 2019 + +The operator "new ..." was changed to "new (std::nothrow) ...", which will +return NULL when the heap is out of memory. Without the change "soft WDT" +was the result, starting with Arduino ESP8266 Core 2.5.0. (Note, RE:"soft +WDT" - the error reporting may improve with core 2.6.) With proir core +versions the library appears to work fine. +ref: https://github.com/esp8266/Arduino/issues/6269#issue-464978944 + +To support newer lwIP versions and buffer models. All references to 1460 +were replaced with TCP_MSS. If TCP_MSS is not defined (exp. 1.4v lwIP) +1460 is assumed. + +The ESPAsyncTCP library should build for Arduino ESP8266 Core releases: +2.3.0, 2.4.1, 2.4.2, 2.5.1, 2.5.2. It may still build with core versions +2.4.0 and 2.5.0. I did not do any regression testing with these, since +they had too many issues and were quickly superseded. + +lwIP tcp_err() callback often resulted in crashes. The problem was a +tcp_err() would come in, while processing a send or receive in the +forground. The tcp_err() callback would be passed down to a client's +registered disconnect CB. A common problem with SyncClient and other +modules as well as some client code was: the freeing of ESPAsyncTCP +AsyncClient objects via disconnect CB handlers while the library was +waiting for an operstion to finished. Attempts to access bad pointers +followed. For SyncClient this commonly occured during a call to delay(). +On return to SyncClient _client was invalid. Also the problem described by +issue #94 also surfaced + +Use of tcp_abort() required some very special handling and was very +challenging to make work without changing client API. ERR_ABRT can only be +used once on a return to lwIP for a given connection and since the +AsyncClient structure was sometimes deleted before returning to lwIP, the +state tracking became tricky. While ugly, a global variable for this +seemed to work; however, I abanded it when I saw a possible +reentrancy/concurrency issue. After several approaches I settled the +problem by creating "class ACErrorTracker" to manage the issue. + + +Additional Async Client considerations: + +The client sketch must always test if the connection is still up at loop() +entry and after the return of any function call, that may have done a +delay() or yield() or any ESPAsyncTCP library family call. For example, +the connection could be lost during a call to _client->write(...). Client +sketches that delete _client as part of their onDisconnect() handler must +be very careful as _client will become invalid after calls to delay(), +yield(), etc. + + + */ #include "Arduino.h" #include "ESPAsyncTCP.h" @@ -31,9 +82,71 @@ extern "C"{ } #include +/* + Async Client Error Return Tracker +*/ +// Assumption: callbacks are never called with err == ERR_ABRT; however, +// they may return ERR_ABRT. + +ACErrorTracker::ACErrorTracker(AsyncClient *c): + _client(c) + , _close_error(ERR_OK) + , _errored(EE_OK) +#ifdef DEBUG_MORE + , _error_event_cb(NULL) + , _error_event_cb_arg(NULL) +#endif +{} + +#ifdef DEBUG_MORE +/** + * This is not necessary, but a start at gathering some statistics on + * errored out connections. Used from AsyncServer. + */ +void ACErrorTracker::onErrorEvent(AsNotifyHandler cb, void *arg) { + _error_event_cb = cb; + _error_event_cb_arg = arg; +} +#endif + +void ACErrorTracker::setCloseError(err_t e) { + if (e != ERR_OK) + ASYNC_TCP_DEBUG("setCloseError() to: %s(%ld)\n", _client->errorToString(e), e); + if(_errored == EE_OK) + _close_error = e; +} +/** + * Called mainly by callback routines, called when err is not ERR_OK. + * This prevents the possiblity of aborting an already errored out + * connection. + */ +void ACErrorTracker::setErrored(size_t errorEvent){ + if(EE_OK == _errored) + _errored = errorEvent; +#ifdef DEBUG_MORE + if (_error_event_cb) + _error_event_cb(_error_event_cb_arg, errorEvent); +#endif +} +/** + * Used by callback functions only. Used for proper ERR_ABRT return value + * reporting. ERR_ABRT is only reported/returned once; thereafter ERR_OK + * is always returned. + */ +err_t ACErrorTracker::getCallbackCloseError(void){ + if (EE_OK != _errored) + return ERR_OK; + if (ERR_ABRT == _close_error) + setErrored(EE_ABORTED); + return _close_error; +} + /* Async TCP Client */ +#if DEBUG_ESP_ASYNC_TCP +static size_t _connectionCount=0; +#endif #if ASYNC_TCP_SSL_ENABLED AsyncClient::AsyncClient(tcp_pcb* pcb, SSL_CTX * ssl_ctx): @@ -54,6 +167,8 @@ AsyncClient::AsyncClient(tcp_pcb* pcb): , _pb_cb_arg(0) , _timeout_cb(0) , _timeout_cb_arg(0) + , _poll_cb(0) + , _poll_cb_arg(0) , _pcb_busy(false) #if ASYNC_TCP_SSL_ENABLED , _pcb_secure(false) @@ -65,10 +180,13 @@ AsyncClient::AsyncClient(tcp_pcb* pcb): , _tx_unacked_len(0) , _tx_acked_len(0) , _tx_unsent_len(0) + , _rx_ack_len(0) , _rx_last_packet(0) , _rx_since_timeout(0) , _ack_timeout(ASYNC_MAX_ACK_TIME) , _connect_port(0) + , _recv_pbuf_flags(0) + , _errorTracker(NULL) , prev(NULL) , next(NULL) { @@ -97,11 +215,26 @@ AsyncClient::AsyncClient(tcp_pcb* pcb): } #endif } + + _errorTracker = std::make_shared(this); +#if DEBUG_ESP_ASYNC_TCP + _errorTracker->setConnectionId(++_connectionCount); +#endif } AsyncClient::~AsyncClient(){ if(_pcb) _close(); + + _errorTracker->clearClient(); +} + +inline void clearTcpCallbacks(tcp_pcb* pcb){ + tcp_arg(pcb, NULL); + tcp_sent(pcb, NULL); + tcp_recv(pcb, NULL); + tcp_err(pcb, NULL); + tcp_poll(pcb, NULL, 0); } #if ASYNC_TCP_SSL_ENABLED @@ -124,14 +257,15 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){ return false; } + tcp_setprio(pcb, TCP_PRIO_MIN); #if ASYNC_TCP_SSL_ENABLED _pcb_secure = secure; _handshake_done = !secure; #endif tcp_arg(pcb, this); tcp_err(pcb, &_s_error); - tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected); - return true; + size_t err = tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected); + return (ERR_OK == err); } #if ASYNC_TCP_SSL_ENABLED @@ -159,9 +293,14 @@ bool AsyncClient::connect(const char* host, uint16_t port){ } AsyncClient& AsyncClient::operator=(const AsyncClient& other){ - if (_pcb) + if (_pcb) { + ASYNC_TCP_DEBUG("operator=[%u]: Abandoned _pcb(0x%" PRIXPTR ") forced close.\n", getConnectionId(), uintptr_t(_pcb)); _close(); + } + _errorTracker = other._errorTracker; + // I am confused when "other._pcb" falls out of scope the destructor will + // close it? TODO: Look to see where this is used and how it might work. _pcb = other._pcb; if (_pcb) { _rx_last_packet = millis(); @@ -192,12 +331,25 @@ bool AsyncClient::operator==(const AsyncClient &other) { return (_pcb != NULL && other._pcb != NULL && (_pcb->remote_ip.addr == other._pcb->remote_ip.addr) && (_pcb->remote_port == other._pcb->remote_port)); } -int8_t AsyncClient::abort(){ +void AsyncClient::abort(){ + // Notes: + // 1) _pcb is set to NULL, so we cannot call tcp_abort() more than once. + // 2) setCloseError(ERR_ABRT) is only done here! + // 3) Using this abort() function guarantees only one tcp_abort() call is + // made and only one CB returns with ERR_ABORT. + // 4) After abort() is called from _close(), no callbacks with an err + // parameter will be called. eg. _recv(), _error(), _connected(). + // _close() will reset there CB handlers before calling. + // 5) A callback to _error(), will set _pcb to NULL, thus avoiding the + // of a 2nd call to tcp_abort(). + // 6) Callbacks to _recv() or _connected() with err set, will result in _pcb + // set to NULL. Thus, preventing possible calls later to tcp_abort(). if(_pcb) { tcp_abort(_pcb); _pcb = NULL; + setCloseError(ERR_ABRT); } - return ERR_ABRT; + return; } void AsyncClient::close(bool now){ @@ -229,6 +381,7 @@ size_t AsyncClient::write(const char* data) { size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) { size_t will_send = add(data, size, apiflags); + if(!will_send || !send()) return 0; return will_send; @@ -252,9 +405,11 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) { } #endif size_t will_send = (room < size) ? room : size; - int8_t err = tcp_write(_pcb, data, will_send, apiflags); - if(err != ERR_OK) + err_t err = tcp_write(_pcb, data, will_send, apiflags); + if(err != ERR_OK) { + ASYNC_TCP_DEBUG("_add[%u]: tcp_write() returned err: %s(%ld)\n", getConnectionId(), errorToString(err), err); return 0; + } _tx_unsent_len += will_send; return will_send; } @@ -264,13 +419,16 @@ bool AsyncClient::send(){ if(_pcb_secure) return true; #endif - if(tcp_output(_pcb) == ERR_OK){ + err_t err = tcp_output(_pcb); + if(err == ERR_OK){ _pcb_busy = true; _pcb_sent_at = millis(); _tx_unacked_len += _tx_unsent_len; _tx_unsent_len = 0; return true; } + + ASYNC_TCP_DEBUG("send[%u]: tcp_output() returned err: %s(%ld)", getConnectionId(), errorToString(err), err); _tx_unsent_len = 0; return false; } @@ -286,7 +444,25 @@ size_t AsyncClient::ack(size_t len){ // Private Callbacks -err_t AsyncClient::_connected(void* pcb, err_t err){ +void AsyncClient::_connected(std::shared_ptr& errorTracker, void* pcb, err_t err){ + //(void)err; // LWIP v1.4 appears to always call with ERR_OK + // Documentation for 2.1.0 also says: + // "err - An unused error code, always ERR_OK currently ;-)" + // https://www.nongnu.org/lwip/2_1_x/tcp_8h.html#a939867106bd492caf2d85852fb7f6ae8 + // Based on that wording and emoji lets just handle it now. + // After all, the API does allow for an err != ERR_OK. + if(NULL == pcb || ERR_OK != err) { + ASYNC_TCP_DEBUG("_connected[%u]:%s err: %s(%ld)\n", errorTracker->getConnectionId(), ((NULL == pcb) ? " NULL == pcb!," : ""), errorToString(err), err); + errorTracker->setCloseError(err); + errorTracker->setErrored(EE_CONNECTED_CB); + _pcb = reinterpret_cast(pcb); + if (_pcb) + clearTcpCallbacks(_pcb); + _pcb = NULL; + _error(err); + return; + } + _pcb = reinterpret_cast(pcb); if(_pcb){ _pcb_busy = false; @@ -298,7 +474,8 @@ err_t AsyncClient::_connected(void* pcb, err_t err){ #if ASYNC_TCP_SSL_ENABLED if(_pcb_secure){ if(tcp_ssl_new_client(_pcb) < 0){ - return _close(); + _close(); + return; } tcp_ssl_arg(_pcb, this); tcp_ssl_data(_pcb, &_s_data); @@ -312,45 +489,41 @@ err_t AsyncClient::_connected(void* pcb, err_t err){ if(_connect_cb) #endif _connect_cb(_connect_cb_arg, this); - return ERR_OK; + return; } -int8_t AsyncClient::_close(){ - int8_t err = ERR_OK; +void AsyncClient::_close(){ if(_pcb) { #if ASYNC_TCP_SSL_ENABLED if(_pcb_secure){ tcp_ssl_free(_pcb); } #endif - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); - err = tcp_close(_pcb); - if(err != ERR_OK) { - err = abort(); + clearTcpCallbacks(_pcb); + err_t err = tcp_close(_pcb); + if(ERR_OK == err) { + setCloseError(err); + } else { + ASYNC_TCP_DEBUG("_close[%u]: abort() called for AsyncClient 0x%" PRIXPTR "\n", getConnectionId(), uintptr_t(this)); + abort(); } _pcb = NULL; if(_discard_cb) _discard_cb(_discard_cb_arg, this); } - return err; + return; } void AsyncClient::_error(err_t err) { + ASYNC_TCP_DEBUG("_error[%u]:%s err: %s(%ld)\n", getConnectionId(), ((NULL == _pcb) ? " NULL == _pcb!," : ""), errorToString(err), err); if(_pcb){ #if ASYNC_TCP_SSL_ENABLED if(_pcb_secure){ tcp_ssl_free(_pcb); } #endif - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); + // At this callback _pcb is possible already freed. Thus, no calls are + // made to set to NULL other callbacks. _pcb = NULL; } if(_error_cb) @@ -366,73 +539,134 @@ void AsyncClient::_ssl_error(int8_t err){ } #endif -err_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { +void AsyncClient::_sent(std::shared_ptr& errorTracker, tcp_pcb* pcb, uint16_t len) { + (void)pcb; #if ASYNC_TCP_SSL_ENABLED if (_pcb_secure && !_handshake_done) - return ERR_OK; + return; #endif _rx_last_packet = millis(); _tx_unacked_len -= len; _tx_acked_len += len; - ASYNC_TCP_DEBUG("_sent: %u (%d %d)\n", len, _tx_unacked_len, _tx_acked_len); + ASYNC_TCP_DEBUG("_sent[%u]: %4u, unacked=%4u, acked=%4u, space=%4u\n", errorTracker->getConnectionId(), len, _tx_unacked_len, _tx_acked_len, space()); if(_tx_unacked_len == 0){ _pcb_busy = false; - if(_sent_cb) + errorTracker->setCloseError(ERR_OK); + if(_sent_cb) { _sent_cb(_sent_cb_arg, this, _tx_acked_len, (millis() - _pcb_sent_at)); - _tx_acked_len = 0; + if(!errorTracker->hasClient()) + return; + } + _tx_acked_len = 0; + } + return; +} + +void AsyncClient::_recv(std::shared_ptr& errorTracker, tcp_pcb* pcb, pbuf* pb, err_t err) { + // While lwIP v1.4 appears to always call with ERR_OK, 2.x lwIP may present + // a non-ERR_OK value. + // https://www.nongnu.org/lwip/2_1_x/tcp_8h.html#a780cfac08b02c66948ab94ea974202e8 + if(NULL == pcb || ERR_OK != err){ + ASYNC_TCP_DEBUG("_recv[%u]:%s err: %s(%ld)\n", errorTracker->getConnectionId(), ((NULL == pcb) ? " NULL == pcb!," : ""), errorToString(err), err); + ASYNC_TCP_ASSERT(ERR_ABRT != err); + errorTracker->setCloseError(err); + errorTracker->setErrored(EE_RECV_CB); + _pcb = pcb; + if(_pcb) + clearTcpCallbacks(_pcb); + _pcb = NULL; + // I think we are safe from being called from an interrupt context. + // Best Hint that calling _error() is safe: + // https://www.nongnu.org/lwip/2_1_x/group__lwip__nosys.html + // "Feed incoming packets to netif->input(pbuf, netif) function from + // mainloop, not from interrupt context. You can allocate a Packet buffers + // (PBUF) in interrupt context and put them into a queue which is processed + // from mainloop." + // And the description of "Mainloop Mode" option 2: + // https://www.nongnu.org/lwip/2_1_x/pitfalls.html + // "2) Run lwIP in a mainloop. ... lwIP is ONLY called from mainloop + // callstacks here. The ethernet IRQ has to put received telegrams into a + // queue which is polled in the mainloop. Ensure lwIP is NEVER called from + // an interrupt, ...!" + // Based on these comments I am thinking tcp_recv_fn() is called + // from somebody's mainloop(), which could only have been reached from a + // delay like function or the Arduino sketch loop() function has returned. + // What I don't want is for the client sketch to delete the AsyncClient + // object via _error() while it is in the middle of using it. However, + // the client sketch must always test that the connection is still up + // at loop() entry and after the return of any function call, that may + // have done a delay() or yield(). + _error(err); + return; } - return ERR_OK; -} -err_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, err_t err) { if(pb == NULL){ - ASYNC_TCP_DEBUG("_recv: pb == NULL! Closing... %d\n", err); - return _close(); + ASYNC_TCP_DEBUG("_recv[%u]: pb == NULL! Closing... %ld\n", errorTracker->getConnectionId(), err); + _close(); + return; } - _rx_last_packet = millis(); + errorTracker->setCloseError(ERR_OK); #if ASYNC_TCP_SSL_ENABLED if(_pcb_secure){ - ASYNC_TCP_DEBUG("_recv: %d\n", pb->tot_len); + ASYNC_TCP_DEBUG("_recv[%u]: %d\n", getConnectionId(), pb->tot_len); int read_bytes = tcp_ssl_read(pcb, pb); if(read_bytes < 0){ if (read_bytes != SSL_CLOSE_NOTIFY) { - ASYNC_TCP_DEBUG("_recv err: %d\n", read_bytes); + ASYNC_TCP_DEBUG("_recv[%u] err: %d\n", getConnectionId(), read_bytes); _close(); } - //return read_bytes; } - return ERR_OK; + return; } #endif while(pb != NULL){ + // IF this callback function returns ERR_OK or ERR_ABRT + // then it is assummed we freed the pbufs. + // https://www.nongnu.org/lwip/2_1_x/group__tcp__raw.html#ga8afd0b316a87a5eeff4726dc95006ed0 + if(!errorTracker->hasClient()){ + while(pb != NULL){ + pbuf *b = pb; + pb = b->next; + b->next = NULL; + pbuf_free(b); + } + return; + } //we should not ack before we assimilate the data _ack_pcb = true; pbuf *b = pb; pb = b->next; b->next = NULL; - ASYNC_TCP_DEBUG("_recv: %d\n", b->len); + ASYNC_TCP_DEBUG("_recv[%u]: %d%s\n", errorTracker->getConnectionId(), b->len, (b->flags&PBUF_FLAG_PUSH)?", PBUF_FLAG_PUSH":""); if(_pb_cb){ _pb_cb(_pb_cb_arg, this, b); } else { - if(_recv_cb) + if(_recv_cb){ + _recv_pbuf_flags = b->flags; _recv_cb(_recv_cb_arg, this, b->payload, b->len); - if(!_ack_pcb) - _rx_ack_len += b->len; - else - tcp_recved(pcb, b->len); + } + if(errorTracker->hasClient()){ + if(!_ack_pcb) + _rx_ack_len += b->len; + else + tcp_recved(pcb, b->len); + } pbuf_free(b); } } - return ERR_OK; + return; } -err_t AsyncClient::_poll(tcp_pcb* pcb){ +void AsyncClient::_poll(std::shared_ptr& errorTracker, tcp_pcb* pcb){ + (void)pcb; + errorTracker->setCloseError(ERR_OK); + // Close requested if(_close_pcb){ _close_pcb = false; _close(); - return ERR_OK; + return; } uint32_t now = millis(); @@ -441,24 +675,24 @@ err_t AsyncClient::_poll(tcp_pcb* pcb){ _pcb_busy = false; if(_timeout_cb) _timeout_cb(_timeout_cb_arg, this, (now - _pcb_sent_at)); - return ERR_OK; + return; } // RX Timeout if(_rx_since_timeout && (now - _rx_last_packet) >= (_rx_since_timeout * 1000)){ _close(); - return ERR_OK; + return; } #if ASYNC_TCP_SSL_ENABLED // SSL Handshake Timeout if(_pcb_secure && !_handshake_done && (now - _rx_last_packet) >= 2000){ _close(); - return ERR_OK; + return; } #endif // Everything is fine if(_poll_cb) _poll_cb(_poll_cb_arg, this); - return ERR_OK; + return; } #if LWIP_VERSION_MAJOR == 1 @@ -480,33 +714,50 @@ void AsyncClient::_dns_found(const ip_addr *ipaddr){ } } -// lWIP Callbacks +// lwIP Callbacks #if LWIP_VERSION_MAJOR == 1 void AsyncClient::_s_dns_found(const char *name, ip_addr_t *ipaddr, void *arg){ #else void AsyncClient::_s_dns_found(const char *name, const ip_addr *ipaddr, void *arg){ #endif + (void)name; reinterpret_cast(arg)->_dns_found(ipaddr); } err_t AsyncClient::_s_poll(void *arg, struct tcp_pcb *tpcb) { - return reinterpret_cast(arg)->_poll(tpcb); + AsyncClient *c = reinterpret_cast(arg); + std::shared_ptrerrorTracker = c->getACErrorTracker(); + c->_poll(errorTracker, tpcb); + return errorTracker->getCallbackCloseError(); } err_t AsyncClient::_s_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *pb, err_t err) { - return reinterpret_cast(arg)->_recv(tpcb, pb, err); + AsyncClient *c = reinterpret_cast(arg); + auto errorTracker = c->getACErrorTracker(); + c->_recv(errorTracker, tpcb, pb, err); + return errorTracker->getCallbackCloseError(); } void AsyncClient::_s_error(void *arg, err_t err) { - reinterpret_cast(arg)->_error(err); + AsyncClient *c = reinterpret_cast(arg); + auto errorTracker = c->getACErrorTracker(); + errorTracker->setCloseError(err); + errorTracker->setErrored(EE_ERROR_CB); + c->_error(err); } err_t AsyncClient::_s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len) { - return reinterpret_cast(arg)->_sent(tpcb, len); + AsyncClient *c = reinterpret_cast(arg); + auto errorTracker = c->getACErrorTracker(); + c->_sent(errorTracker, tpcb, len); + return errorTracker->getCallbackCloseError(); } err_t AsyncClient::_s_connected(void* arg, void* tpcb, err_t err){ - return reinterpret_cast(arg)->_connected(tpcb, err); + AsyncClient *c = reinterpret_cast(arg); + auto errorTracker = c->getACErrorTracker(); + c->_connected(errorTracker, tpcb, err); + return errorTracker->getCallbackCloseError(); } #if ASYNC_TCP_SSL_ENABLED @@ -732,11 +983,11 @@ size_t AsyncClient::space(){ } return s; } -#else +#else // ASYNC_TCP_SSL_ENABLED if((_pcb != NULL) && (_pcb->state == 4)){ return tcp_sndbuf(_pcb); } -#endif +#endif // ASYNC_TCP_SSL_ENABLED return 0; } @@ -748,43 +999,30 @@ void AsyncClient::ackPacket(struct pbuf * pb){ pbuf_free(pb); } -const char * AsyncClient::errorToString(int8_t error) { - switch (error) { - case ERR_OK: - return "OK"; - case ERR_MEM: - return "Out of memory error"; - case ERR_BUF: - return "Buffer error"; - case ERR_TIMEOUT: - return "Timeout"; - case ERR_RTE: - return "Routing problem"; - case ERR_INPROGRESS: - return "Operation in progress"; - case ERR_VAL: - return "Illegal value"; - case ERR_WOULDBLOCK: - return "Operation would block"; - case ERR_ABRT: - return "Connection aborted"; - case ERR_RST: - return "Connection reset"; - case ERR_CLSD: - return "Connection closed"; - case ERR_CONN: - return "Not connected"; - case ERR_ARG: - return "Illegal argument"; - case ERR_USE: - return "Address in use"; - case ERR_IF: - return "Low-level netif error"; - case ERR_ISCONN: - return "Connection already established"; - default: - return "Unknown error"; - } +const char * AsyncClient::errorToString(err_t error) { + switch (error) { + case ERR_OK: return "No error, everything OK"; + case ERR_MEM: return "Out of memory error"; + case ERR_BUF: return "Buffer error"; + case ERR_TIMEOUT: return "Timeout"; + case ERR_RTE: return "Routing problem"; + case ERR_INPROGRESS: return "Operation in progress"; + case ERR_VAL: return "Illegal value"; + case ERR_WOULDBLOCK: return "Operation would block"; + case ERR_ABRT: return "Connection aborted"; + case ERR_RST: return "Connection reset"; + case ERR_CLSD: return "Connection closed"; + case ERR_CONN: return "Not connected"; + case ERR_ARG: return "Illegal argument"; + case ERR_USE: return "Address in use"; +#if defined(LWIP_VERSION_MAJOR) && (LWIP_VERSION_MAJOR > 1) + case ERR_ALREADY: return "Already connectioning"; +#endif + case ERR_IF: return "Low-level netif error"; + case ERR_ISCONN: return "Connection already established"; + case -55: return "DNS failed"; + default: return "Unknown error"; + } } const char * AsyncClient::stateToString(){ @@ -826,7 +1064,12 @@ AsyncServer::AsyncServer(IPAddress addr, uint16_t port) , _file_cb(0) , _file_cb_arg(0) #endif -{} +{ +#ifdef DEBUG_MORE + for (size_t i=0; inext = new_item; } } else { - AsyncClient *c = new AsyncClient(pcb, _ssl_ctx); + AsyncClient *c = new (std::nothrow) AsyncClient(pcb, _ssl_ctx); if(c){ - c->onConnect([this](void * arg, AsyncClient *c){ - _connect_cb(_connect_cb_arg, c); - }, this); + ASYNC_TCP_DEBUG("_accept[%u]: SSL connected\n", c->getConnectionId()); + c->onConnect([this](void * arg, AsyncClient *c){ + _connect_cb(_connect_cb_arg, c); + }, this); + } else { + ASYNC_TCP_DEBUG("_accept[_ssl_ctx]: new AsyncClient() failed, connection aborted!\n"); + if(tcp_close(pcb) != ERR_OK){ + tcp_abort(pcb); + return ERR_ABRT; + } } } return ERR_OK; } else { - AsyncClient *c = new AsyncClient(pcb, NULL); + AsyncClient *c = new (std::nothrow) AsyncClient(pcb, NULL); #else - AsyncClient *c = new AsyncClient(pcb); + AsyncClient *c = new (std::nothrow) AsyncClient(pcb); #endif + if(c){ + auto errorTracker = c->getACErrorTracker(); +#ifdef DEBUG_MORE + errorTracker->onErrorEvent( + [](void *obj, size_t ee){ ((AsyncServer*)(obj))->incEventCount(ee); }, + this); +#endif + ASYNC_TCP_DEBUG("_accept[%u]: connected\n", errorTracker->getConnectionId()); _connect_cb(_connect_cb_arg, c); - return ERR_OK; + return errorTracker->getCallbackCloseError(); + } else { + ASYNC_TCP_DEBUG("_accept: new AsyncClient() failed, connection aborted!\n"); + if(tcp_close(pcb) != ERR_OK){ + tcp_abort(pcb); + return ERR_ABRT; + } } #if ASYNC_TCP_SSL_ENABLED } @@ -1008,13 +1292,14 @@ err_t AsyncServer::_accept(tcp_pcb* pcb, err_t err){ } if(tcp_close(pcb) != ERR_OK){ tcp_abort(pcb); + return ERR_ABRT; } return ERR_OK; } - err_t AsyncServer::_s_accept(void *arg, tcp_pcb* pcb, err_t err){ - return reinterpret_cast(arg)->_accept(pcb, err); - } +err_t AsyncServer::_s_accept(void *arg, tcp_pcb* pcb, err_t err){ + return reinterpret_cast(arg)->_accept(pcb, err); +} #if ASYNC_TCP_SSL_ENABLED err_t AsyncServer::_poll(tcp_pcb* pcb){ @@ -1030,7 +1315,7 @@ err_t AsyncServer::_poll(tcp_pcb* pcb){ p = b; } ASYNC_TCP_DEBUG("### remove from wait: %d\n", _clients_waiting); - AsyncClient *c = new AsyncClient(pcb, _ssl_ctx); + AsyncClient *c = new (std::nothrow) AsyncClient(pcb, _ssl_ctx); if(c){ c->onConnect([this](void * arg, AsyncClient *c){ _connect_cb(_connect_cb_arg, c); @@ -1038,6 +1323,7 @@ err_t AsyncServer::_poll(tcp_pcb* pcb){ if(p->pb) c->_recv(pcb, p->pb, 0); } + // Should there be error handling for when "new AsynClient" fails?? free(p); } return ERR_OK; @@ -1065,8 +1351,11 @@ err_t AsyncServer::_recv(struct tcp_pcb *pcb, struct pbuf *pb, err_t err){ pbuf_free(p->pb); } free(p); - tcp_close(pcb); - tcp_abort(pcb); + size_t err = tcp_close(pcb); + if (err != ERR_OK) { + tcp_abort(pcb); + return ERR_ABRT; + } } else { ASYNC_TCP_DEBUG("### wait _recv: %u %d\n", pb->tot_len, _clients_waiting); p = _pending; diff --git a/src/ESPAsyncTCP.h b/src/ESPAsyncTCP.h index c1803b4..2d1f768 100644 --- a/src/ESPAsyncTCP.h +++ b/src/ESPAsyncTCP.h @@ -25,6 +25,7 @@ #include #include "IPAddress.h" #include +#include extern "C" { #include "lwip/init.h" @@ -33,6 +34,8 @@ extern "C" { }; class AsyncClient; +class AsyncServer; +class ACErrorTracker; #define ASYNC_MAX_ACK_TIME 5000 #define ASYNC_WRITE_FLAG_COPY 0x01 //will allocate new buffer to hold the data while sending (else will hold reference to the data given) @@ -49,14 +52,63 @@ typedef struct SSL_CTX_ SSL_CTX; typedef std::function AcConnectHandler; typedef std::function AcAckHandler; -typedef std::function AcErrorHandler; +typedef std::function AcErrorHandler; typedef std::function AcDataHandler; typedef std::function AcPacketHandler; typedef std::function AcTimeoutHandler; +typedef std::function AsNotifyHandler; + +enum error_events { + EE_OK = 0, + EE_ABORTED, // Callback or foreground aborted connections + EE_ERROR_CB, // Stack initiated aborts via error Callbacks. + EE_CONNECTED_CB, + EE_RECV_CB, + EE_ACCEPT_CB, + EE_MAX +}; +// DEBUG_MORE is for gathering more information on which CBs close events are +// occuring and count. +// #define DEBUG_MORE 1 +class ACErrorTracker { + private: + AsyncClient *_client; + err_t _close_error; + int _errored; +#if DEBUG_ESP_ASYNC_TCP + size_t _connectionId; +#endif +#ifdef DEBUG_MORE + AsNotifyHandler _error_event_cb; + void* _error_event_cb_arg; +#endif + + protected: + friend class AsyncClient; + friend class AsyncServer; +#ifdef DEBUG_MORE + void onErrorEvent(AsNotifyHandler cb, void *arg); +#endif +#if DEBUG_ESP_ASYNC_TCP + void setConnectionId(size_t id) { _connectionId=id;} + size_t getConnectionId(void) { return _connectionId;} +#endif + void setCloseError(err_t e); + void setErrored(size_t errorEvent); + err_t getCallbackCloseError(void); + void clearClient(void){ if (_client) _client = NULL;} + + public: + err_t getCloseError(void) const { return _close_error;} + bool hasClient(void) const { return (_client != NULL);} + ACErrorTracker(AsyncClient *c); + ~ACErrorTracker() {} +}; class AsyncClient { protected: friend class AsyncTCPbuffer; + friend class AsyncServer; tcp_pcb* _pcb; AcConnectHandler _connect_cb; void* _connect_cb_arg; @@ -90,15 +142,17 @@ class AsyncClient { uint32_t _rx_since_timeout; uint32_t _ack_timeout; uint16_t _connect_port; + u8_t _recv_pbuf_flags; + std::shared_ptr _errorTracker; - int8_t _close(); - err_t _connected(void* pcb, err_t err); + void _close(); + void _connected(std::shared_ptr& closeAbort, void* pcb, err_t err); void _error(err_t err); #if ASYNC_TCP_SSL_ENABLED void _ssl_error(int8_t err); #endif - err_t _poll(tcp_pcb* pcb); - err_t _sent(tcp_pcb* pcb, uint16_t len); + void _poll(std::shared_ptr& closeAbort, tcp_pcb* pcb); + void _sent(std::shared_ptr& closeAbort, tcp_pcb* pcb, uint16_t len); #if LWIP_VERSION_MAJOR == 1 void _dns_found(struct ip_addr *ipaddr); #else @@ -119,6 +173,8 @@ class AsyncClient { static void _s_handshake(void *arg, struct tcp_pcb *tcp, SSL *ssl); static void _s_ssl_error(void *arg, struct tcp_pcb *tcp, int8_t err); #endif + std::shared_ptr getACErrorTracker(void) const { return _errorTracker; }; + void setCloseError(err_t e) const { _errorTracker->setCloseError(e);} public: AsyncClient* prev; @@ -148,7 +204,7 @@ class AsyncClient { #endif void close(bool now = false); void stop(); - int8_t abort(); + void abort(); bool free(); bool canSend();//ack is not pending @@ -157,7 +213,10 @@ class AsyncClient { bool send();//send all data added with the method above size_t ack(size_t len); //ack data that you have not acked using the method below void ackLater(){ _ack_pcb = false; } //will not ack the current packet. Call from onData - + bool isRecvPush(){ return !!(_recv_pbuf_flags & PBUF_FLAG_PUSH); } +#if DEBUG_ESP_ASYNC_TCP + size_t getConnectionId(void) const { return _errorTracker->getConnectionId();} +#endif #if ASYNC_TCP_SSL_ENABLED SSL *getSSL(); #endif @@ -197,13 +256,13 @@ class AsyncClient { void onPacket(AcPacketHandler cb, void* arg = 0); //data received void onTimeout(AcTimeoutHandler cb, void* arg = 0); //ack timeout void onPoll(AcConnectHandler cb, void* arg = 0); //every 125ms when connected - void ackPacket(struct pbuf * pb); - const char * errorToString(int8_t error); + const char * errorToString(err_t error); const char * stateToString(); - err_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err); + void _recv(std::shared_ptr& closeAbort, tcp_pcb* pcb, pbuf* pb, err_t err); + err_t getCloseError(void) const { return _errorTracker->getCloseError();} }; #if ASYNC_TCP_SSL_ENABLED @@ -211,6 +270,7 @@ typedef std::function AcSSl struct pending_pcb; #endif + class AsyncServer { protected: uint16_t _port; @@ -225,6 +285,9 @@ class AsyncServer { AcSSlFileHandler _file_cb; void* _file_cb_arg; #endif +#ifdef DEBUG_MORE + int _event_count[EE_MAX]; +#endif public: @@ -241,10 +304,15 @@ class AsyncServer { void setNoDelay(bool nodelay); bool getNoDelay(); uint8_t status(); - +#ifdef DEBUG_MORE + int getEventCount(size_t ee) const { return _event_count[ee];} +#endif protected: err_t _accept(tcp_pcb* newpcb, err_t err); static err_t _s_accept(void *arg, tcp_pcb* newpcb, err_t err); +#ifdef DEBUG_MORE + int incEventCount(size_t ee) { return ++_event_count[ee];} +#endif #if ASYNC_TCP_SSL_ENABLED int _cert(const char *filename, uint8_t **buf); err_t _poll(tcp_pcb* pcb); diff --git a/src/ESPAsyncTCPbuffer.cpp b/src/ESPAsyncTCPbuffer.cpp index 8bf44fb..d2261da 100644 --- a/src/ESPAsyncTCPbuffer.cpp +++ b/src/ESPAsyncTCPbuffer.cpp @@ -1,541 +1,555 @@ -/** - * @file ESPAsyncTCPbuffer.cpp - * @date 22.01.2016 - * @author Markus Sattler - * - * Copyright (c) 2015 Markus Sattler. All rights reserved. - * This file is part of the Asynv TCP for ESP. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - * - */ - - -#include -#include - -#include "ESPAsyncTCPbuffer.h" - - -AsyncTCPbuffer::AsyncTCPbuffer(AsyncClient* client) { - if(client == NULL) { - DEBUG_ASYNC_TCP("[A-TCP] client is null!!!\n"); - panic(); - } - - _client = client; - _TXbufferWrite = new cbuf(1460); - _TXbufferRead = _TXbufferWrite; - _RXbuffer = new cbuf(100); - _RXmode = ATB_RX_MODE_FREE; - _rxSize = 0; - _rxTerminator = 0x00; - _rxReadBytesPtr = NULL; - _rxReadStringPtr = NULL; - _cbDisconnect = NULL; - - _cbRX = NULL; - _cbDone = NULL; - _attachCallbacks(); -} - -AsyncTCPbuffer::~AsyncTCPbuffer() { - if(_client) { - _client->close(); - } - - if(_RXbuffer) { - delete _RXbuffer; - _RXbuffer = NULL; - } - - if(_TXbufferWrite) { - // will be deleted in _TXbufferRead chain - _TXbufferWrite = NULL; - } - - if(_TXbufferRead) { - cbuf * next = _TXbufferRead->next; - delete _TXbufferRead; - while(next != NULL) { - _TXbufferRead = next; - next = _TXbufferRead->next; - delete _TXbufferRead; - } - _TXbufferRead = NULL; - } -} - -size_t AsyncTCPbuffer::write(String & data) { - return write(data.c_str(), data.length()); -} - -size_t AsyncTCPbuffer::write(uint8_t data) { - return write(&data, 1); -} - -size_t AsyncTCPbuffer::write(const char* data) { - return write((const uint8_t *) data, strlen(data)); -} - -size_t AsyncTCPbuffer::write(const char *data, size_t len) { - return write((const uint8_t *) data, len); -} - -/** - * write data in to buffer and try to send the data - * @param data - * @param len - * @return - */ -size_t AsyncTCPbuffer::write(const uint8_t *data, size_t len) { - if(_TXbufferWrite == NULL || _client == NULL || !_client->connected() || data == NULL || len == 0) { - return 0; - } - - size_t bytesLeft = len; - while(bytesLeft) { - size_t w = _TXbufferWrite->write((const char*) data, bytesLeft); - bytesLeft -= w; - data += w; - _sendBuffer(); - - // add new buffer since we have more data - if(_TXbufferWrite->full() && bytesLeft > 0) { - - // to less ram!!! - if(ESP.getFreeHeap() < 4096) { - DEBUG_ASYNC_TCP("[A-TCP] run out of Heap can not send all Data!\n"); - return (len - bytesLeft); - } - - cbuf * next = new cbuf(1460); - - if(next == NULL) { - DEBUG_ASYNC_TCP("[A-TCP] run out of Heap!\n"); - panic(); - } else { - DEBUG_ASYNC_TCP("[A-TCP] new cbuf\n"); - } - - // add new buffer to chain (current cbuf) - _TXbufferWrite->next = next; - - // move ptr for next data - _TXbufferWrite = next; - } - } - - return len; - -} - -/** - * wait until all data has send out - */ -void AsyncTCPbuffer::flush() { - while(!_TXbufferWrite->empty()) { - while(!_client->canSend()) { - delay(0); - } - _sendBuffer(); - } -} - -void AsyncTCPbuffer::noCallback() { - _RXmode = ATB_RX_MODE_NONE; -} - -void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done) { - if(_client == NULL) { - return; - } - DEBUG_ASYNC_TCP("[A-TCP] readStringUntil terminator: %02X\n", terminator); - _RXmode = ATB_RX_MODE_NONE; - _cbDone = done; - _rxReadStringPtr = str; - _rxTerminator = terminator; - _rxSize = 0; - _RXmode = ATB_RX_MODE_TERMINATOR_STRING; -} - -/* - void AsyncTCPbuffer::readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done) { - _RXmode = ATB_RX_MODE_NONE; - _cbDone = done; - _rxReadBytesPtr = (uint8_t *) buffer; - _rxTerminator = terminator; - _rxSize = length; - _RXmode = ATB_RX_MODE_TERMINATOR; - _handleRxBuffer(NULL, 0); - } - - void AsyncTCPbuffer::readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) { - readBytesUntil(terminator, (char *) buffer, length, done); - } - */ - -void AsyncTCPbuffer::readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done) { - if(_client == NULL) { - return; - } - DEBUG_ASYNC_TCP("[A-TCP] readBytes length: %d\n", length); - _RXmode = ATB_RX_MODE_NONE; - _cbDone = done; - _rxReadBytesPtr = (uint8_t *) buffer; - _rxSize = length; - _RXmode = ATB_RX_MODE_READ_BYTES; -} - -void AsyncTCPbuffer::readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) { - readBytes((char *) buffer, length, done); -} - -void AsyncTCPbuffer::onData(AsyncTCPbufferDataCb cb) { - if(_client == NULL) { - return; - } - DEBUG_ASYNC_TCP("[A-TCP] onData\n"); - _RXmode = ATB_RX_MODE_NONE; - _cbDone = NULL; - _cbRX = cb; - _RXmode = ATB_RX_MODE_FREE; -} - -void AsyncTCPbuffer::onDisconnect(AsyncTCPbufferDisconnectCb cb) { - _cbDisconnect = cb; -} - -IPAddress AsyncTCPbuffer::remoteIP() { - if(!_client) { - return IPAddress(0U); - } - return _client->remoteIP(); -} - -uint16_t AsyncTCPbuffer::remotePort() { - if(!_client) { - return 0; - } - return _client->remotePort(); -} - -bool AsyncTCPbuffer::connected() { - if(!_client) { - return false; - } - return _client->connected(); -} - -void AsyncTCPbuffer::stop() { - - if(!_client) { - return; - } - _client->stop(); - _client = NULL; - - if(_cbDone) { - switch(_RXmode) { - case ATB_RX_MODE_READ_BYTES: - case ATB_RX_MODE_TERMINATOR: - case ATB_RX_MODE_TERMINATOR_STRING: - _RXmode = ATB_RX_MODE_NONE; - _cbDone(false, NULL); - break; - default: - break; - } - } - _RXmode = ATB_RX_MODE_NONE; -} - -void AsyncTCPbuffer::close() { - stop(); -} - - -///-------------------------------- - -/** - * attachCallbacks to AsyncClient class - */ -void AsyncTCPbuffer::_attachCallbacks() { - if(!_client) { - return; - } - DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks\n"); - - _client->onPoll([](void *obj, AsyncClient* c) { - AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); - if((b->_TXbufferRead != NULL) && !b->_TXbufferRead->empty()) { - b->_sendBuffer(); - } - // if(!b->_RXbuffer->empty()) { - // b->_handleRxBuffer(NULL, 0); - // } - }, this); - - _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time) { - DEBUG_ASYNC_TCP("[A-TCP] onAck\n"); - ((AsyncTCPbuffer*)(obj))->_sendBuffer(); - }, this); - - _client->onDisconnect([](void *obj, AsyncClient* c) { - DEBUG_ASYNC_TCP("[A-TCP] onDisconnect\n"); - AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); - b->_client = NULL; - bool del = true; - if(b->_cbDisconnect) { - del = b->_cbDisconnect(b); - } - delete c; - if(del) { - delete b; - } - }, this); - - _client->onData([](void *obj, AsyncClient* c, void *buf, size_t len) { - AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); - b->_rxData((uint8_t *)buf, len); - }, this); - - _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){ - DEBUG_ASYNC_TCP("[A-TCP] onTimeout\n"); - c->close(); - }, this); - - DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks Done.\n"); -} - -/** - * send TX buffer if possible - */ -void AsyncTCPbuffer::_sendBuffer() { - //DEBUG_ASYNC_TCP("[A-TCP] _sendBuffer...\n"); - size_t available = _TXbufferRead->available(); - if(available == 0 || _client == NULL || !_client->connected() || !_client->canSend()) { - return; - } - - while((_client->space() > 0) && (_TXbufferRead->available() > 0) && _client->canSend()) { - - available = _TXbufferRead->available(); - - if(available > _client->space()) { - available = _client->space(); - } - - char *out = new char[available]; - if(out == NULL) { - DEBUG_ASYNC_TCP("[A-TCP] to less heap, try later.\n"); - return; - } - - // read data from buffer - _TXbufferRead->peek(out, available); - - // send data - size_t send = _client->write((const char*) out, available); - if(send != available) { - DEBUG_ASYNC_TCP("[A-TCP] write failed send: %d available: %d \n", send, available); - } - - // remove really send data from buffer - _TXbufferRead->remove(send); - - // if buffer is empty and there is a other buffer in chain delete the empty one - if(_TXbufferRead->available() == 0 && _TXbufferRead->next != NULL) { - cbuf * old = _TXbufferRead; - _TXbufferRead = _TXbufferRead->next; - delete old; - DEBUG_ASYNC_TCP("[A-TCP] delete cbuf\n"); - } - - delete out; - } - -} - -/** - * called on incoming data - * @param buf - * @param len - */ -void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) { - if(!_client || !_client->connected()) { - DEBUG_ASYNC_TCP("[A-TCP] not connected!\n"); - return; - } - if(!_RXbuffer) { - DEBUG_ASYNC_TCP("[A-TCP] _rxData no _RXbuffer!\n"); - return; - } - DEBUG_ASYNC_TCP("[A-TCP] _rxData len: %d RXmode: %d\n", len, _RXmode); - - size_t handled = 0; - - if(_RXmode != ATB_RX_MODE_NONE) { - handled = _handleRxBuffer((uint8_t *) buf, len); - buf += handled; - len -= handled; - - // handle as much as possible before using the buffer - if(_RXbuffer->empty()) { - while(_RXmode != ATB_RX_MODE_NONE && handled != 0 && len > 0) { - handled = _handleRxBuffer(buf, len); - buf += handled; - len -= handled; - } - } - } - - if(len > 0) { - - if(_RXbuffer->room() < len) { - // to less space - DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer full try resize\n"); - _RXbuffer->resizeAdd((len + _RXbuffer->room())); - - if(_RXbuffer->room() < len) { - DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer to full can only handle %d!!!\n", _RXbuffer->room()); - } - } - - _RXbuffer->write((const char *) (buf), len); - } - - if(!_RXbuffer->empty() && _RXmode != ATB_RX_MODE_NONE) { - // handle as much as possible data in buffer - handled = _handleRxBuffer(NULL, 0); - while(_RXmode != ATB_RX_MODE_NONE && handled != 0) { - handled = _handleRxBuffer(NULL, 0); - } - } - - // clean up ram - if(_RXbuffer->empty() && _RXbuffer->room() != 100) { - _RXbuffer->resize(100); - } - -} - -/** - * - */ -size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) { - if(!_client || !_client->connected() || _RXbuffer == NULL) { - return 0; - } - - DEBUG_ASYNC_TCP("[A-TCP] _handleRxBuffer len: %d RXmode: %d\n", len, _RXmode); - - size_t BufferAvailable = _RXbuffer->available(); - size_t r = 0; - - if(_RXmode == ATB_RX_MODE_NONE) { - return 0; - } else if(_RXmode == ATB_RX_MODE_FREE) { - if(_cbRX == NULL) { - return 0; - } - - if(BufferAvailable > 0) { - uint8_t * b = new uint8_t[BufferAvailable]; - _RXbuffer->peek((char *) b, BufferAvailable); - r = _cbRX(b, BufferAvailable); - _RXbuffer->remove(r); - } - - if(r == BufferAvailable && buf && (len > 0)) { - return _cbRX(buf, len); - } else { - return 0; - } - - } else if(_RXmode == ATB_RX_MODE_READ_BYTES) { - if(_rxReadBytesPtr == NULL || _cbDone == NULL) { - return 0; - } - - size_t newReadCount = 0; - - if(BufferAvailable) { - r = _RXbuffer->read((char *) _rxReadBytesPtr, _rxSize); - _rxSize -= r; - _rxReadBytesPtr += r; - } - - if(_RXbuffer->empty() && (len > 0) && buf) { - r = len; - if(r > _rxSize) { - r = _rxSize; - } - memcpy(_rxReadBytesPtr, buf, r); - _rxReadBytesPtr += r; - _rxSize -= r; - newReadCount += r; - } - - if(_rxSize == 0) { - _RXmode = ATB_RX_MODE_NONE; - _cbDone(true, NULL); - } - - // add left over bytes to Buffer - return newReadCount; - - } else if(_RXmode == ATB_RX_MODE_TERMINATOR) { - // TODO implement read terminator non string - - } else if(_RXmode == ATB_RX_MODE_TERMINATOR_STRING) { - if(_rxReadStringPtr == NULL || _cbDone == NULL) { - return 0; - } - - // handle Buffer - if(BufferAvailable > 0) { - while(!_RXbuffer->empty()) { - char c = _RXbuffer->read(); - if(c == _rxTerminator || c == 0x00) { - _RXmode = ATB_RX_MODE_NONE; - _cbDone(true, _rxReadStringPtr); - return 0; - } else { - (*_rxReadStringPtr) += c; - } - } - } - - if(_RXbuffer->empty() && (len > 0) && buf) { - size_t newReadCount = 0; - while(newReadCount < len) { - char c = (char) *buf; - buf++; - newReadCount++; - if(c == _rxTerminator || c == 0x00) { - _RXmode = ATB_RX_MODE_NONE; - _cbDone(true, _rxReadStringPtr); - return newReadCount; - } else { - (*_rxReadStringPtr) += c; - } - } - return newReadCount; - } - } - - return 0; -} +/** + * @file ESPAsyncTCPbuffer.cpp + * @date 22.01.2016 + * @author Markus Sattler + * + * Copyright (c) 2015 Markus Sattler. All rights reserved. + * This file is part of the Asynv TCP for ESP. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + + +#include +#include + +#include "ESPAsyncTCPbuffer.h" + + +AsyncTCPbuffer::AsyncTCPbuffer(AsyncClient* client) { + if(client == NULL) { + DEBUG_ASYNC_TCP("[A-TCP] client is null!!!\n"); + panic(); + } + + _client = client; + _TXbufferWrite = new (std::nothrow) cbuf(TCP_MSS); + _TXbufferRead = _TXbufferWrite; + _RXbuffer = new (std::nothrow) cbuf(100); + _RXmode = ATB_RX_MODE_FREE; + _rxSize = 0; + _rxTerminator = 0x00; + _rxReadBytesPtr = NULL; + _rxReadStringPtr = NULL; + _cbDisconnect = NULL; + + _cbRX = NULL; + _cbDone = NULL; + _attachCallbacks(); +} + +AsyncTCPbuffer::~AsyncTCPbuffer() { + if(_client) { + _client->close(); + } + + if(_RXbuffer) { + delete _RXbuffer; + _RXbuffer = NULL; + } + + if(_TXbufferWrite) { + // will be deleted in _TXbufferRead chain + _TXbufferWrite = NULL; + } + + if(_TXbufferRead) { + cbuf * next = _TXbufferRead->next; + delete _TXbufferRead; + while(next != NULL) { + _TXbufferRead = next; + next = _TXbufferRead->next; + delete _TXbufferRead; + } + _TXbufferRead = NULL; + } +} + +size_t AsyncTCPbuffer::write(String & data) { + return write(data.c_str(), data.length()); +} + +size_t AsyncTCPbuffer::write(uint8_t data) { + return write(&data, 1); +} + +size_t AsyncTCPbuffer::write(const char* data) { + return write((const uint8_t *) data, strlen(data)); +} + +size_t AsyncTCPbuffer::write(const char *data, size_t len) { + return write((const uint8_t *) data, len); +} + +/** + * write data in to buffer and try to send the data + * @param data + * @param len + * @return + */ +size_t AsyncTCPbuffer::write(const uint8_t *data, size_t len) { + if(_TXbufferWrite == NULL || _client == NULL || !_client->connected() || data == NULL || len == 0) { + return 0; + } + + size_t bytesLeft = len; + while(bytesLeft) { + size_t w = _TXbufferWrite->write((const char*) data, bytesLeft); + bytesLeft -= w; + data += w; + _sendBuffer(); + + // add new buffer since we have more data + if(_TXbufferWrite->full() && bytesLeft > 0) { + + // to less ram!!! + if(ESP.getFreeHeap() < 4096) { + DEBUG_ASYNC_TCP("[A-TCP] run out of Heap can not send all Data!\n"); + return (len - bytesLeft); + } + + cbuf * next = new (std::nothrow) cbuf(TCP_MSS); + if(next == NULL) { + DEBUG_ASYNC_TCP("[A-TCP] run out of Heap!\n"); + panic(); + } else { + DEBUG_ASYNC_TCP("[A-TCP] new cbuf\n"); + } + + // add new buffer to chain (current cbuf) + _TXbufferWrite->next = next; + + // move ptr for next data + _TXbufferWrite = next; + } + } + + return len; + +} + +/** + * wait until all data has send out + */ +void AsyncTCPbuffer::flush() { + while(!_TXbufferWrite->empty()) { + while(connected() && !_client->canSend()) { + delay(0); + } + if(!connected()) + return; + _sendBuffer(); + } +} + +void AsyncTCPbuffer::noCallback() { + _RXmode = ATB_RX_MODE_NONE; +} + +void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done) { + if(_client == NULL) { + return; + } + DEBUG_ASYNC_TCP("[A-TCP] readStringUntil terminator: %02X\n", terminator); + _RXmode = ATB_RX_MODE_NONE; + _cbDone = done; + _rxReadStringPtr = str; + _rxTerminator = terminator; + _rxSize = 0; + _RXmode = ATB_RX_MODE_TERMINATOR_STRING; +} + +/* + void AsyncTCPbuffer::readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done) { + _RXmode = ATB_RX_MODE_NONE; + _cbDone = done; + _rxReadBytesPtr = (uint8_t *) buffer; + _rxTerminator = terminator; + _rxSize = length; + _RXmode = ATB_RX_MODE_TERMINATOR; + _handleRxBuffer(NULL, 0); + } + + void AsyncTCPbuffer::readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) { + readBytesUntil(terminator, (char *) buffer, length, done); + } + */ + +void AsyncTCPbuffer::readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done) { + if(_client == NULL) { + return; + } + DEBUG_ASYNC_TCP("[A-TCP] readBytes length: %d\n", length); + _RXmode = ATB_RX_MODE_NONE; + _cbDone = done; + _rxReadBytesPtr = (uint8_t *) buffer; + _rxSize = length; + _RXmode = ATB_RX_MODE_READ_BYTES; +} + +void AsyncTCPbuffer::readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) { + readBytes((char *) buffer, length, done); +} + +void AsyncTCPbuffer::onData(AsyncTCPbufferDataCb cb) { + if(_client == NULL) { + return; + } + DEBUG_ASYNC_TCP("[A-TCP] onData\n"); + _RXmode = ATB_RX_MODE_NONE; + _cbDone = NULL; + _cbRX = cb; + _RXmode = ATB_RX_MODE_FREE; +} + +void AsyncTCPbuffer::onDisconnect(AsyncTCPbufferDisconnectCb cb) { + _cbDisconnect = cb; +} + +IPAddress AsyncTCPbuffer::remoteIP() { + if(!_client) { + return IPAddress(0U); + } + return _client->remoteIP(); +} + +uint16_t AsyncTCPbuffer::remotePort() { + if(!_client) { + return 0; + } + return _client->remotePort(); +} + +bool AsyncTCPbuffer::connected() { + if(!_client) { + return false; + } + return _client->connected(); +} + +void AsyncTCPbuffer::stop() { + + if(!_client) { + return; + } + _client->stop(); + _client = NULL; + + if(_cbDone) { + switch(_RXmode) { + case ATB_RX_MODE_READ_BYTES: + case ATB_RX_MODE_TERMINATOR: + case ATB_RX_MODE_TERMINATOR_STRING: + _RXmode = ATB_RX_MODE_NONE; + _cbDone(false, NULL); + break; + default: + break; + } + } + _RXmode = ATB_RX_MODE_NONE; +} + +void AsyncTCPbuffer::close() { + stop(); +} + + +///-------------------------------- + +/** + * attachCallbacks to AsyncClient class + */ +void AsyncTCPbuffer::_attachCallbacks() { + if(!_client) { + return; + } + DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks\n"); + + _client->onPoll([](void *obj, AsyncClient* c) { + (void)c; + AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); + if((b->_TXbufferRead != NULL) && !b->_TXbufferRead->empty()) { + b->_sendBuffer(); + } + // if(!b->_RXbuffer->empty()) { + // b->_handleRxBuffer(NULL, 0); + // } + }, this); + + _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time) { + (void)c; + (void)len; + (void)time; + DEBUG_ASYNC_TCP("[A-TCP] onAck\n"); + ((AsyncTCPbuffer*)(obj))->_sendBuffer(); + }, this); + + _client->onDisconnect([](void *obj, AsyncClient* c) { + DEBUG_ASYNC_TCP("[A-TCP] onDisconnect\n"); + AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); + b->_client = NULL; + bool del = true; + if(b->_cbDisconnect) { + del = b->_cbDisconnect(b); + } + delete c; + if(del) { + delete b; + } + }, this); + + _client->onData([](void *obj, AsyncClient* c, void *buf, size_t len) { + (void)c; + AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); + b->_rxData((uint8_t *)buf, len); + }, this); + + _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){ + (void)obj; + (void)time; + DEBUG_ASYNC_TCP("[A-TCP] onTimeout\n"); + c->close(); + }, this); + + DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks Done.\n"); +} + +/** + * send TX buffer if possible + */ +void AsyncTCPbuffer::_sendBuffer() { + //DEBUG_ASYNC_TCP("[A-TCP] _sendBuffer...\n"); + size_t available = _TXbufferRead->available(); + if(available == 0 || _client == NULL || !_client->connected() || !_client->canSend()) { + return; + } + + while(connected() && (_client->space() > 0) && (_TXbufferRead->available() > 0) && _client->canSend()) { + + available = _TXbufferRead->available(); + + if(available > _client->space()) { + available = _client->space(); + } + + char *out = new (std::nothrow) char[available]; + if(out == NULL) { + DEBUG_ASYNC_TCP("[A-TCP] to less heap, try later.\n"); + return; + } + + // read data from buffer + _TXbufferRead->peek(out, available); + + // send data + size_t send = _client->write((const char*) out, available); + if(send != available) { + DEBUG_ASYNC_TCP("[A-TCP] write failed send: %d available: %d \n", send, available); + if(!connected()) { + DEBUG_ASYNC_TCP("[A-TCP] incomplete transfer, connection lost.\n"); + } + } + + // remove really send data from buffer + _TXbufferRead->remove(send); + + // if buffer is empty and there is a other buffer in chain delete the empty one + if(_TXbufferRead->available() == 0 && _TXbufferRead->next != NULL) { + cbuf * old = _TXbufferRead; + _TXbufferRead = _TXbufferRead->next; + delete old; + DEBUG_ASYNC_TCP("[A-TCP] delete cbuf\n"); + } + + delete out; + } + +} + +/** + * called on incoming data + * @param buf + * @param len + */ +void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) { + if(!_client || !_client->connected()) { + DEBUG_ASYNC_TCP("[A-TCP] not connected!\n"); + return; + } + if(!_RXbuffer) { + DEBUG_ASYNC_TCP("[A-TCP] _rxData no _RXbuffer!\n"); + return; + } + DEBUG_ASYNC_TCP("[A-TCP] _rxData len: %d RXmode: %d\n", len, _RXmode); + + size_t handled = 0; + + if(_RXmode != ATB_RX_MODE_NONE) { + handled = _handleRxBuffer((uint8_t *) buf, len); + buf += handled; + len -= handled; + + // handle as much as possible before using the buffer + if(_RXbuffer->empty()) { + while(_RXmode != ATB_RX_MODE_NONE && handled != 0 && len > 0) { + handled = _handleRxBuffer(buf, len); + buf += handled; + len -= handled; + } + } + } + + if(len > 0) { + + if(_RXbuffer->room() < len) { + // to less space + DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer full try resize\n"); + _RXbuffer->resizeAdd((len + _RXbuffer->room())); + + if(_RXbuffer->room() < len) { + DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer to full can only handle %d!!!\n", _RXbuffer->room()); + } + } + + _RXbuffer->write((const char *) (buf), len); + } + + if(!_RXbuffer->empty() && _RXmode != ATB_RX_MODE_NONE) { + // handle as much as possible data in buffer + handled = _handleRxBuffer(NULL, 0); + while(_RXmode != ATB_RX_MODE_NONE && handled != 0) { + handled = _handleRxBuffer(NULL, 0); + } + } + + // clean up ram + if(_RXbuffer->empty() && _RXbuffer->room() != 100) { + _RXbuffer->resize(100); + } + +} + +/** + * + */ +size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) { + if(!_client || !_client->connected() || _RXbuffer == NULL) { + return 0; + } + + DEBUG_ASYNC_TCP("[A-TCP] _handleRxBuffer len: %d RXmode: %d\n", len, _RXmode); + + size_t BufferAvailable = _RXbuffer->available(); + size_t r = 0; + + if(_RXmode == ATB_RX_MODE_NONE) { + return 0; + } else if(_RXmode == ATB_RX_MODE_FREE) { + if(_cbRX == NULL) { + return 0; + } + + if(BufferAvailable > 0) { + uint8_t * b = new (std::nothrow) uint8_t[BufferAvailable]; + if(b == NULL){ + panic(); //TODO: What action should this be ? + } + _RXbuffer->peek((char *) b, BufferAvailable); + r = _cbRX(b, BufferAvailable); + _RXbuffer->remove(r); + } + + if(r == BufferAvailable && buf && (len > 0)) { + return _cbRX(buf, len); + } else { + return 0; + } + + } else if(_RXmode == ATB_RX_MODE_READ_BYTES) { + if(_rxReadBytesPtr == NULL || _cbDone == NULL) { + return 0; + } + + size_t newReadCount = 0; + + if(BufferAvailable) { + r = _RXbuffer->read((char *) _rxReadBytesPtr, _rxSize); + _rxSize -= r; + _rxReadBytesPtr += r; + } + + if(_RXbuffer->empty() && (len > 0) && buf) { + r = len; + if(r > _rxSize) { + r = _rxSize; + } + memcpy(_rxReadBytesPtr, buf, r); + _rxReadBytesPtr += r; + _rxSize -= r; + newReadCount += r; + } + + if(_rxSize == 0) { + _RXmode = ATB_RX_MODE_NONE; + _cbDone(true, NULL); + } + + // add left over bytes to Buffer + return newReadCount; + + } else if(_RXmode == ATB_RX_MODE_TERMINATOR) { + // TODO implement read terminator non string + + } else if(_RXmode == ATB_RX_MODE_TERMINATOR_STRING) { + if(_rxReadStringPtr == NULL || _cbDone == NULL) { + return 0; + } + + // handle Buffer + if(BufferAvailable > 0) { + while(!_RXbuffer->empty()) { + char c = _RXbuffer->read(); + if(c == _rxTerminator || c == 0x00) { + _RXmode = ATB_RX_MODE_NONE; + _cbDone(true, _rxReadStringPtr); + return 0; + } else { + (*_rxReadStringPtr) += c; + } + } + } + + if(_RXbuffer->empty() && (len > 0) && buf) { + size_t newReadCount = 0; + while(newReadCount < len) { + char c = (char) *buf; + buf++; + newReadCount++; + if(c == _rxTerminator || c == 0x00) { + _RXmode = ATB_RX_MODE_NONE; + _cbDone(true, _rxReadStringPtr); + return newReadCount; + } else { + (*_rxReadStringPtr) += c; + } + } + return newReadCount; + } + } + + return 0; +} diff --git a/src/ESPAsyncTCPbuffer.h b/src/ESPAsyncTCPbuffer.h index 6cbe8dc..08a57c7 100644 --- a/src/ESPAsyncTCPbuffer.h +++ b/src/ESPAsyncTCPbuffer.h @@ -1,118 +1,118 @@ -/** - * @file ESPAsyncTCPbuffer.h - * @date 22.01.2016 - * @author Markus Sattler - * - * Copyright (c) 2015 Markus Sattler. All rights reserved. - * This file is part of the Asynv TCP for ESP. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - * - */ - -#ifndef ESPASYNCTCPBUFFER_H_ -#define ESPASYNCTCPBUFFER_H_ - -//#define DEBUG_ASYNC_TCP(...) while(((U0S >> USTXC) & 0x7F) != 0x00); os_printf( __VA_ARGS__ ); while(((U0S >> USTXC) & 0x7F) != 0x00) - -#ifndef DEBUG_ASYNC_TCP -#define DEBUG_ASYNC_TCP(...) -#endif - -#include -#include - -#include "ESPAsyncTCP.h" - - - -typedef enum { - ATB_RX_MODE_NONE, - ATB_RX_MODE_FREE, - ATB_RX_MODE_READ_BYTES, - ATB_RX_MODE_TERMINATOR, - ATB_RX_MODE_TERMINATOR_STRING -} atbRxMode_t; - -class AsyncTCPbuffer: public Print { - - public: - - typedef std::function AsyncTCPbufferDataCb; - typedef std::function AsyncTCPbufferDoneCb; - typedef std::function AsyncTCPbufferDisconnectCb; - - AsyncTCPbuffer(AsyncClient* c); - virtual ~AsyncTCPbuffer(); - - size_t write(String & data); - size_t write(uint8_t data); - size_t write(const char* data); - size_t write(const char *data, size_t len); - size_t write(const uint8_t *data, size_t len); - - void flush(); - - void noCallback(); - - void readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done); - - // TODO implement read terminator non string - //void readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done); - //void readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done); - - void readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done); - void readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done); - - // TODO implement - // void setTimeout(size_t timeout); - - void onData(AsyncTCPbufferDataCb cb); - void onDisconnect(AsyncTCPbufferDisconnectCb cb); - - IPAddress remoteIP(); - uint16_t remotePort(); - IPAddress localIP(); - uint16_t localPort(); - - bool connected(); - - void stop(); - void close(); - - protected: - AsyncClient* _client; - cbuf * _TXbufferRead; - cbuf * _TXbufferWrite; - cbuf * _RXbuffer; - atbRxMode_t _RXmode; - size_t _rxSize; - char _rxTerminator; - uint8_t * _rxReadBytesPtr; - String * _rxReadStringPtr; - - AsyncTCPbufferDataCb _cbRX; - AsyncTCPbufferDoneCb _cbDone; - AsyncTCPbufferDisconnectCb _cbDisconnect; - - void _attachCallbacks(); - void _sendBuffer(); - void _on_close(); - void _rxData(uint8_t *buf, size_t len); - size_t _handleRxBuffer(uint8_t *buf, size_t len); - -}; - -#endif /* ESPASYNCTCPBUFFER_H_ */ +/** + * @file ESPAsyncTCPbuffer.h + * @date 22.01.2016 + * @author Markus Sattler + * + * Copyright (c) 2015 Markus Sattler. All rights reserved. + * This file is part of the Asynv TCP for ESP. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#ifndef ESPASYNCTCPBUFFER_H_ +#define ESPASYNCTCPBUFFER_H_ + +//#define DEBUG_ASYNC_TCP(...) while(((U0S >> USTXC) & 0x7F) != 0x00); os_printf( __VA_ARGS__ ); while(((U0S >> USTXC) & 0x7F) != 0x00) +//#define DEBUG_ASYNC_TCP ASYNC_TCP_DEBUG +#ifndef DEBUG_ASYNC_TCP +#define DEBUG_ASYNC_TCP(...) +#endif + +#include +#include + +#include "ESPAsyncTCP.h" + + + +typedef enum { + ATB_RX_MODE_NONE, + ATB_RX_MODE_FREE, + ATB_RX_MODE_READ_BYTES, + ATB_RX_MODE_TERMINATOR, + ATB_RX_MODE_TERMINATOR_STRING +} atbRxMode_t; + +class AsyncTCPbuffer: public Print { + + public: + + typedef std::function AsyncTCPbufferDataCb; + typedef std::function AsyncTCPbufferDoneCb; + typedef std::function AsyncTCPbufferDisconnectCb; + + AsyncTCPbuffer(AsyncClient* c); + virtual ~AsyncTCPbuffer(); + + size_t write(String & data); + size_t write(uint8_t data); + size_t write(const char* data); + size_t write(const char *data, size_t len); + size_t write(const uint8_t *data, size_t len); + + void flush(); + + void noCallback(); + + void readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done); + + // TODO implement read terminator non string + //void readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done); + //void readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done); + + void readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done); + void readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done); + + // TODO implement + // void setTimeout(size_t timeout); + + void onData(AsyncTCPbufferDataCb cb); + void onDisconnect(AsyncTCPbufferDisconnectCb cb); + + IPAddress remoteIP(); + uint16_t remotePort(); + IPAddress localIP(); + uint16_t localPort(); + + bool connected(); + + void stop(); + void close(); + + protected: + AsyncClient* _client; + cbuf * _TXbufferRead; + cbuf * _TXbufferWrite; + cbuf * _RXbuffer; + atbRxMode_t _RXmode; + size_t _rxSize; + char _rxTerminator; + uint8_t * _rxReadBytesPtr; + String * _rxReadStringPtr; + + AsyncTCPbufferDataCb _cbRX; + AsyncTCPbufferDoneCb _cbDone; + AsyncTCPbufferDisconnectCb _cbDisconnect; + + void _attachCallbacks(); + void _sendBuffer(); + void _on_close(); + void _rxData(uint8_t *buf, size_t len); + size_t _handleRxBuffer(uint8_t *buf, size_t len); + +}; + +#endif /* ESPASYNCTCPBUFFER_H_ */ diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index df9489f..8335358 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -18,29 +18,62 @@ License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#include "SyncClient.h" #include "Arduino.h" +#include "SyncClient.h" #include "ESPAsyncTCP.h" #include "cbuf.h" +#include + +#define DEBUG_ESP_SYNC_CLIENT +#if defined(DEBUG_ESP_SYNC_CLIENT) && !defined(SYNC_CLIENT_DEBUG) +#define SYNC_CLIENT_DEBUG( format, ...) DEBUG_GENERIC_P("[SYNC_CLIENT]", format, ##__VA_ARGS__) +#endif +#ifndef SYNC_CLIENT_DEBUG +#define SYNC_CLIENT_DEBUG(...) do { (void)0;} while(false) +#endif +/* + Without LWIP_NETIF_TX_SINGLE_PBUF, all tcp_writes default to "no copy". + Referenced data must be preserved and free-ed from the specified tcp_sent() + callback. Alternative, tcp_writes need to use the TCP_WRITE_FLAG_COPY + attribute. +*/ +static_assert(LWIP_NETIF_TX_SINGLE_PBUF, "Required, tcp_write() must always copy."); SyncClient::SyncClient(size_t txBufLen) : _client(NULL) , _tx_buffer(NULL) , _tx_buffer_size(txBufLen) , _rx_buffer(NULL) -{} + , _ref(NULL) +{ + ref(); +} SyncClient::SyncClient(AsyncClient *client, size_t txBufLen) : _client(client) - , _tx_buffer(new cbuf(txBufLen)) + , _tx_buffer(new (std::nothrow) cbuf(txBufLen)) , _tx_buffer_size(txBufLen) , _rx_buffer(NULL) + , _ref(NULL) { - _attachCallbacks(); + if(ref() > 0 && _client != NULL) + _attachCallbacks(); } SyncClient::~SyncClient(){ + if (0 == unref()) + _release(); +} + +void SyncClient::_release(){ + if(_client != NULL){ + _client->onData(NULL, NULL); + _client->onAck(NULL, NULL); + _client->onPoll(NULL, NULL); + _client->abort(); + _client = NULL; + } if(_tx_buffer != NULL){ cbuf *b = _tx_buffer; _tx_buffer = NULL; @@ -53,14 +86,43 @@ SyncClient::~SyncClient(){ } } +int SyncClient::ref(){ + if(_ref == NULL){ + _ref = new (std::nothrow) int; + if(_ref != NULL) + *_ref = 0; + else + return -1; + } + return (++*_ref); +} + +int SyncClient::unref(){ + int count = -1; + if (_ref != NULL) { + count = --*_ref; + if (0 == count) { + delete _ref; + _ref = NULL; + } + } + return count; +} + #if ASYNC_TCP_SSL_ENABLED -int SyncClient::connect(IPAddress ip, uint16_t port, bool secure){ +int SyncClient::_connect(const IPAddress& ip, uint16_t port, bool secure){ #else -int SyncClient::connect(IPAddress ip, uint16_t port){ +int SyncClient::_connect(const IPAddress& ip, uint16_t port){ #endif - if(_client != NULL && connected()) + if(connected()) + return 0; + if(_client != NULL) + delete _client; + + _client = new (std::nothrow) AsyncClient(); + if (_client == NULL) return 0; - _client = new AsyncClient(); + _client->onConnect([](void *obj, AsyncClient *c){ ((SyncClient*)(obj))->_onConnect(c); }, this); _attachCallbacks_Disconnect(); #if ASYNC_TCP_SSL_ENABLED @@ -80,10 +142,15 @@ int SyncClient::connect(const char *host, uint16_t port, bool secure){ #else int SyncClient::connect(const char *host, uint16_t port){ #endif - if(_client != NULL && connected()){ + if(connected()) return 0; - } - _client = new AsyncClient(); + if(_client != NULL) + delete _client; + + _client = new (std::nothrow) AsyncClient(); + if (_client == NULL) + return 0; + _client->onConnect([](void *obj, AsyncClient *c){ ((SyncClient*)(obj))->_onConnect(c); }, this); _attachCallbacks_Disconnect(); #if ASYNC_TCP_SSL_ENABLED @@ -97,7 +164,43 @@ int SyncClient::connect(const char *host, uint16_t port){ } return 0; } +//#define SYNCCLIENT_NEW_OPERATOR_EQUAL +#ifdef SYNCCLIENT_NEW_OPERATOR_EQUAL +/* + New behavior for operator= + + Allow for the object to be placed on a queue and transfered to a new container + with buffers still in tact. Avoiding receive data drops. Transfers rx and tx + buffers. Supports return by value. + + Note, this is optional, the old behavior is the default. + +*/ +SyncClient & SyncClient::operator=(const SyncClient &other){ + int *rhsref = other._ref; + ++*rhsref; // Just in case the left and right side are the same object with different containers + if (0 == unref()) + _release(); + _ref = other._ref; + ref(); + --*rhsref; + // Why do I not test _tx_buffer for != NULL and free? + // I allow for the lh target container, to be a copy of an active + // connection. Thus we are just reusing the container. + // The above unref() handles releaseing the previous client of the container. + _tx_buffer_size = other._tx_buffer_size; + _tx_buffer = other._tx_buffer; + _client = other._client; + if (_client != NULL && _tx_buffer == NULL) + _tx_buffer = new (std::nothrow) cbuf(_tx_buffer_size); + _rx_buffer = other._rx_buffer; + if(_client) + _attachCallbacks(); + return *this; +} +#else // ! SYNCCLIENT_NEW_OPERATOR_EQUAL +// This is the origianl logic with null checks SyncClient & SyncClient::operator=(const SyncClient &other){ if(_client != NULL){ _client->abort(); @@ -115,11 +218,16 @@ SyncClient & SyncClient::operator=(const SyncClient &other){ _rx_buffer = b->next; delete b; } - _tx_buffer = new cbuf(other._tx_buffer_size); + if(other._client != NULL) + _tx_buffer = new (std::nothrow) cbuf(other._tx_buffer_size); + _client = other._client; - _attachCallbacks(); + if(_client) + _attachCallbacks(); + return *this; } +#endif void SyncClient::setTimeout(uint32_t seconds){ if(_client != NULL) @@ -136,19 +244,26 @@ uint8_t SyncClient::connected(){ return (_client != NULL && _client->connected()); } -void SyncClient::stop(){ +bool SyncClient::stop(unsigned int maxWaitMs){ + (void)maxWaitMs; if(_client != NULL) _client->close(true); + return true; } size_t SyncClient::_sendBuffer(){ + if(_client == NULL || _tx_buffer == NULL) + return 0; size_t available = _tx_buffer->available(); if(!connected() || !_client->canSend() || available == 0) return 0; size_t sendable = _client->space(); if(sendable < available) available= sendable; - char *out = new char[available]; + char *out = new (std::nothrow) char[available]; + if(out == NULL) + return 0; + _tx_buffer->read(out, available); size_t sent = _client->write(out, available); delete[] out; @@ -157,7 +272,7 @@ size_t SyncClient::_sendBuffer(){ void SyncClient::_onData(void *data, size_t len){ _client->ackLater(); - cbuf *b = new cbuf(len+1); + cbuf *b = new (std::nothrow) cbuf(len+1); if(b != NULL){ b->write((const char *)data, len); if(_rx_buffer == NULL) @@ -168,6 +283,13 @@ void SyncClient::_onData(void *data, size_t len){ p = p->next; p->next = b; } + } else { + // We ran out of memory. This fail causes lost receive data. + // The connection should be closed in a manner that conveys something + // bad/abnormal has happened to the connection. Hence, we abort the + // connection to avoid possible data corruption. + // Note, callbacks maybe called. + _client->abort(); } } @@ -189,7 +311,7 @@ void SyncClient::_onConnect(AsyncClient *c){ _tx_buffer = NULL; delete b; } - _tx_buffer = new cbuf(_tx_buffer_size); + _tx_buffer = new (std::nothrow) cbuf(_tx_buffer_size); _attachCallbacks_AfterConnected(); } @@ -199,9 +321,9 @@ void SyncClient::_attachCallbacks(){ } void SyncClient::_attachCallbacks_AfterConnected(){ - _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time){ ((SyncClient*)(obj))->_sendBuffer(); }, this); - _client->onData([](void *obj, AsyncClient* c, void *data, size_t len){ ((SyncClient*)(obj))->_onData(data, len); }, this); - _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){ c->close(); }, this); + _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time){ (void)c; (void)len; (void)time; ((SyncClient*)(obj))->_sendBuffer(); }, this); + _client->onData([](void *obj, AsyncClient* c, void *data, size_t len){ (void)c; ((SyncClient*)(obj))->_onData(data, len); }, this); + _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){ (void)obj; (void)time; c->close(); }, this); } void SyncClient::_attachCallbacks_Disconnect(){ @@ -221,13 +343,15 @@ size_t SyncClient::write(const uint8_t *data, size_t len){ while(_tx_buffer->room() < toSend){ toWrite = _tx_buffer->room(); _tx_buffer->write((const char*)data, toWrite); - while(!_client->canSend() && connected()) + while(connected() && !_client->canSend()) delay(0); + if(!connected()) + return 0; _sendBuffer(); toSend -= toWrite; } _tx_buffer->write((const char*)(data+(len - toSend)), toSend); - if(_client->canSend() && connected()) + if(connected() && _client->canSend()) _sendBuffer(); return len; } @@ -275,12 +399,16 @@ int SyncClient::read(){ return res; } -void SyncClient::flush(){ +bool SyncClient::flush(unsigned int maxWaitMs){ + (void)maxWaitMs; if(_tx_buffer == NULL || !connected()) - return; + return false; if(_tx_buffer->available()){ - while(!_client->canSend() && connected()) + while(connected() && !_client->canSend()) delay(0); + if(_client == NULL || _tx_buffer == NULL) + return false; _sendBuffer(); } + return true; } diff --git a/src/SyncClient.h b/src/SyncClient.h index e46f4e4..cb568de 100644 --- a/src/SyncClient.h +++ b/src/SyncClient.h @@ -23,6 +23,12 @@ #define SYNCCLIENT_H_ #include "Client.h" +// Needed for Arduino core releases prior to 2.5.0, because of changes +// made to accommodate Arduino core 2.5.0 +// CONST was 1st defined in Core 2.5.0 in IPAddress.h +#ifndef CONST +#define CONST +#endif #include class cbuf; class AsyncClient; @@ -33,6 +39,7 @@ class SyncClient: public Client { cbuf *_tx_buffer; size_t _tx_buffer_size; cbuf *_rx_buffer; + int *_ref; size_t _sendBuffer(); void _onData(void *data, size_t len); @@ -41,34 +48,55 @@ class SyncClient: public Client { void _attachCallbacks(); void _attachCallbacks_Disconnect(); void _attachCallbacks_AfterConnected(); + void _release(); public: - SyncClient(size_t txBufLen = 1460); - SyncClient(AsyncClient *client, size_t txBufLen = 1460); + SyncClient(size_t txBufLen = TCP_MSS); + SyncClient(AsyncClient *client, size_t txBufLen = TCP_MSS); virtual ~SyncClient(); + int ref(); + int unref(); operator bool(){ return connected(); } SyncClient & operator=(const SyncClient &other); #if ASYNC_TCP_SSL_ENABLED - int connect(IPAddress ip, uint16_t port, bool secure); + int _connect(const IPAddress& ip, uint16_t port, bool secure); + int connect(CONST IPAddress& ip, uint16_t port, bool secure){ + return _connect(ip, port, secure); + } + int connect(IPAddress ip, uint16_t port, bool secure){ + return _connect(reinterpret_cast(ip), port, secure); + } int connect(const char *host, uint16_t port, bool secure); + int connect(CONST IPAddress& ip, uint16_t port){ + return _connect(ip, port, false); + } int connect(IPAddress ip, uint16_t port){ - return connect(ip, port, false); + return _connect(reinterpret_cast(ip), port, false); } int connect(const char *host, uint16_t port){ return connect(host, port, false); } #else - int connect(IPAddress ip, uint16_t port); + int _connect(const IPAddress& ip, uint16_t port); + int connect(CONST IPAddress& ip, uint16_t port){ + return _connect(ip, port); + } + int connect(IPAddress ip, uint16_t port){ + return _connect(reinterpret_cast(ip), port); + } int connect(const char *host, uint16_t port); #endif void setTimeout(uint32_t seconds); uint8_t status(); uint8_t connected(); - void stop(); + bool stop(unsigned int maxWaitMs); + bool flush(unsigned int maxWaitMs); + void stop() { (void)stop(0);} + void flush() { (void)flush(0);} size_t write(uint8_t data); size_t write(const uint8_t *data, size_t len); @@ -76,7 +104,6 @@ class SyncClient: public Client { int peek(); int read(); int read(uint8_t *data, size_t len); - void flush(); }; #endif /* SYNCCLIENT_H_ */ diff --git a/src/async_config.h b/src/async_config.h index 61cfc25..ca6912f 100644 --- a/src/async_config.h +++ b/src/async_config.h @@ -5,7 +5,34 @@ #define ASYNC_TCP_SSL_ENABLED 0 #endif -#define ASYNC_TCP_DEBUG(...) //ets_printf(__VA_ARGS__) -#define TCP_SSL_DEBUG(...) //ets_printf(__VA_ARGS__) +#ifndef TCP_MSS +// May have been definded as a -DTCP_MSS option on the compile line or not. +// Arduino core 2.3.0 or earlier does not do the -DTCP_MSS option. +// Later versions may set this option with info from board.txt. +// However, Core 2.4.0 and up board.txt does not define TCP_MSS for lwIP v1.4 +#define TCP_MSS (1460) +#endif + +// #define ASYNC_TCP_DEBUG(...) ets_printf(__VA_ARGS__) +// #define TCP_SSL_DEBUG(...) ets_printf(__VA_ARGS__) +// #define ASYNC_TCP_ASSERT( a ) do{ if(!(a)){ets_printf("ASSERT: %s %u \n", __FILE__, __LINE__);}}while(0) + +// Starting with Arduino Core 2.4.0 and up the define of DEBUG_ESP_PORT +// can be handled through the Arduino IDE Board options instead of here. +// #define DEBUG_ESP_PORT Serial + +// #define DEBUG_ESP_ASYNC_TCP 1 +// #define DEBUG_ESP_TCP_SSL 1 +#include + +#ifndef ASYNC_TCP_ASSERT +#define ASYNC_TCP_ASSERT(...) do { (void)0;} while(false) +#endif +#ifndef ASYNC_TCP_DEBUG +#define ASYNC_TCP_DEBUG(...) do { (void)0;} while(false) +#endif +#ifndef TCP_SSL_DEBUG +#define TCP_SSL_DEBUG(...) do { (void)0;} while(false) +#endif #endif /* LIBRARIES_ESPASYNCTCP_SRC_ASYNC_CONFIG_H_ */ diff --git a/src/tcp_axtls.c b/src/tcp_axtls.c old mode 100755 new mode 100644 index 1e2f324..cdbdf41 --- a/src/tcp_axtls.c +++ b/src/tcp_axtls.c @@ -31,6 +31,7 @@ #include #include #include +#include #include uint8_t * default_private_key = NULL; diff --git a/src/tcp_axtls.h b/src/tcp_axtls.h old mode 100755 new mode 100644 From 56d026ba5f8c5d05b59b4aa7bda4584e7e74b5d4 Mon Sep 17 00:00:00 2001 From: M Hightower <27247790+mhightower83@users.noreply.github.com> Date: Tue, 16 Jul 2019 10:20:12 -0700 Subject: [PATCH 2/2] moved DebugPrintMacros.h to the correct loccation. --- DebugPrintMacros.h => src/DebugPrintMacros.h | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename DebugPrintMacros.h => src/DebugPrintMacros.h (100%) diff --git a/DebugPrintMacros.h b/src/DebugPrintMacros.h similarity index 100% rename from DebugPrintMacros.h rename to src/DebugPrintMacros.h