From 52ed558d456ccbe12bff9aac84610a95e568d7d1 Mon Sep 17 00:00:00 2001 From: sima Date: Tue, 30 Jan 2024 20:10:15 -0800 Subject: [PATCH 1/5] boostification of ws connected signal --- src/cpp/proxy/testwebsocket.cpp | 2 +- src/cpp/proxy/websocketoverhttp.cpp | 2 +- src/cpp/proxy/wsproxysession.cpp | 2144 ++++++++++++++------------- src/cpp/runner/service.cpp | 7 +- src/cpp/websocket.h | 6 +- src/cpp/zwebsocket.cpp | 2 +- 6 files changed, 1093 insertions(+), 1070 deletions(-) diff --git a/src/cpp/proxy/testwebsocket.cpp b/src/cpp/proxy/testwebsocket.cpp index 782ea290..4399b7ed 100644 --- a/src/cpp/proxy/testwebsocket.cpp +++ b/src/cpp/proxy/testwebsocket.cpp @@ -115,7 +115,7 @@ public slots: } } - emit q->connected(); + q->connected(); if(gripEnabled && !channels.isEmpty()) QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection); diff --git a/src/cpp/proxy/websocketoverhttp.cpp b/src/cpp/proxy/websocketoverhttp.cpp index 4d10f39d..d19b2ad9 100644 --- a/src/cpp/proxy/websocketoverhttp.cpp +++ b/src/cpp/proxy/websocketoverhttp.cpp @@ -837,7 +837,7 @@ class WebSocketOverHttp::Private : public QObject if(emitConnected) { - emit q->connected(); + q->connected(); if(!self) return; } diff --git a/src/cpp/proxy/wsproxysession.cpp b/src/cpp/proxy/wsproxysession.cpp index cccc2a28..01913f8e 100644 --- a/src/cpp/proxy/wsproxysession.cpp +++ b/src/cpp/proxy/wsproxysession.cpp @@ -50,6 +50,10 @@ #define ACTIVITY_TIMEOUT 60000 #define KEEPALIVE_RAND_MAX 1000 +struct WSConnections { + Connection connectedConnection; +}; + struct WSProxyConnections { Connection sendEventReceivedConnection; Connection keepAliveSetupEventReceivedConnection; @@ -62,1173 +66,1183 @@ struct WSProxyConnections { class HttpExtension { public: - bool isNull() const { return name.isEmpty(); } + bool isNull() const { return name.isEmpty(); } - QByteArray name; - QHash params; + QByteArray name; + QHash params; }; static int findNext(const QByteArray &in, const char *charList, int start = 0) { - int len = qstrlen(charList); - for(int n = start; n < in.size(); ++n) - { - char c = in[n]; - for(int i = 0; i < len; ++i) - { - if(c == charList[i]) - return n; - } - } - - return -1; + int len = qstrlen(charList); + for(int n = start; n < in.size(); ++n) + { + char c = in[n]; + for(int i = 0; i < len; ++i) + { + if(c == charList[i]) + return n; + } + } + + return -1; } static QHash parseParams(const QByteArray &in, bool *ok = 0) { - QHash out; - - int start = 0; - while(start < in.size()) - { - QByteArray var; - QByteArray val; - - int at = findNext(in, "=;", start); - if(at != -1) - { - var = in.mid(start, at - start).trimmed(); - if(in[at] == '=') - { - if(at + 1 >= in.size()) - { - if(ok) - *ok = false; - return QHash(); - } - - ++at; - - if(in[at] == '\"') - { - ++at; - - bool complete = false; - for(int n = at; n < in.size(); ++n) - { - if(in[n] == '\\') - { - if(n + 1 >= in.size()) - { - if(ok) - *ok = false; - return QHash(); - } - - ++n; - val += in[n]; - } - else if(in[n] == '\"') - { - complete = true; - at = n + 1; - break; - } - else - val += in[n]; - } - - if(!complete) - { - if(ok) - *ok = false; - return QHash(); - } - - at = in.indexOf(';', at); - if(at != -1) - start = at + 1; - else - start = in.size(); - } - else - { - int vstart = at; - at = in.indexOf(';', vstart); - if(at != -1) - { - val = in.mid(vstart, at - vstart).trimmed(); - start = at + 1; - } - else - { - val = in.mid(vstart).trimmed(); - start = in.size(); - } - } - } - else - start = at + 1; - } - else - { - var = in.mid(start).trimmed(); - start = in.size(); - } - - out[var] = val; - } - - if(ok) - *ok = true; - - return out; + QHash out; + + int start = 0; + while(start < in.size()) + { + QByteArray var; + QByteArray val; + + int at = findNext(in, "=;", start); + if(at != -1) + { + var = in.mid(start, at - start).trimmed(); + if(in[at] == '=') + { + if(at + 1 >= in.size()) + { + if(ok) + *ok = false; + return QHash(); + } + + ++at; + + if(in[at] == '\"') + { + ++at; + + bool complete = false; + for(int n = at; n < in.size(); ++n) + { + if(in[n] == '\\') + { + if(n + 1 >= in.size()) + { + if(ok) + *ok = false; + return QHash(); + } + + ++n; + val += in[n]; + } + else if(in[n] == '\"') + { + complete = true; + at = n + 1; + break; + } + else + val += in[n]; + } + + if(!complete) + { + if(ok) + *ok = false; + return QHash(); + } + + at = in.indexOf(';', at); + if(at != -1) + start = at + 1; + else + start = in.size(); + } + else + { + int vstart = at; + at = in.indexOf(';', vstart); + if(at != -1) + { + val = in.mid(vstart, at - vstart).trimmed(); + start = at + 1; + } + else + { + val = in.mid(vstart).trimmed(); + start = in.size(); + } + } + } + else + start = at + 1; + } + else + { + var = in.mid(start).trimmed(); + start = in.size(); + } + + out[var] = val; + } + + if(ok) + *ok = true; + + return out; } static QByteArray getExtensionRaw(const QList &extStrings, const QByteArray &name) { - foreach(const QByteArray &ext, extStrings) - { - int at = ext.indexOf(';'); - if(at != -1) - { - if(ext.mid(0, at).trimmed() == name) - return ext; - } - else - { - if(ext == name) - return ext; - } - } - - return QByteArray(); + foreach(const QByteArray &ext, extStrings) + { + int at = ext.indexOf(';'); + if(at != -1) + { + if(ext.mid(0, at).trimmed() == name) + return ext; + } + else + { + if(ext == name) + return ext; + } + } + + return QByteArray(); } static HttpExtension getExtension(const QList &extStrings, const QByteArray &name) { - QByteArray ext = getExtensionRaw(extStrings, name); - if(ext.isNull()) - return HttpExtension(); - - HttpExtension e; - e.name = name; - - int at = ext.indexOf(';'); - if(at != -1) - { - bool ok; - e.params = parseParams(ext.mid(at + 1), &ok); - if(!ok) - return HttpExtension(); - } - - return e; + QByteArray ext = getExtensionRaw(extStrings, name); + if(ext.isNull()) + return HttpExtension(); + + HttpExtension e; + e.name = name; + + int at = ext.indexOf(';'); + if(at != -1) + { + bool ok; + e.params = parseParams(ext.mid(at + 1), &ok); + if(!ok) + return HttpExtension(); + } + + return e; } class WsProxySession::Private : public QObject { - Q_OBJECT + Q_OBJECT public: - enum State - { - Idle, - Connecting, - Connected, - Closing - }; - - typedef QPair QueuedFrame; - - WsProxySession *q; - State state; - ZRoutes *zroutes; - ZhttpManager *zhttpManager; - ConnectionManager *connectionManager; - StatsManager *statsManager; - WsControlManager *wsControlManager; - WsControlSession *wsControl; - DomainMap::Entry route; - bool debug; - QByteArray defaultSigIss; - Jwt::EncodingKey defaultSigKey; - Jwt::DecodingKey defaultUpstreamKey; - bool passToUpstream; - bool acceptXForwardedProtocol; - bool useXForwardedProto; - bool useXForwardedProtocol; - XffRule xffRule; - XffRule xffTrustedRule; - QList origHeadersNeedMark; - bool acceptPushpinRoute; - QByteArray cdnLoop; - HttpRequestData requestData; - bool trustedClient; - QHostAddress logicalClientAddress; - QByteArray sigIss; - Jwt::EncodingKey sigKey; - WebSocket *inSock; - WebSocket *outSock; - QList inPendingFrames; // true means we should ack a send event - int outReadInProgress; // frame type or -1 - QByteArray pathBeg; - QByteArray channelPrefix; - QList targets; - DomainMap::Target target; - QHostAddress clientAddress; - bool acceptGripMessages; - QByteArray messagePrefix; - bool detached; - QDateTime activityTime; - QByteArray publicCid; - RTimer *keepAliveTimer; - WsControl::KeepAliveMode keepAliveMode; - int keepAliveTimeout; - QList queuedInFrames; // frames to deliver after out read finishes - LogUtil::Config logConfig; - Callback> finishedByPassthroughCallback; - Connection keepAliveConneciton; - Connection aboutToSendRequestConnection; - map wsProxyConnectionMap; - - Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : - QObject(_q), - q(_q), - state(Idle), - zroutes(_zroutes), - zhttpManager(0), - connectionManager(_connectionManager), - statsManager(_statsManager), - wsControlManager(_wsControlManager), - wsControl(0), - debug(false), - passToUpstream(false), - acceptXForwardedProtocol(false), - useXForwardedProto(false), - useXForwardedProtocol(false), - acceptPushpinRoute(false), - trustedClient(false), - inSock(0), - outSock(0), - outReadInProgress(-1), - acceptGripMessages(false), - detached(false), - keepAliveTimer(0), - keepAliveMode(WsControl::NoKeepAlive), - keepAliveTimeout(0), - logConfig(_logConfig) - { - } - - ~Private() - { - cleanup(); - } - - void cleanup() - { - cleanupKeepAliveTimer(); - - cleanupInSock(); - - delete outSock; - outSock = 0; - - wsProxyConnectionMap.erase(wsControl); - delete wsControl; - wsControl = 0; - - if(zhttpManager) - { - zroutes->removeRef(zhttpManager); - zhttpManager = 0; - } - } - - void cleanupInSock() - { - if(inSock) - { - connectionManager->removeConnection(inSock); - delete inSock; - inSock = 0; - } - } - - void cleanupKeepAliveTimer() - { - if(keepAliveTimer) - { - keepAliveConneciton.disconnect(); - keepAliveTimer->setParent(0); - keepAliveTimer->deleteLater(); - keepAliveTimer = 0; - } - } - - void start(WebSocket *sock, const QByteArray &_publicCid, const DomainMap::Entry &entry) - { - assert(!inSock); - - state = Connecting; - - publicCid = _publicCid; - - if(statsManager) - activityTime = QDateTime::currentDateTimeUtc(); - - inSock = sock; - inSock->setParent(this); - connect(inSock, &WebSocket::readyRead, this, &Private::in_readyRead); - connect(inSock, &WebSocket::framesWritten, this, &Private::in_framesWritten); - connect(inSock, &WebSocket::writeBytesChanged, this, &Private::in_writeBytesChanged); - connect(inSock, &WebSocket::peerClosed, this, &Private::in_peerClosed); - connect(inSock, &WebSocket::closed, this, &Private::in_closed); - connect(inSock, &WebSocket::error, this, &Private::in_error); - - requestData.uri = inSock->requestUri(); - requestData.headers = inSock->requestHeaders(); - - trustedClient = ProxyUtil::checkTrustedClient("wsproxysession", q, requestData, defaultUpstreamKey); - - logicalClientAddress = ProxyUtil::getLogicalAddress(requestData.headers, trustedClient ? xffTrustedRule : xffRule, inSock->peerAddress()); - - QString host = requestData.uri.host(); - - route = entry; - - log_debug("wsproxysession: %p %s has %d routes", q, qPrintable(host), route.targets.count()); - - if(route.isNull()) - { - reject(false, 502, "Bad Gateway", QString("No route for host: %1").arg(host)); - return; - } - - incCounter(Stats::ClientHeaderBytesReceived, ZhttpManager::estimateRequestHeaderBytes("GET", requestData.uri, requestData.headers)); - - if(!route.asHost.isEmpty()) - ProxyUtil::applyHost(&requestData.uri, route.asHost); - - QByteArray path = requestData.uri.path(QUrl::FullyEncoded).toUtf8(); - - if(route.pathRemove > 0) - path = path.mid(route.pathRemove); - - if(!route.pathPrepend.isEmpty()) - path = route.pathPrepend + path; - - requestData.uri.setPath(QString::fromUtf8(path), QUrl::StrictMode); - - sigIss = defaultSigIss; - sigKey = defaultSigKey; - - if(!route.sigIss.isEmpty()) - sigIss = route.sigIss; - - if(!route.sigKey.isNull()) - sigKey = route.sigKey; - - pathBeg = route.pathBeg; - channelPrefix = route.prefix; - targets = route.targets; - - foreach(const HttpHeader &h, route.headers) - { - requestData.headers.removeAll(h.first); - if(!h.second.isEmpty()) - requestData.headers += HttpHeader(h.first, h.second); - } - - clientAddress = inSock->peerAddress(); - - ProxyUtil::manipulateRequestHeaders("wsproxysession", q, &requestData, trustedClient, route, sigIss, sigKey, acceptXForwardedProtocol, useXForwardedProto, useXForwardedProtocol, xffTrustedRule, xffRule, origHeadersNeedMark, acceptPushpinRoute, cdnLoop, clientAddress, InspectData(), route.grip, false); - - // don't proxy extensions, as we may not know how to handle them - requestData.headers.removeAll("Sec-WebSocket-Extensions"); - - if(route.grip) - { - // send grip extension - requestData.headers += HttpHeader("Sec-WebSocket-Extensions", "grip"); - } - - if(trustedClient || !route.grip) - passToUpstream = true; - - tryNextTarget(); - } - - void writeInFrame(const WebSocket::Frame &frame, bool fromSendEvent = false) - { - inPendingFrames += fromSendEvent; - - inSock->writeFrame(frame); - - incCounter(Stats::ClientContentBytesSent, frame.data.size()); - if(!frame.more) - incCounter(Stats::ClientMessagesSent); - } - - void tryNextTarget() - { - if(targets.isEmpty()) - { - QString msg = "Error while proxying to origin."; - - QStringList targetStrs; - foreach(const DomainMap::Target &t, route.targets) - targetStrs += ProxyUtil::targetToString(t); - QString dmsg = QString("Unable to connect to any targets. Tried: %1").arg(targetStrs.join(", ")); - - reject(true, 502, "Bad Gateway", msg, dmsg); - return; - } - - target = targets.takeFirst(); + enum State + { + Idle, + Connecting, + Connected, + Closing + }; + + typedef QPair QueuedFrame; + + WsProxySession *q; + State state; + ZRoutes *zroutes; + ZhttpManager *zhttpManager; + ConnectionManager *connectionManager; + StatsManager *statsManager; + WsControlManager *wsControlManager; + WsControlSession *wsControl; + DomainMap::Entry route; + bool debug; + QByteArray defaultSigIss; + Jwt::EncodingKey defaultSigKey; + Jwt::DecodingKey defaultUpstreamKey; + bool passToUpstream; + bool acceptXForwardedProtocol; + bool useXForwardedProto; + bool useXForwardedProtocol; + XffRule xffRule; + XffRule xffTrustedRule; + QList origHeadersNeedMark; + bool acceptPushpinRoute; + QByteArray cdnLoop; + HttpRequestData requestData; + bool trustedClient; + QHostAddress logicalClientAddress; + QByteArray sigIss; + Jwt::EncodingKey sigKey; + WebSocket *inSock; + WebSocket *outSock; + QList inPendingFrames; // true means we should ack a send event + int outReadInProgress; // frame type or -1 + QByteArray pathBeg; + QByteArray channelPrefix; + QList targets; + DomainMap::Target target; + QHostAddress clientAddress; + bool acceptGripMessages; + QByteArray messagePrefix; + bool detached; + QDateTime activityTime; + QByteArray publicCid; + RTimer *keepAliveTimer; + WsControl::KeepAliveMode keepAliveMode; + int keepAliveTimeout; + QList queuedInFrames; // frames to deliver after out read finishes + LogUtil::Config logConfig; + Callback> finishedByPassthroughCallback; + Connection keepAliveConneciton; + Connection aboutToSendRequestConnection; + map wsProxyConnectionMap; + map outWSConnectionMap; + + Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : + QObject(_q), + q(_q), + state(Idle), + zroutes(_zroutes), + zhttpManager(0), + connectionManager(_connectionManager), + statsManager(_statsManager), + wsControlManager(_wsControlManager), + wsControl(0), + debug(false), + passToUpstream(false), + acceptXForwardedProtocol(false), + useXForwardedProto(false), + useXForwardedProtocol(false), + acceptPushpinRoute(false), + trustedClient(false), + inSock(0), + outSock(0), + outReadInProgress(-1), + acceptGripMessages(false), + detached(false), + keepAliveTimer(0), + keepAliveMode(WsControl::NoKeepAlive), + keepAliveTimeout(0), + logConfig(_logConfig) + { + } + + ~Private() + { + cleanup(); + } + + void cleanup() + { + cleanupKeepAliveTimer(); + + cleanupInSock(); + + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + wsProxyConnectionMap.erase(wsControl); + delete wsControl; + wsControl = 0; + + if(zhttpManager) + { + zroutes->removeRef(zhttpManager); + zhttpManager = 0; + } + } + + void cleanupInSock() + { + if(inSock) + { + connectionManager->removeConnection(inSock); + delete inSock; + inSock = 0; + } + } + + void cleanupKeepAliveTimer() + { + if(keepAliveTimer) + { + keepAliveConneciton.disconnect(); + keepAliveTimer->setParent(0); + keepAliveTimer->deleteLater(); + keepAliveTimer = 0; + } + } + + void start(WebSocket *sock, const QByteArray &_publicCid, const DomainMap::Entry &entry) + { + assert(!inSock); + + state = Connecting; + + publicCid = _publicCid; + + if(statsManager) + activityTime = QDateTime::currentDateTimeUtc(); + + inSock = sock; + inSock->setParent(this); + connect(inSock, &WebSocket::readyRead, this, &Private::in_readyRead); + connect(inSock, &WebSocket::framesWritten, this, &Private::in_framesWritten); + connect(inSock, &WebSocket::writeBytesChanged, this, &Private::in_writeBytesChanged); + connect(inSock, &WebSocket::peerClosed, this, &Private::in_peerClosed); + connect(inSock, &WebSocket::closed, this, &Private::in_closed); + connect(inSock, &WebSocket::error, this, &Private::in_error); + + requestData.uri = inSock->requestUri(); + requestData.headers = inSock->requestHeaders(); + + trustedClient = ProxyUtil::checkTrustedClient("wsproxysession", q, requestData, defaultUpstreamKey); + + logicalClientAddress = ProxyUtil::getLogicalAddress(requestData.headers, trustedClient ? xffTrustedRule : xffRule, inSock->peerAddress()); + + QString host = requestData.uri.host(); + + route = entry; + + log_debug("wsproxysession: %p %s has %d routes", q, qPrintable(host), route.targets.count()); + + if(route.isNull()) + { + reject(false, 502, "Bad Gateway", QString("No route for host: %1").arg(host)); + return; + } + + incCounter(Stats::ClientHeaderBytesReceived, ZhttpManager::estimateRequestHeaderBytes("GET", requestData.uri, requestData.headers)); + + if(!route.asHost.isEmpty()) + ProxyUtil::applyHost(&requestData.uri, route.asHost); + + QByteArray path = requestData.uri.path(QUrl::FullyEncoded).toUtf8(); + + if(route.pathRemove > 0) + path = path.mid(route.pathRemove); - QUrl uri = requestData.uri; - if(target.ssl) - uri.setScheme("wss"); - else - uri.setScheme("ws"); + if(!route.pathPrepend.isEmpty()) + path = route.pathPrepend + path; - if(!target.host.isEmpty()) - ProxyUtil::applyHost(&uri, target.host); + requestData.uri.setPath(QString::fromUtf8(path), QUrl::StrictMode); + + sigIss = defaultSigIss; + sigKey = defaultSigKey; + + if(!route.sigIss.isEmpty()) + sigIss = route.sigIss; + + if(!route.sigKey.isNull()) + sigKey = route.sigKey; + + pathBeg = route.pathBeg; + channelPrefix = route.prefix; + targets = route.targets; + + foreach(const HttpHeader &h, route.headers) + { + requestData.headers.removeAll(h.first); + if(!h.second.isEmpty()) + requestData.headers += HttpHeader(h.first, h.second); + } + + clientAddress = inSock->peerAddress(); + + ProxyUtil::manipulateRequestHeaders("wsproxysession", q, &requestData, trustedClient, route, sigIss, sigKey, acceptXForwardedProtocol, useXForwardedProto, useXForwardedProtocol, xffTrustedRule, xffRule, origHeadersNeedMark, acceptPushpinRoute, cdnLoop, clientAddress, InspectData(), route.grip, false); + + // don't proxy extensions, as we may not know how to handle them + requestData.headers.removeAll("Sec-WebSocket-Extensions"); + + if(route.grip) + { + // send grip extension + requestData.headers += HttpHeader("Sec-WebSocket-Extensions", "grip"); + } + + if(trustedClient || !route.grip) + passToUpstream = true; + + tryNextTarget(); + } + + void writeInFrame(const WebSocket::Frame &frame, bool fromSendEvent = false) + { + inPendingFrames += fromSendEvent; + + inSock->writeFrame(frame); + + incCounter(Stats::ClientContentBytesSent, frame.data.size()); + if(!frame.more) + incCounter(Stats::ClientMessagesSent); + } + + void tryNextTarget() + { + if(targets.isEmpty()) + { + QString msg = "Error while proxying to origin."; + + QStringList targetStrs; + foreach(const DomainMap::Target &t, route.targets) + targetStrs += ProxyUtil::targetToString(t); + QString dmsg = QString("Unable to connect to any targets. Tried: %1").arg(targetStrs.join(", ")); + + reject(true, 502, "Bad Gateway", msg, dmsg); + return; + } - if(zhttpManager) - { - zroutes->removeRef(zhttpManager); - zhttpManager = 0; - } - - if(target.type == DomainMap::Target::Test) - { - // for test route, auto-adjust path - if(!pathBeg.isEmpty()) - { - int pathRemove = pathBeg.length(); - if(pathBeg.endsWith('/')) - --pathRemove; - - if(pathRemove > 0) - uri.setPath(uri.path(QUrl::FullyEncoded).mid(pathRemove)); - } - - outSock = new TestWebSocket(this); - } - else - { - if(target.type == DomainMap::Target::Custom) - { - zhttpManager = zroutes->managerForRoute(target.zhttpRoute); - log_debug("wsproxysession: %p forwarding to %s", q, qPrintable(target.zhttpRoute.baseSpec)); - } - else // Default - { - zhttpManager = zroutes->defaultManager(); - log_debug("wsproxysession: %p forwarding to %s:%d", q, qPrintable(target.connectHost), target.connectPort); - } - - zroutes->addRef(zhttpManager); - - if(target.overHttp) - { - WebSocketOverHttp *woh = new WebSocketOverHttp(zhttpManager, this); - - woh->setConnectionId(publicCid); - - if(target.oneEvent) - woh->setMaxEventsPerRequest(1); - - aboutToSendRequestConnection = woh->aboutToSendRequest.connect(boost::bind(&Private::out_aboutToSendRequest, this, woh)); - outSock = woh; - } - else - { - // websockets don't work with zhttp req mode - if(zhttpManager->clientUsesReq()) - { - reject(false, 502, "Bad Gateway", "Error while proxying to origin.", "WebSockets cannot be used with zhttpreq target"); - return; - } - - outSock = zhttpManager->createSocket(); - outSock->setParent(this); - } - } - - connect(outSock, &WebSocket::connected, this, &Private::out_connected); - connect(outSock, &WebSocket::readyRead, this, &Private::out_readyRead); - connect(outSock, &WebSocket::writeBytesChanged, this, &Private::out_writeBytesChanged); - connect(outSock, &WebSocket::peerClosed, this, &Private::out_peerClosed); - connect(outSock, &WebSocket::closed, this, &Private::out_closed); - connect(outSock, &WebSocket::error, this, &Private::out_error); - - if(target.trusted) - outSock->setIgnorePolicies(true); - - if(target.trustConnectHost) - outSock->setTrustConnectHost(true); - - if(target.insecure) - outSock->setIgnoreTlsErrors(true); - - if(target.type == DomainMap::Target::Default) - { - outSock->setConnectHost(target.connectHost); - outSock->setConnectPort(target.connectPort); - } - - ProxyUtil::applyHostHeader(&requestData.headers, uri); - - incCounter(Stats::ServerHeaderBytesSent, ZhttpManager::estimateRequestHeaderBytes("GET", uri, requestData.headers)); - - outSock->start(uri, requestData.headers); - } - - void reject(bool proxied, int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) - { - assert(state == Connecting); - - state = Closing; - inSock->respondError(code, reason, headers, body); - - incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(code, reason, headers)); - - logConnection(proxied, code, body.size()); - } - - void reject(bool proxied, int code, const QString &reason, const QString &errorMessage, const QString &debugErrorMessage) - { - QString msg = debug ? debugErrorMessage : errorMessage; - - reject(proxied, code, reason.toUtf8(), HttpHeaders(), (msg + '\n').toUtf8()); - } - - void reject(bool proxied, int code, const QString &reason, const QString &errorMessage) - { - reject(proxied, code, reason, errorMessage, errorMessage); - } - - void tryReadIn() - { - while(inSock->framesAvailable() > 0 && ((outSock && outSock->writeBytesAvailable() > 0) || detached)) - { - WebSocket::Frame f = inSock->readFrame(); - - tryLogActivity(); - - incCounter(Stats::ClientContentBytesReceived, f.data.size()); - if(!f.more) - incCounter(Stats::ClientMessagesReceived); - - if(detached) - continue; - - outSock->writeFrame(f); - - incCounter(Stats::ServerContentBytesSent, f.data.size()); - if(!f.more) - incCounter(Stats::ServerMessagesSent); - } - } - - void tryReadOut() - { - while(outSock->framesAvailable() > 0 && ((inSock && inSock->writeBytesAvailable() > 0) || detached)) - { - WebSocket::Frame f = outSock->readFrame(); - - tryLogActivity(); - - incCounter(Stats::ServerContentBytesReceived, f.data.size()); - if(!f.more) - incCounter(Stats::ServerMessagesReceived); - - if(detached && outReadInProgress == -1) - continue; - - if(f.type == WebSocket::Frame::Text || f.type == WebSocket::Frame::Binary || f.type == WebSocket::Frame::Continuation) - { - // we are skipping the rest of this message - if(f.type == WebSocket::Frame::Continuation && outReadInProgress == -1) - continue; - - if(f.type != WebSocket::Frame::Continuation) - outReadInProgress = (int)f.type; - - if(wsControl && acceptGripMessages) - { - if(f.type == WebSocket::Frame::Text && f.data.startsWith("c:")) - { - // grip messages must only be one frame - if(!f.more) - wsControl->sendGripMessage(f.data.mid(2)); // process - else - outReadInProgress = -1; // ignore rest of message - } - else if(f.type != WebSocket::Frame::Continuation) - { - if(f.data.startsWith(messagePrefix)) - { - f.data = f.data.mid(messagePrefix.size()); - writeInFrame(f); - - adjustKeepAlive(); - } - else - { - log_debug("wsproxysession: dropping unprefixed message"); - } - } - else if(f.type == WebSocket::Frame::Continuation) - { - assert(outReadInProgress != -1); - - writeInFrame(f); - - adjustKeepAlive(); - } - } - else - { - writeInFrame(f); - - adjustKeepAlive(); - } - - if(!f.more) - outReadInProgress = -1; - } - else - { - // always relay non-content frames - writeInFrame(f); - - adjustKeepAlive(); - } - - if(outReadInProgress == -1 && !queuedInFrames.isEmpty()) - { - foreach(const QueuedFrame &i, queuedInFrames) - writeInFrame(i.first, i.second); - } - } - } - - void tryFinish() - { - if(!inSock && !outSock) - { - cleanup(); - finishedByPassthroughCallback.call({q}); - } - } - - void tryLogActivity() - { - if(statsManager && !activityTime.isNull()) - { - QDateTime now = QDateTime::currentDateTimeUtc(); - if(now >= activityTime.addMSecs(ACTIVITY_TIMEOUT)) - { - statsManager->addActivity(route.id); - - activityTime = activityTime.addMSecs((activityTime.msecsTo(now) / ACTIVITY_TIMEOUT) * ACTIVITY_TIMEOUT); - } - } - } - - void logConnection(bool proxied, int responseCode, int responseBodySize) - { - LogUtil::RequestData rd; - - // only log route id if explicitly set - if(route.separateStats) - rd.routeId = route.id; - - if(responseCode != -1) - { - rd.status = LogUtil::Response; - rd.responseData.code = responseCode; - rd.responseBodySize = responseBodySize; - } - else - { - rd.status = LogUtil::Error; - } - - rd.requestData.method = "GET"; - rd.requestData.uri = inSock->requestUri(); - rd.requestData.headers = inSock->requestHeaders(); - - if(proxied) - { - rd.targetStr = ProxyUtil::targetToString(target); - rd.targetOverHttp = target.overHttp; - } - - rd.fromAddress = logicalClientAddress; - - LogUtil::logRequest(LOG_LEVEL_INFO, rd, logConfig); - } - - void setupKeepAlive() - { - if(keepAliveTimeout >= 0) - { - int timeout = keepAliveTimeout * 1000; - timeout = qMax(timeout - (int)(QRandomGenerator::global()->generate() % KEEPALIVE_RAND_MAX), 0); - keepAliveTimer->start(timeout); - } - } - - void adjustKeepAlive() - { - // if idle mode, restart the timer. else leave alone - if(keepAliveTimer && keepAliveMode == WsControl::Idle) - setupKeepAlive(); - } - - void incCounter(Stats::Counter c, int count = 1) - { - if(statsManager) - statsManager->incCounter(route.statsRoute(), c, count); - } + target = targets.takeFirst(); + + QUrl uri = requestData.uri; + if(target.ssl) + uri.setScheme("wss"); + else + uri.setScheme("ws"); + + if(!target.host.isEmpty()) + ProxyUtil::applyHost(&uri, target.host); + + if(zhttpManager) + { + zroutes->removeRef(zhttpManager); + zhttpManager = 0; + } + + if(target.type == DomainMap::Target::Test) + { + // for test route, auto-adjust path + if(!pathBeg.isEmpty()) + { + int pathRemove = pathBeg.length(); + if(pathBeg.endsWith('/')) + --pathRemove; + + if(pathRemove > 0) + uri.setPath(uri.path(QUrl::FullyEncoded).mid(pathRemove)); + } + + outSock = new TestWebSocket(this); + } + else + { + if(target.type == DomainMap::Target::Custom) + { + zhttpManager = zroutes->managerForRoute(target.zhttpRoute); + log_debug("wsproxysession: %p forwarding to %s", q, qPrintable(target.zhttpRoute.baseSpec)); + } + else // Default + { + zhttpManager = zroutes->defaultManager(); + log_debug("wsproxysession: %p forwarding to %s:%d", q, qPrintable(target.connectHost), target.connectPort); + } + + zroutes->addRef(zhttpManager); + + if(target.overHttp) + { + WebSocketOverHttp *woh = new WebSocketOverHttp(zhttpManager, this); + + woh->setConnectionId(publicCid); + + if(target.oneEvent) + woh->setMaxEventsPerRequest(1); + + aboutToSendRequestConnection = woh->aboutToSendRequest.connect(boost::bind(&Private::out_aboutToSendRequest, this, woh)); + outSock = woh; + } + else + { + // websockets don't work with zhttp req mode + if(zhttpManager->clientUsesReq()) + { + reject(false, 502, "Bad Gateway", "Error while proxying to origin.", "WebSockets cannot be used with zhttpreq target"); + return; + } + + outSock = zhttpManager->createSocket(); + outSock->setParent(this); + } + } + outWSConnectionMap[outSock] = { + outSock->connected.connect(boost::bind(&Private::out_connected, this)) + }; + connect(outSock, &WebSocket::readyRead, this, &Private::out_readyRead); + connect(outSock, &WebSocket::writeBytesChanged, this, &Private::out_writeBytesChanged); + connect(outSock, &WebSocket::peerClosed, this, &Private::out_peerClosed); + connect(outSock, &WebSocket::closed, this, &Private::out_closed); + connect(outSock, &WebSocket::error, this, &Private::out_error); + + if(target.trusted) + outSock->setIgnorePolicies(true); + + if(target.trustConnectHost) + outSock->setTrustConnectHost(true); + + if(target.insecure) + outSock->setIgnoreTlsErrors(true); + + if(target.type == DomainMap::Target::Default) + { + outSock->setConnectHost(target.connectHost); + outSock->setConnectPort(target.connectPort); + } + + ProxyUtil::applyHostHeader(&requestData.headers, uri); + + incCounter(Stats::ServerHeaderBytesSent, ZhttpManager::estimateRequestHeaderBytes("GET", uri, requestData.headers)); + + outSock->start(uri, requestData.headers); + } + + void reject(bool proxied, int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) + { + assert(state == Connecting); + + state = Closing; + inSock->respondError(code, reason, headers, body); + + incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(code, reason, headers)); + + logConnection(proxied, code, body.size()); + } + + void reject(bool proxied, int code, const QString &reason, const QString &errorMessage, const QString &debugErrorMessage) + { + QString msg = debug ? debugErrorMessage : errorMessage; + + reject(proxied, code, reason.toUtf8(), HttpHeaders(), (msg + '\n').toUtf8()); + } + + void reject(bool proxied, int code, const QString &reason, const QString &errorMessage) + { + reject(proxied, code, reason, errorMessage, errorMessage); + } + + void tryReadIn() + { + while(inSock->framesAvailable() > 0 && ((outSock && outSock->writeBytesAvailable() > 0) || detached)) + { + WebSocket::Frame f = inSock->readFrame(); + + tryLogActivity(); + + incCounter(Stats::ClientContentBytesReceived, f.data.size()); + if(!f.more) + incCounter(Stats::ClientMessagesReceived); + + if(detached) + continue; + + outSock->writeFrame(f); + + incCounter(Stats::ServerContentBytesSent, f.data.size()); + if(!f.more) + incCounter(Stats::ServerMessagesSent); + } + } + + void tryReadOut() + { + while(outSock->framesAvailable() > 0 && ((inSock && inSock->writeBytesAvailable() > 0) || detached)) + { + WebSocket::Frame f = outSock->readFrame(); + + tryLogActivity(); + + incCounter(Stats::ServerContentBytesReceived, f.data.size()); + if(!f.more) + incCounter(Stats::ServerMessagesReceived); + + if(detached && outReadInProgress == -1) + continue; + + if(f.type == WebSocket::Frame::Text || f.type == WebSocket::Frame::Binary || f.type == WebSocket::Frame::Continuation) + { + // we are skipping the rest of this message + if(f.type == WebSocket::Frame::Continuation && outReadInProgress == -1) + continue; + + if(f.type != WebSocket::Frame::Continuation) + outReadInProgress = (int)f.type; + + if(wsControl && acceptGripMessages) + { + if(f.type == WebSocket::Frame::Text && f.data.startsWith("c:")) + { + // grip messages must only be one frame + if(!f.more) + wsControl->sendGripMessage(f.data.mid(2)); // process + else + outReadInProgress = -1; // ignore rest of message + } + else if(f.type != WebSocket::Frame::Continuation) + { + if(f.data.startsWith(messagePrefix)) + { + f.data = f.data.mid(messagePrefix.size()); + writeInFrame(f); + + adjustKeepAlive(); + } + else + { + log_debug("wsproxysession: dropping unprefixed message"); + } + } + else if(f.type == WebSocket::Frame::Continuation) + { + assert(outReadInProgress != -1); + + writeInFrame(f); + + adjustKeepAlive(); + } + } + else + { + writeInFrame(f); + + adjustKeepAlive(); + } + + if(!f.more) + outReadInProgress = -1; + } + else + { + // always relay non-content frames + writeInFrame(f); + + adjustKeepAlive(); + } + + if(outReadInProgress == -1 && !queuedInFrames.isEmpty()) + { + foreach(const QueuedFrame &i, queuedInFrames) + writeInFrame(i.first, i.second); + } + } + } + + void tryFinish() + { + if(!inSock && !outSock) + { + cleanup(); + finishedByPassthroughCallback.call({q}); + } + } + + void tryLogActivity() + { + if(statsManager && !activityTime.isNull()) + { + QDateTime now = QDateTime::currentDateTimeUtc(); + if(now >= activityTime.addMSecs(ACTIVITY_TIMEOUT)) + { + statsManager->addActivity(route.id); + + activityTime = activityTime.addMSecs((activityTime.msecsTo(now) / ACTIVITY_TIMEOUT) * ACTIVITY_TIMEOUT); + } + } + } + + void logConnection(bool proxied, int responseCode, int responseBodySize) + { + LogUtil::RequestData rd; + + // only log route id if explicitly set + if(route.separateStats) + rd.routeId = route.id; + + if(responseCode != -1) + { + rd.status = LogUtil::Response; + rd.responseData.code = responseCode; + rd.responseBodySize = responseBodySize; + } + else + { + rd.status = LogUtil::Error; + } + + rd.requestData.method = "GET"; + rd.requestData.uri = inSock->requestUri(); + rd.requestData.headers = inSock->requestHeaders(); + + if(proxied) + { + rd.targetStr = ProxyUtil::targetToString(target); + rd.targetOverHttp = target.overHttp; + } + + rd.fromAddress = logicalClientAddress; + + LogUtil::logRequest(LOG_LEVEL_INFO, rd, logConfig); + } + + void setupKeepAlive() + { + if(keepAliveTimeout >= 0) + { + int timeout = keepAliveTimeout * 1000; + timeout = qMax(timeout - (int)(QRandomGenerator::global()->generate() % KEEPALIVE_RAND_MAX), 0); + keepAliveTimer->start(timeout); + } + } + + void adjustKeepAlive() + { + // if idle mode, restart the timer. else leave alone + if(keepAliveTimer && keepAliveMode == WsControl::Idle) + setupKeepAlive(); + } + + void incCounter(Stats::Counter c, int count = 1) + { + if(statsManager) + statsManager->incCounter(route.statsRoute(), c, count); + } private slots: - void in_readyRead() - { - if((outSock && outSock->state() == WebSocket::Connected) || detached) - tryReadIn(); - } - - void in_framesWritten(int count, int contentBytes) - { - Q_UNUSED(contentBytes); - - for(int n = 0; n < count; ++n) - { - bool fromSendEvent = inPendingFrames.takeFirst(); - if(fromSendEvent) - wsControl->sendEventWritten(); - } - } - - void in_writeBytesChanged() - { - if(!detached && outSock) - tryReadOut(); - } - - void in_peerClosed() - { - if(detached) - { - inSock->close(); - } - else - { - if(outSock) - { - if(outSock->state() == WebSocket::Connecting) - { - delete outSock; - outSock = 0; - - inSock->close(); - } - else if(outSock->state() == WebSocket::Connected) - { - outSock->close(inSock->peerCloseCode(), inSock->peerCloseReason()); - } - } - } - } - - void in_closed() - { - int code = inSock->peerCloseCode(); - QString reason = inSock->peerCloseReason(); - cleanupInSock(); - - if(!detached && outSock && outSock->state() != WebSocket::Closing) - outSock->close(code, reason); - - tryFinish(); - } - - void in_error() - { - cleanupInSock(); - - if(!detached) - { - delete outSock; - outSock = 0; - } - - tryFinish(); - } - - void out_connected() - { - log_debug("wsproxysession: %p connected", q); - - state = Connected; - - HttpHeaders headers = outSock->responseHeaders(); - - incCounter(Stats::ServerHeaderBytesReceived, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); - - // don't proxy extensions, as we may not know how to handle them - QList wsExtensions = headers.takeAll("Sec-WebSocket-Extensions"); - - HttpExtension grip = getExtension(wsExtensions, "grip"); - if(!grip.isNull() || !target.subscriptions.isEmpty()) - { - if(!grip.isNull()) - { - if(!passToUpstream) - { - if(grip.params.contains("message-prefix")) - messagePrefix = grip.params.value("message-prefix"); - else - messagePrefix = "m:"; - - acceptGripMessages = true; - log_debug("wsproxysession: %p grip enabled, message-prefix=[%s]", q, messagePrefix.data()); - } - else - { - // tell upstream to do the grip stuff - headers += HttpHeader("Sec-WebSocket-Extensions", getExtensionRaw(wsExtensions, "grip")); - } - } - - if(wsControlManager) - { - wsControl = wsControlManager->createSession(publicCid); - wsProxyConnectionMap[wsControl] = { - wsControl->sendEventReceived.connect(boost::bind(&Private::wsControl_sendEventReceived, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3)), - wsControl->keepAliveSetupEventReceived.connect(boost::bind(&Private::wsControl_keepAliveSetupEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), - wsControl->closeEventReceived.connect(boost::bind(&Private::wsControl_closeEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), - wsControl->detachEventReceived.connect(boost::bind(&Private::wsControl_detachEventReceived, this)), - wsControl->cancelEventReceived.connect(boost::bind(&Private::wsControl_cancelEventReceived, this)), - wsControl->error.connect(boost::bind(&Private::wsControl_error, this)) - }; - wsControl->start(route.id, route.separateStats, channelPrefix, inSock->requestUri()); - - foreach(const QString &subChannel, target.subscriptions) - { - log_debug("wsproxysession: %p implicit subscription to [%s]", q, qPrintable(subChannel)); - - wsControl->sendSubscribe(subChannel.toUtf8()); - } - } - } - - inSock->respondSuccess(outSock->responseReason(), headers); - - incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); - - logConnection(true, 101, 0); - - // send any pending frames - tryReadIn(); - } - - void out_readyRead() - { - tryReadOut(); - } - - void out_writeBytesChanged() - { - if(!detached && inSock) - tryReadIn(); - } - - void out_peerClosed() - { - if(!detached && inSock && inSock->state() != WebSocket::Closing) - inSock->close(outSock->peerCloseCode(), outSock->peerCloseReason()); - } - - void out_closed() - { - int code = outSock->peerCloseCode(); - QString reason = outSock->peerCloseReason(); - delete outSock; - outSock = 0; - - if(!detached && inSock && inSock->state() != WebSocket::Closing) - inSock->close(code, reason); - - tryFinish(); - } - - void out_error() - { - WebSocket::ErrorCondition e = outSock->errorCondition(); - log_debug("wsproxysession: %p target error state=%d, condition=%d", q, (int)state, (int)e); - - if(detached) - { - delete outSock; - outSock = 0; - - tryFinish(); - return; - } - - if(state == Connecting) - { - bool tryAgain = false; - - switch(e) - { - case WebSocket::ErrorConnect: - case WebSocket::ErrorConnectTimeout: - case WebSocket::ErrorTls: - tryAgain = true; - break; - case WebSocket::ErrorRejected: - reject(true, outSock->responseCode(), outSock->responseReason(), outSock->responseHeaders(), outSock->responseBody()); - break; - default: - reject(true, 502, "Bad Gateway", "Error while proxying to origin."); - break; - } - - delete outSock; - outSock = 0; - - if(tryAgain) - tryNextTarget(); - } - else - { - cleanupInSock(); - - delete outSock; - outSock = 0; - - tryFinish(); - } - } - - void out_aboutToSendRequest(WebSocketOverHttp *woh) - { - ProxyUtil::applyGripSig("wsproxysession", q, &requestData.headers, sigIss, sigKey); - - woh->setHeaders(requestData.headers); - } + void in_readyRead() + { + if((outSock && outSock->state() == WebSocket::Connected) || detached) + tryReadIn(); + } + + void in_framesWritten(int count, int contentBytes) + { + Q_UNUSED(contentBytes); + + for(int n = 0; n < count; ++n) + { + bool fromSendEvent = inPendingFrames.takeFirst(); + if(fromSendEvent) + wsControl->sendEventWritten(); + } + } + + void in_writeBytesChanged() + { + if(!detached && outSock) + tryReadOut(); + } + + void in_peerClosed() + { + if(detached) + { + inSock->close(); + } + else + { + if(outSock) + { + if(outSock->state() == WebSocket::Connecting) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + inSock->close(); + } + else if(outSock->state() == WebSocket::Connected) + { + outSock->close(inSock->peerCloseCode(), inSock->peerCloseReason()); + } + } + } + } + + void in_closed() + { + int code = inSock->peerCloseCode(); + QString reason = inSock->peerCloseReason(); + cleanupInSock(); + + if(!detached && outSock && outSock->state() != WebSocket::Closing) + outSock->close(code, reason); + + tryFinish(); + } + + void in_error() + { + cleanupInSock(); + + if(!detached) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + } + + tryFinish(); + } + + void out_connected() + { + log_debug("wsproxysession: %p connected", q); + + state = Connected; + + HttpHeaders headers = outSock->responseHeaders(); + + incCounter(Stats::ServerHeaderBytesReceived, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); + + // don't proxy extensions, as we may not know how to handle them + QList wsExtensions = headers.takeAll("Sec-WebSocket-Extensions"); + + HttpExtension grip = getExtension(wsExtensions, "grip"); + if(!grip.isNull() || !target.subscriptions.isEmpty()) + { + if(!grip.isNull()) + { + if(!passToUpstream) + { + if(grip.params.contains("message-prefix")) + messagePrefix = grip.params.value("message-prefix"); + else + messagePrefix = "m:"; + + acceptGripMessages = true; + log_debug("wsproxysession: %p grip enabled, message-prefix=[%s]", q, messagePrefix.data()); + } + else + { + // tell upstream to do the grip stuff + headers += HttpHeader("Sec-WebSocket-Extensions", getExtensionRaw(wsExtensions, "grip")); + } + } + + if(wsControlManager) + { + wsControl = wsControlManager->createSession(publicCid); + wsProxyConnectionMap[wsControl] = { + wsControl->sendEventReceived.connect(boost::bind(&Private::wsControl_sendEventReceived, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3)), + wsControl->keepAliveSetupEventReceived.connect(boost::bind(&Private::wsControl_keepAliveSetupEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), + wsControl->closeEventReceived.connect(boost::bind(&Private::wsControl_closeEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), + wsControl->detachEventReceived.connect(boost::bind(&Private::wsControl_detachEventReceived, this)), + wsControl->cancelEventReceived.connect(boost::bind(&Private::wsControl_cancelEventReceived, this)), + wsControl->error.connect(boost::bind(&Private::wsControl_error, this)) + }; + wsControl->start(route.id, route.separateStats, channelPrefix, inSock->requestUri()); + + foreach(const QString &subChannel, target.subscriptions) + { + log_debug("wsproxysession: %p implicit subscription to [%s]", q, qPrintable(subChannel)); + + wsControl->sendSubscribe(subChannel.toUtf8()); + } + } + } + + inSock->respondSuccess(outSock->responseReason(), headers); + + incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); + + logConnection(true, 101, 0); + + // send any pending frames + tryReadIn(); + } + + void out_readyRead() + { + tryReadOut(); + } + + void out_writeBytesChanged() + { + if(!detached && inSock) + tryReadIn(); + } + + void out_peerClosed() + { + if(!detached && inSock && inSock->state() != WebSocket::Closing) + inSock->close(outSock->peerCloseCode(), outSock->peerCloseReason()); + } + + void out_closed() + { + int code = outSock->peerCloseCode(); + QString reason = outSock->peerCloseReason(); + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + if(!detached && inSock && inSock->state() != WebSocket::Closing) + inSock->close(code, reason); + + tryFinish(); + } + + void out_error() + { + WebSocket::ErrorCondition e = outSock->errorCondition(); + log_debug("wsproxysession: %p target error state=%d, condition=%d", q, (int)state, (int)e); + + if(detached) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + tryFinish(); + return; + } + + if(state == Connecting) + { + bool tryAgain = false; + + switch(e) + { + case WebSocket::ErrorConnect: + case WebSocket::ErrorConnectTimeout: + case WebSocket::ErrorTls: + tryAgain = true; + break; + case WebSocket::ErrorRejected: + reject(true, outSock->responseCode(), outSock->responseReason(), outSock->responseHeaders(), outSock->responseBody()); + break; + default: + reject(true, 502, "Bad Gateway", "Error while proxying to origin."); + break; + } + + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + if(tryAgain) + tryNextTarget(); + } + else + { + cleanupInSock(); + + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + tryFinish(); + } + } + + void out_aboutToSendRequest(WebSocketOverHttp *woh) + { + ProxyUtil::applyGripSig("wsproxysession", q, &requestData.headers, sigIss, sigKey); + + woh->setHeaders(requestData.headers); + } private: - void wsControl_sendEventReceived(WebSocket::Frame::Type type, const QByteArray &message, bool queue) - { - // this method accepts a full message, which must be typed - if(type == WebSocket::Frame::Continuation) - return; - - // if we have no socket to write to, say the data was written anyway. - // this is not quite correct but better than leaving the send event - // dangling - if(!inSock || inSock->state() != WebSocket::Connected) - { - wsControl->sendEventWritten(); - return; - } - - // if queue == false, drop if we can't send right now - if(!queue && (inSock->writeBytesAvailable() == 0 || outReadInProgress != -1)) - { - // if drop is allowed, drop is success :) - wsControl->sendEventWritten(); - return; - } - - WebSocket::Frame f(type, message, false); - - if(outReadInProgress != -1) - { - queuedInFrames += QueuedFrame(f, true); - } - else - { - writeInFrame(f, true); - } - - adjustKeepAlive(); - } - - void wsControl_keepAliveSetupEventReceived(WsControl::KeepAliveMode mode, int timeout) - { - keepAliveMode = mode; - - if(keepAliveMode != WsControl::NoKeepAlive && timeout > 0) - { - keepAliveTimeout = timeout; - - if(!keepAliveTimer) - { - keepAliveTimer = new RTimer(this); - keepAliveConneciton = keepAliveTimer->timeout.connect(boost::bind(&Private::keepAliveTimer_timeout, this)); - keepAliveTimer->setSingleShot(true); - } - - setupKeepAlive(); - } - else - { - cleanupKeepAliveTimer(); - } - } - - void wsControl_closeEventReceived(int code, const QByteArray &reason) - { - if(!detached && outSock && outSock->state() != WebSocket::Closing) - outSock->close(); - - if(inSock && inSock->state() != WebSocket::Closing) - inSock->close(code, reason); - } - - void wsControl_detachEventReceived() - { - // if already detached, do nothing - if(detached) - return; - - detached = true; - - if(outSock && outSock->state() != WebSocket::Closing) - outSock->close(); - } - - void wsControl_cancelEventReceived() - { - if(outSock) - { - delete outSock; - outSock = 0; - } - - cleanupInSock(); - - tryFinish(); - } - - void wsControl_error() - { - log_debug("wsproxysession: %p wscontrol session error", q); - wsControl_cancelEventReceived(); - } - - void keepAliveTimer_timeout() - { - wsControl->sendNeedKeepAlive(); - - if(keepAliveMode == WsControl::Interval) - setupKeepAlive(); - } + void wsControl_sendEventReceived(WebSocket::Frame::Type type, const QByteArray &message, bool queue) + { + // this method accepts a full message, which must be typed + if(type == WebSocket::Frame::Continuation) + return; + + // if we have no socket to write to, say the data was written anyway. + // this is not quite correct but better than leaving the send event + // dangling + if(!inSock || inSock->state() != WebSocket::Connected) + { + wsControl->sendEventWritten(); + return; + } + + // if queue == false, drop if we can't send right now + if(!queue && (inSock->writeBytesAvailable() == 0 || outReadInProgress != -1)) + { + // if drop is allowed, drop is success :) + wsControl->sendEventWritten(); + return; + } + + WebSocket::Frame f(type, message, false); + + if(outReadInProgress != -1) + { + queuedInFrames += QueuedFrame(f, true); + } + else + { + writeInFrame(f, true); + } + + adjustKeepAlive(); + } + + void wsControl_keepAliveSetupEventReceived(WsControl::KeepAliveMode mode, int timeout) + { + keepAliveMode = mode; + + if(keepAliveMode != WsControl::NoKeepAlive && timeout > 0) + { + keepAliveTimeout = timeout; + + if(!keepAliveTimer) + { + keepAliveTimer = new RTimer(this); + keepAliveConneciton = keepAliveTimer->timeout.connect(boost::bind(&Private::keepAliveTimer_timeout, this)); + keepAliveTimer->setSingleShot(true); + } + + setupKeepAlive(); + } + else + { + cleanupKeepAliveTimer(); + } + } + + void wsControl_closeEventReceived(int code, const QByteArray &reason) + { + if(!detached && outSock && outSock->state() != WebSocket::Closing) + outSock->close(); + + if(inSock && inSock->state() != WebSocket::Closing) + inSock->close(code, reason); + } + + void wsControl_detachEventReceived() + { + // if already detached, do nothing + if(detached) + return; + + detached = true; + + if(outSock && outSock->state() != WebSocket::Closing) + outSock->close(); + } + + void wsControl_cancelEventReceived() + { + if(outSock) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + } + + cleanupInSock(); + + tryFinish(); + } + + void wsControl_error() + { + log_debug("wsproxysession: %p wscontrol session error", q); + wsControl_cancelEventReceived(); + } + + void keepAliveTimer_timeout() + { + wsControl->sendNeedKeepAlive(); + + if(keepAliveMode == WsControl::Interval) + setupKeepAlive(); + } }; WsProxySession::WsProxySession(ZRoutes *zroutes, ConnectionManager *connectionManager, const LogUtil::Config &logConfig, StatsManager *statsManager, WsControlManager *wsControlManager, QObject *parent) : - QObject(parent) + QObject(parent) { - d = new Private(this, zroutes, connectionManager, logConfig, statsManager, wsControlManager); + d = new Private(this, zroutes, connectionManager, logConfig, statsManager, wsControlManager); } WsProxySession::~WsProxySession() { - delete d; + delete d; } QHostAddress WsProxySession::logicalClientAddress() const { - return d->logicalClientAddress; + return d->logicalClientAddress; } QByteArray WsProxySession::statsRoute() const { - return d->route.statsRoute(); + return d->route.statsRoute(); } QByteArray WsProxySession::cid() const { - return d->publicCid; + return d->publicCid; } WebSocket *WsProxySession::inSocket() const { - return d->inSock; + return d->inSock; } WebSocket *WsProxySession::outSocket() const { - return d->outSock; + return d->outSock; } void WsProxySession::setDebugEnabled(bool enabled) { - d->debug = enabled; + d->debug = enabled; } void WsProxySession::setDefaultSigKey(const QByteArray &iss, const Jwt::EncodingKey &key) { - d->defaultSigIss = iss; - d->defaultSigKey = key; + d->defaultSigIss = iss; + d->defaultSigKey = key; } void WsProxySession::setDefaultUpstreamKey(const Jwt::DecodingKey &key) { - d->defaultUpstreamKey = key; + d->defaultUpstreamKey = key; } void WsProxySession::setAcceptXForwardedProtocol(bool enabled) { - d->acceptXForwardedProtocol = enabled; + d->acceptXForwardedProtocol = enabled; } void WsProxySession::setUseXForwardedProtocol(bool protoEnabled, bool protocolEnabled) { - d->useXForwardedProto = protoEnabled; - d->useXForwardedProtocol = protocolEnabled; + d->useXForwardedProto = protoEnabled; + d->useXForwardedProtocol = protocolEnabled; } void WsProxySession::setXffRules(const XffRule &untrusted, const XffRule &trusted) { - d->xffRule = untrusted; - d->xffTrustedRule = trusted; + d->xffRule = untrusted; + d->xffTrustedRule = trusted; } void WsProxySession::setOrigHeadersNeedMark(const QList &names) { - d->origHeadersNeedMark = names; + d->origHeadersNeedMark = names; } void WsProxySession::setAcceptPushpinRoute(bool enabled) { - d->acceptPushpinRoute = enabled; + d->acceptPushpinRoute = enabled; } void WsProxySession::setCdnLoop(const QByteArray &value) { - d->cdnLoop = value; + d->cdnLoop = value; } void WsProxySession::start(WebSocket *sock, const QByteArray &publicCid, const DomainMap::Entry &route) { - d->start(sock, publicCid, route); + d->start(sock, publicCid, route); } Callback> & WsProxySession::finishedByPassthroughCallback() { - return d->finishedByPassthroughCallback; + return d->finishedByPassthroughCallback; } #include "wsproxysession.moc" diff --git a/src/cpp/runner/service.cpp b/src/cpp/runner/service.cpp index 721f2fbd..044a9739 100644 --- a/src/cpp/runner/service.cpp +++ b/src/cpp/runner/service.cpp @@ -202,6 +202,11 @@ class Service::Private : public QObject } private slots: + void doError(const QString &str) + { + q->error(str); + } + void proc_started() { if(!pidFile.isEmpty()) @@ -328,7 +333,7 @@ void Service::start() if(!preStart()) { QString str = "Failure preparing to start"; - QMetaObject::invokeMethod(this, "error", Qt::QueuedConnection, Q_ARG(QString, str)); + QMetaObject::invokeMethod(this, "doError", Qt::QueuedConnection, Q_ARG(QString, str)); return; } diff --git a/src/cpp/websocket.h b/src/cpp/websocket.h index b17b0302..5274ad70 100644 --- a/src/cpp/websocket.h +++ b/src/cpp/websocket.h @@ -28,6 +28,9 @@ #include #include #include "httpheaders.h" +#include + +using Signal = boost::signals2::signal; class WebSocket : public QObject { @@ -110,8 +113,9 @@ class WebSocket : public QObject virtual Frame readFrame() = 0; virtual void close(int code = -1, const QString &reason = QString()) = 0; + Signal connected; + signals: - void connected(); void readyRead(); void framesWritten(int count, int contentBytes); void writeBytesChanged(); diff --git a/src/cpp/zwebsocket.cpp b/src/cpp/zwebsocket.cpp index 3233a1ac..1cce3c84 100644 --- a/src/cpp/zwebsocket.cpp +++ b/src/cpp/zwebsocket.cpp @@ -671,7 +671,7 @@ class ZWebSocket::Private : public QObject state = Connected; update(); - emit q->connected(); + q->connected(); } else { From 41a5e04e0ccfe58fe0039a9da94fce2fc95c0468 Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 31 Jan 2024 12:26:20 -0800 Subject: [PATCH 2/5] full boostification of websocket --- src/cpp/handler/httpsessionupdatemanager.cpp | 9 +- src/cpp/proxy/sockjsmanager.cpp | 30 +- src/cpp/proxy/sockjssession.cpp | 72 +- src/cpp/proxy/testwebsocket.cpp | 27 +- src/cpp/proxy/websocketoverhttp.cpp | 51 +- src/cpp/proxy/wsproxysession.cpp | 2184 +++++++++--------- src/cpp/simplehttpserver.cpp | 25 +- src/cpp/websocket.h | 14 +- src/cpp/zwebsocket.cpp | 45 +- 9 files changed, 1270 insertions(+), 1187 deletions(-) diff --git a/src/cpp/handler/httpsessionupdatemanager.cpp b/src/cpp/handler/httpsessionupdatemanager.cpp index 15df59bf..1f403f07 100644 --- a/src/cpp/handler/httpsessionupdatemanager.cpp +++ b/src/cpp/handler/httpsessionupdatemanager.cpp @@ -113,7 +113,9 @@ class HttpSessionUpdateManager::Private : public QObject bucket->key = key; bucket->sessions += hs; bucket->timer = new QTimer(this); - connect(bucket->timer, &QTimer::timeout, this, &Private::timer_timeout); + QObject::connect(bucket->timer, &QTimer::timeout, [this, timer=bucket->timer]() { + this->timer_timeout(timer); + }); buckets[key] = bucket; bucketsByTimer[bucket->timer] = bucket; @@ -137,10 +139,9 @@ class HttpSessionUpdateManager::Private : public QObject removeBucket(bucket); } -private slots: - void timer_timeout() +private: + void timer_timeout(QTimer *timer) { - QTimer *timer = (QTimer *)sender(); Bucket *bucket = bucketsByTimer.value(timer); if(!bucket) return; diff --git a/src/cpp/proxy/sockjsmanager.cpp b/src/cpp/proxy/sockjsmanager.cpp index b26ca31a..3d6b107d 100644 --- a/src/cpp/proxy/sockjsmanager.cpp +++ b/src/cpp/proxy/sockjsmanager.cpp @@ -70,6 +70,11 @@ static QByteArray serializeJsonString(const QString &s) return tmp.mid(1, tmp.length() - 2); } +struct WSConnections { + Connection closedConnection; + Connection errorConnection; +}; + struct ZhttpReqConnections{ Connection readyReadConnection; Connection bytesWrittenConnection; @@ -123,6 +128,7 @@ class SockJsManager::Private : public QObject owner->reqConnectionMap.erase(req); delete req; } + owner->wsConnectionMap.erase(sock); delete sock; if(timer) @@ -146,6 +152,7 @@ class SockJsManager::Private : public QObject QByteArray iframeHtmlEtag; QSet discardedRequests; map reqConnectionMap; + map wsConnectionMap; Private(SockJsManager *_q, const QString &sockJsUrl) : QObject(_q), @@ -198,7 +205,10 @@ class SockJsManager::Private : public QObject { // if there's a close value, hang around for a little bit s->timer = new QTimer(this); - connect(s->timer, &QTimer::timeout, this, &Private::timer_timeout); + QObject::connect(s->timer, &QTimer::timeout, [this, timer=s->timer]() { + this->timer_timeout(timer); + }); + s->timer->setSingleShot(true); sessionsByTimer.insert(s->timer, s); s->timer->start(5000); @@ -277,8 +287,10 @@ class SockJsManager::Private : public QObject s->asUri.setPath(QString::fromUtf8(encPath.mid(0, basePathStart) + "/websocket"), QUrl::StrictMode); s->route = route; - connect(sock, &ZWebSocket::closed, this, &Private::sock_closed); - connect(sock, &ZWebSocket::error, this, &Private::sock_error); + wsConnectionMap[sock] = { + sock->closed.connect(boost::bind(&Private::sock_closed, this, sock)), + sock->error.connect(boost::bind(&Private::sock_error, this, sock)) + }; sessions += s; sessionsBySocket.insert(s->sock, s); @@ -589,6 +601,7 @@ class SockJsManager::Private : public QObject return s->ext; } +private: void req_readyRead(ZhttpRequest *req) { // for a request to have been discardable, we must have read the @@ -646,10 +659,8 @@ class SockJsManager::Private : public QObject removeSession(s); } -private slots: - void sock_closed() + void sock_closed(ZWebSocket *sock) { - ZWebSocket *sock = (ZWebSocket *)sender(); Session *s = sessionsBySocket.value(sock); assert(s); @@ -659,9 +670,8 @@ private slots: removeSession(s); } - void sock_error() + void sock_error(ZWebSocket *sock) { - ZWebSocket *sock = (ZWebSocket *)sender(); Session *s = sessionsBySocket.value(sock); assert(s); @@ -671,9 +681,9 @@ private slots: removeSession(s); } - void timer_timeout() +private: + void timer_timeout(QTimer *timer) { - QTimer *timer = (QTimer *)sender(); Session *s = sessionsByTimer.value(timer); assert(s); diff --git a/src/cpp/proxy/sockjssession.cpp b/src/cpp/proxy/sockjssession.cpp index e3df9adb..d416c4ba 100644 --- a/src/cpp/proxy/sockjssession.cpp +++ b/src/cpp/proxy/sockjssession.cpp @@ -37,10 +37,22 @@ #include "zwebsocket.h" #include "sockjsmanager.h" +using std::map; + #define BUFFER_SIZE 200000 #define KEEPALIVE_TIMEOUT 25 #define UNCONNECTED_TIMEOUT 5 +struct WSConnections { + Connection connectedConnection; + Connection readyReadConnection; + Connection framesWrittenConnection; + Connection writeBytesChangedConnection; + Connection closedConnection; + Connection peerClosedConnection; + Connection sockErrorConnection; +}; + class SockJsSession::Private : public QObject { Q_OBJECT @@ -148,6 +160,7 @@ class SockJsSession::Private : public QObject bool updating; Connection bytesWrittenConnection; Connection errorConnection; + map wsConnectionMap; Private(SockJsSession *_q) : QObject(_q), @@ -231,6 +244,7 @@ class SockJsSession::Private : public QObject } requests.clear(); + wsConnectionMap.erase(sock); delete sock; sock = 0; @@ -261,12 +275,14 @@ class SockJsSession::Private : public QObject } else { - connect(sock, &ZWebSocket::readyRead, this, &Private::sock_readyRead); - connect(sock, &ZWebSocket::framesWritten, this, &Private::sock_framesWritten); - connect(sock, &ZWebSocket::writeBytesChanged, this, &Private::sock_writeBytesChanged); - connect(sock, &ZWebSocket::closed, this, &Private::sock_closed); - connect(sock, &ZWebSocket::peerClosed, this, &Private::sock_peerClosed); - connect(sock, &ZWebSocket::error, this, &Private::sock_error); + wsConnectionMap[sock] = { + sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)), + sock->framesWritten.connect(boost::bind(&Private::sock_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), + sock->writeBytesChanged.connect(boost::bind(&Private::sock_writeBytesChanged, this)), + sock->closed.connect(boost::bind(&Private::sock_closed, this)), + sock->peerClosed.connect(boost::bind(&Private::sock_peerClosed, this)), + sock->error.connect(boost::bind(&Private::sock_error, this)) + }; } } @@ -560,7 +576,7 @@ class SockJsSession::Private : public QObject state = Idle; applyLinger(); cleanup(); - QMetaObject::invokeMethod(q, "closed", Qt::QueuedConnection); + QMetaObject::invokeMethod(q, "doClosed", Qt::QueuedConnection); } else tryWrite(); @@ -622,7 +638,7 @@ class SockJsSession::Private : public QObject if(bytes > 0) { QPointer self = this; - emit q->writeBytesChanged(); + q->writeBytesChanged(); if(!self) return; } @@ -692,7 +708,7 @@ class SockJsSession::Private : public QObject if(emitReadyRead) { - emit q->readyRead(); + q->readyRead(); if(!self) return false; } @@ -799,7 +815,7 @@ class SockJsSession::Private : public QObject { state = Idle; cleanup(); - emit q->error(); + q->error(); // stop signals return false; @@ -807,7 +823,7 @@ class SockJsSession::Private : public QObject if(emitReadyRead) { - emit q->readyRead(); + q->readyRead(); if(!self) return false; } @@ -850,7 +866,7 @@ class SockJsSession::Private : public QObject pendingWrittenBytes = 0; } - emit q->framesWritten(count, contentBytes); + q->framesWritten(count, contentBytes); } QVariant applyLinger() @@ -901,7 +917,7 @@ class SockJsSession::Private : public QObject state = Idle; removeRequestItem(ri); cleanup(); - emit q->closed(); + q->closed(); return; } else if(ri->type == RequestItem::Receive) @@ -918,7 +934,7 @@ class SockJsSession::Private : public QObject state = Idle; removeRequestItem(ri); cleanup(); - emit q->closed(); + q->closed(); return; } } @@ -951,7 +967,7 @@ class SockJsSession::Private : public QObject if(close && !peerClosed) { peerClosed = true; - emit q->peerClosed(); + q->peerClosed(); return; } @@ -959,9 +975,9 @@ class SockJsSession::Private : public QObject cleanup(); if(close) - emit q->closed(); + q->closed(); else - emit q->error(); + q->error(); } else { @@ -969,7 +985,6 @@ class SockJsSession::Private : public QObject } } -private slots: void sock_readyRead() { if(mode == WebSocketFramed) @@ -978,7 +993,7 @@ private slots: } else // WebSocketPassthrough { - emit q->readyRead(); + q->readyRead(); } } @@ -989,14 +1004,14 @@ private slots: void sock_writeBytesChanged() { - emit q->writeBytesChanged(); + q->writeBytesChanged(); } void sock_peerClosed() { peerCloseCode = sock->peerCloseCode(); peerCloseReason = sock->peerCloseReason(); - emit q->peerClosed(); + q->peerClosed(); } void sock_closed() @@ -1005,7 +1020,7 @@ private slots: peerCloseReason = sock->peerCloseReason(); state = Idle; cleanup(); - emit q->closed(); + q->closed(); } void sock_error() @@ -1013,7 +1028,7 @@ private slots: state = Idle; errorCondition = sock->errorCondition(); cleanup(); - emit q->error(); + q->error(); } void doUpdate() @@ -1024,7 +1039,7 @@ private slots: { state = Idle; cleanup(); - emit q->error(); + q->error(); return; } @@ -1040,11 +1055,16 @@ private slots: pendingWrittenFrames = 0; pendingWrittenBytes = 0; - emit q->framesWritten(count, contentBytes); + q->framesWritten(count, contentBytes); } } } +private slots: + void doClosed(){ + q->closed(); + } + void keepAliveTimer_timeout() { assert(mode != WebSocketPassthrough); @@ -1064,7 +1084,7 @@ private slots: // timeout while unconnected state = Idle; cleanup(); - emit q->error(); + q->error(); } } else diff --git a/src/cpp/proxy/testwebsocket.cpp b/src/cpp/proxy/testwebsocket.cpp index 4399b7ed..417a3258 100644 --- a/src/cpp/proxy/testwebsocket.cpp +++ b/src/cpp/proxy/testwebsocket.cpp @@ -118,7 +118,7 @@ public slots: q->connected(); if(gripEnabled && !channels.isEmpty()) - QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection); + QMetaObject::invokeMethod(this, "doReadyRead", Qt::QueuedConnection); } else { @@ -128,14 +128,29 @@ public slots: response.body += QByteArray("no such test resource\n"); errorCondition = ErrorRejected; - emit q->error(); + q->error(); } } + void doReadyRead() + { + q->readyRead(); + } + + void doFramesWritten(int count, int bytes) + { + q->framesWritten(count, bytes); + } + + void doWriteBytesChanged() + { + q->writeBytesChanged(); + } + void handleClose() { state = Idle; - emit q->closed(); + q->closed(); } }; @@ -296,13 +311,13 @@ void TestWebSocket::writeFrame(const Frame &frame) d->inFrames += tmp; - QMetaObject::invokeMethod(this, "framesWritten", Qt::QueuedConnection, Q_ARG(int, 1), Q_ARG(int, tmp.data.size())); - QMetaObject::invokeMethod(this, "readyRead", Qt::QueuedConnection); + QMetaObject::invokeMethod(d, "doFramesWritten", Qt::QueuedConnection, Q_ARG(int, 1), Q_ARG(int, tmp.data.size())); + QMetaObject::invokeMethod(d, "doReadyRead", Qt::QueuedConnection); } WebSocket::Frame TestWebSocket::readFrame() { - QMetaObject::invokeMethod(this, "writeBytesChanged", Qt::QueuedConnection); + QMetaObject::invokeMethod(d, "doWriteBytesChanged", Qt::QueuedConnection); return d->inFrames.takeFirst(); } diff --git a/src/cpp/proxy/websocketoverhttp.cpp b/src/cpp/proxy/websocketoverhttp.cpp index d19b2ad9..f7342d06 100644 --- a/src/cpp/proxy/websocketoverhttp.cpp +++ b/src/cpp/proxy/websocketoverhttp.cpp @@ -46,6 +46,12 @@ namespace { +struct WSConnections { + Connection disconnectedConnection; + Connection closedConnection; + Connection errorConnection; +}; + class WsEvent { public: @@ -69,7 +75,7 @@ class WebSocketOverHttp::DisconnectManager : public QObject { Q_OBJECT - map disconnectedConnection; + map wsConnectionMap; public: DisconnectManager(QObject *parent = 0) : @@ -80,9 +86,11 @@ class WebSocketOverHttp::DisconnectManager : public QObject void addSocket(WebSocketOverHttp *sock) { sock->setParent(this); - disconnectedConnection[sock] = sock->disconnected.connect(boost::bind(&DisconnectManager::sock_disconnected, this, sock)); - connect(sock, &WebSocketOverHttp::closed, this, &DisconnectManager::sock_closed); - connect(sock, &WebSocketOverHttp::error, this, &DisconnectManager::sock_error); + wsConnectionMap[sock] = { + sock->disconnected.connect(boost::bind(&DisconnectManager::sock_disconnected, this, sock)), + sock->closed.connect(boost::bind(&DisconnectManager::sock_closed, this, sock)), + sock->error.connect(boost::bind(&DisconnectManager::sock_error, this, sock)) + }; sock->sendDisconnect(); } @@ -95,7 +103,7 @@ class WebSocketOverHttp::DisconnectManager : public QObject private: void cleanupSocket(WebSocketOverHttp *sock) { - disconnectedConnection.erase(sock); + wsConnectionMap.erase(sock); delete sock; } @@ -105,16 +113,13 @@ class WebSocketOverHttp::DisconnectManager : public QObject cleanupSocket(sock); } -private slots: - void sock_closed() + void sock_closed(WebSocketOverHttp *sock) { - WebSocketOverHttp *sock = (WebSocketOverHttp *)sender(); cleanupSocket(sock); } - void sock_error() + void sock_error(WebSocketOverHttp *sock) { - WebSocketOverHttp *sock = (WebSocketOverHttp *)sender(); cleanupSocket(sock); } }; @@ -646,7 +651,7 @@ class WebSocketOverHttp::Private : public QObject if(inBuf.size() + req->bytesAvailable() > RESPONSE_BODY_MAX) { cleanup(); - emit q->error(); + q->error(); return; } @@ -690,7 +695,7 @@ class WebSocketOverHttp::Private : public QObject errorCondition = ErrorGeneric; cleanup(); - emit q->error(); + q->error(); return; } @@ -726,7 +731,7 @@ class WebSocketOverHttp::Private : public QObject if(!ok) { cleanup(); - emit q->error(); + q->error(); return; } @@ -736,7 +741,7 @@ class WebSocketOverHttp::Private : public QObject if(events.isEmpty() && keepAliveInterval == -1) { cleanup(); - emit q->error(); + q->error(); return; } @@ -744,7 +749,7 @@ class WebSocketOverHttp::Private : public QObject if(!events.isEmpty() && events.first().type != "OPEN") { cleanup(); - emit q->error(); + q->error(); return; } @@ -851,7 +856,7 @@ class WebSocketOverHttp::Private : public QObject if(reqFrames > 0) { - emit q->framesWritten(reqFrames, reqContentSize); + q->framesWritten(reqFrames, reqContentSize); if(!self) return; } @@ -863,7 +868,7 @@ class WebSocketOverHttp::Private : public QObject if(hadContent) { - emit q->writeBytesChanged(); + q->writeBytesChanged(); if(!self) return; } @@ -876,12 +881,12 @@ class WebSocketOverHttp::Private : public QObject if(closeSent) { cleanup(); - emit q->closed(); + q->closed(); return; } else { - emit q->peerClosed(); + q->peerClosed(); } } else if(closeSent && keepAliveInterval == -1) @@ -895,14 +900,14 @@ class WebSocketOverHttp::Private : public QObject if(disconnected) { cleanup(); - emit q->error(); + q->error(); return; } if(reqClose && peerClosing) { cleanup(); - emit q->closed(); + q->closed(); return; } @@ -977,7 +982,7 @@ class WebSocketOverHttp::Private : public QObject errorCondition = WebSocket::ErrorTls; cleanup(); - emit q->error(); + q->error(); } private slots: @@ -996,7 +1001,7 @@ private slots: cleanup(); errorCondition = pendingErrorCondition; pendingErrorCondition = (ErrorCondition)-1; - emit q->error(); + q->error(); } }; diff --git a/src/cpp/proxy/wsproxysession.cpp b/src/cpp/proxy/wsproxysession.cpp index 01913f8e..82ac9638 100644 --- a/src/cpp/proxy/wsproxysession.cpp +++ b/src/cpp/proxy/wsproxysession.cpp @@ -51,1198 +51,1216 @@ #define KEEPALIVE_RAND_MAX 1000 struct WSConnections { - Connection connectedConnection; + Connection connectedConnection; + Connection readyReadConnection; + Connection writeBytesChangedConnection; + Connection peerClosedConnection; + Connection closedConnection; + Connection errorConnection; +}; + +struct InWSConnections { + Connection readyReadConnection; + Connection framesWrittenConnection; + Connection writeBytesChangedConnection; + Connection peerClosedConnection; + Connection closedConnection; + Connection errorConnection; }; struct WSProxyConnections { - Connection sendEventReceivedConnection; - Connection keepAliveSetupEventReceivedConnection; - Connection closeEventReceivedConnection; - Connection detachEventReceivedConnection; - Connection cancelEventReceivedConnection; - Connection errorConnection; + Connection sendEventReceivedConnection; + Connection keepAliveSetupEventReceivedConnection; + Connection closeEventReceivedConnection; + Connection detachEventReceivedConnection; + Connection cancelEventReceivedConnection; + Connection errorConnection; }; class HttpExtension { public: - bool isNull() const { return name.isEmpty(); } + bool isNull() const { return name.isEmpty(); } - QByteArray name; - QHash params; + QByteArray name; + QHash params; }; static int findNext(const QByteArray &in, const char *charList, int start = 0) { - int len = qstrlen(charList); - for(int n = start; n < in.size(); ++n) - { - char c = in[n]; - for(int i = 0; i < len; ++i) - { - if(c == charList[i]) - return n; - } - } - - return -1; + int len = qstrlen(charList); + for(int n = start; n < in.size(); ++n) + { + char c = in[n]; + for(int i = 0; i < len; ++i) + { + if(c == charList[i]) + return n; + } + } + + return -1; } static QHash parseParams(const QByteArray &in, bool *ok = 0) { - QHash out; - - int start = 0; - while(start < in.size()) - { - QByteArray var; - QByteArray val; - - int at = findNext(in, "=;", start); - if(at != -1) - { - var = in.mid(start, at - start).trimmed(); - if(in[at] == '=') - { - if(at + 1 >= in.size()) - { - if(ok) - *ok = false; - return QHash(); - } - - ++at; - - if(in[at] == '\"') - { - ++at; - - bool complete = false; - for(int n = at; n < in.size(); ++n) - { - if(in[n] == '\\') - { - if(n + 1 >= in.size()) - { - if(ok) - *ok = false; - return QHash(); - } - - ++n; - val += in[n]; - } - else if(in[n] == '\"') - { - complete = true; - at = n + 1; - break; - } - else - val += in[n]; - } - - if(!complete) - { - if(ok) - *ok = false; - return QHash(); - } - - at = in.indexOf(';', at); - if(at != -1) - start = at + 1; - else - start = in.size(); - } - else - { - int vstart = at; - at = in.indexOf(';', vstart); - if(at != -1) - { - val = in.mid(vstart, at - vstart).trimmed(); - start = at + 1; - } - else - { - val = in.mid(vstart).trimmed(); - start = in.size(); - } - } - } - else - start = at + 1; - } - else - { - var = in.mid(start).trimmed(); - start = in.size(); - } - - out[var] = val; - } - - if(ok) - *ok = true; - - return out; + QHash out; + + int start = 0; + while(start < in.size()) + { + QByteArray var; + QByteArray val; + + int at = findNext(in, "=;", start); + if(at != -1) + { + var = in.mid(start, at - start).trimmed(); + if(in[at] == '=') + { + if(at + 1 >= in.size()) + { + if(ok) + *ok = false; + return QHash(); + } + + ++at; + + if(in[at] == '\"') + { + ++at; + + bool complete = false; + for(int n = at; n < in.size(); ++n) + { + if(in[n] == '\\') + { + if(n + 1 >= in.size()) + { + if(ok) + *ok = false; + return QHash(); + } + + ++n; + val += in[n]; + } + else if(in[n] == '\"') + { + complete = true; + at = n + 1; + break; + } + else + val += in[n]; + } + + if(!complete) + { + if(ok) + *ok = false; + return QHash(); + } + + at = in.indexOf(';', at); + if(at != -1) + start = at + 1; + else + start = in.size(); + } + else + { + int vstart = at; + at = in.indexOf(';', vstart); + if(at != -1) + { + val = in.mid(vstart, at - vstart).trimmed(); + start = at + 1; + } + else + { + val = in.mid(vstart).trimmed(); + start = in.size(); + } + } + } + else + start = at + 1; + } + else + { + var = in.mid(start).trimmed(); + start = in.size(); + } + + out[var] = val; + } + + if(ok) + *ok = true; + + return out; } static QByteArray getExtensionRaw(const QList &extStrings, const QByteArray &name) { - foreach(const QByteArray &ext, extStrings) - { - int at = ext.indexOf(';'); - if(at != -1) - { - if(ext.mid(0, at).trimmed() == name) - return ext; - } - else - { - if(ext == name) - return ext; - } - } - - return QByteArray(); + foreach(const QByteArray &ext, extStrings) + { + int at = ext.indexOf(';'); + if(at != -1) + { + if(ext.mid(0, at).trimmed() == name) + return ext; + } + else + { + if(ext == name) + return ext; + } + } + + return QByteArray(); } static HttpExtension getExtension(const QList &extStrings, const QByteArray &name) { - QByteArray ext = getExtensionRaw(extStrings, name); - if(ext.isNull()) - return HttpExtension(); - - HttpExtension e; - e.name = name; - - int at = ext.indexOf(';'); - if(at != -1) - { - bool ok; - e.params = parseParams(ext.mid(at + 1), &ok); - if(!ok) - return HttpExtension(); - } - - return e; + QByteArray ext = getExtensionRaw(extStrings, name); + if(ext.isNull()) + return HttpExtension(); + + HttpExtension e; + e.name = name; + + int at = ext.indexOf(';'); + if(at != -1) + { + bool ok; + e.params = parseParams(ext.mid(at + 1), &ok); + if(!ok) + return HttpExtension(); + } + + return e; } class WsProxySession::Private : public QObject { - Q_OBJECT + Q_OBJECT public: - enum State - { - Idle, - Connecting, - Connected, - Closing - }; - - typedef QPair QueuedFrame; - - WsProxySession *q; - State state; - ZRoutes *zroutes; - ZhttpManager *zhttpManager; - ConnectionManager *connectionManager; - StatsManager *statsManager; - WsControlManager *wsControlManager; - WsControlSession *wsControl; - DomainMap::Entry route; - bool debug; - QByteArray defaultSigIss; - Jwt::EncodingKey defaultSigKey; - Jwt::DecodingKey defaultUpstreamKey; - bool passToUpstream; - bool acceptXForwardedProtocol; - bool useXForwardedProto; - bool useXForwardedProtocol; - XffRule xffRule; - XffRule xffTrustedRule; - QList origHeadersNeedMark; - bool acceptPushpinRoute; - QByteArray cdnLoop; - HttpRequestData requestData; - bool trustedClient; - QHostAddress logicalClientAddress; - QByteArray sigIss; - Jwt::EncodingKey sigKey; - WebSocket *inSock; - WebSocket *outSock; - QList inPendingFrames; // true means we should ack a send event - int outReadInProgress; // frame type or -1 - QByteArray pathBeg; - QByteArray channelPrefix; - QList targets; - DomainMap::Target target; - QHostAddress clientAddress; - bool acceptGripMessages; - QByteArray messagePrefix; - bool detached; - QDateTime activityTime; - QByteArray publicCid; - RTimer *keepAliveTimer; - WsControl::KeepAliveMode keepAliveMode; - int keepAliveTimeout; - QList queuedInFrames; // frames to deliver after out read finishes - LogUtil::Config logConfig; - Callback> finishedByPassthroughCallback; - Connection keepAliveConneciton; - Connection aboutToSendRequestConnection; - map wsProxyConnectionMap; - map outWSConnectionMap; - - Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : - QObject(_q), - q(_q), - state(Idle), - zroutes(_zroutes), - zhttpManager(0), - connectionManager(_connectionManager), - statsManager(_statsManager), - wsControlManager(_wsControlManager), - wsControl(0), - debug(false), - passToUpstream(false), - acceptXForwardedProtocol(false), - useXForwardedProto(false), - useXForwardedProtocol(false), - acceptPushpinRoute(false), - trustedClient(false), - inSock(0), - outSock(0), - outReadInProgress(-1), - acceptGripMessages(false), - detached(false), - keepAliveTimer(0), - keepAliveMode(WsControl::NoKeepAlive), - keepAliveTimeout(0), - logConfig(_logConfig) - { - } - - ~Private() - { - cleanup(); - } - - void cleanup() - { - cleanupKeepAliveTimer(); - - cleanupInSock(); - - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - - wsProxyConnectionMap.erase(wsControl); - delete wsControl; - wsControl = 0; - - if(zhttpManager) - { - zroutes->removeRef(zhttpManager); - zhttpManager = 0; - } - } - - void cleanupInSock() - { - if(inSock) - { - connectionManager->removeConnection(inSock); - delete inSock; - inSock = 0; - } - } - - void cleanupKeepAliveTimer() - { - if(keepAliveTimer) - { - keepAliveConneciton.disconnect(); - keepAliveTimer->setParent(0); - keepAliveTimer->deleteLater(); - keepAliveTimer = 0; - } - } - - void start(WebSocket *sock, const QByteArray &_publicCid, const DomainMap::Entry &entry) - { - assert(!inSock); - - state = Connecting; - - publicCid = _publicCid; - - if(statsManager) - activityTime = QDateTime::currentDateTimeUtc(); - - inSock = sock; - inSock->setParent(this); - connect(inSock, &WebSocket::readyRead, this, &Private::in_readyRead); - connect(inSock, &WebSocket::framesWritten, this, &Private::in_framesWritten); - connect(inSock, &WebSocket::writeBytesChanged, this, &Private::in_writeBytesChanged); - connect(inSock, &WebSocket::peerClosed, this, &Private::in_peerClosed); - connect(inSock, &WebSocket::closed, this, &Private::in_closed); - connect(inSock, &WebSocket::error, this, &Private::in_error); - - requestData.uri = inSock->requestUri(); - requestData.headers = inSock->requestHeaders(); - - trustedClient = ProxyUtil::checkTrustedClient("wsproxysession", q, requestData, defaultUpstreamKey); - - logicalClientAddress = ProxyUtil::getLogicalAddress(requestData.headers, trustedClient ? xffTrustedRule : xffRule, inSock->peerAddress()); - - QString host = requestData.uri.host(); - - route = entry; - - log_debug("wsproxysession: %p %s has %d routes", q, qPrintable(host), route.targets.count()); - - if(route.isNull()) - { - reject(false, 502, "Bad Gateway", QString("No route for host: %1").arg(host)); - return; - } - - incCounter(Stats::ClientHeaderBytesReceived, ZhttpManager::estimateRequestHeaderBytes("GET", requestData.uri, requestData.headers)); - - if(!route.asHost.isEmpty()) - ProxyUtil::applyHost(&requestData.uri, route.asHost); - - QByteArray path = requestData.uri.path(QUrl::FullyEncoded).toUtf8(); - - if(route.pathRemove > 0) - path = path.mid(route.pathRemove); - - if(!route.pathPrepend.isEmpty()) - path = route.pathPrepend + path; + enum State + { + Idle, + Connecting, + Connected, + Closing + }; + + typedef QPair QueuedFrame; + + WsProxySession *q; + State state; + ZRoutes *zroutes; + ZhttpManager *zhttpManager; + ConnectionManager *connectionManager; + StatsManager *statsManager; + WsControlManager *wsControlManager; + WsControlSession *wsControl; + DomainMap::Entry route; + bool debug; + QByteArray defaultSigIss; + Jwt::EncodingKey defaultSigKey; + Jwt::DecodingKey defaultUpstreamKey; + bool passToUpstream; + bool acceptXForwardedProtocol; + bool useXForwardedProto; + bool useXForwardedProtocol; + XffRule xffRule; + XffRule xffTrustedRule; + QList origHeadersNeedMark; + bool acceptPushpinRoute; + QByteArray cdnLoop; + HttpRequestData requestData; + bool trustedClient; + QHostAddress logicalClientAddress; + QByteArray sigIss; + Jwt::EncodingKey sigKey; + WebSocket *inSock; + WebSocket *outSock; + QList inPendingFrames; // true means we should ack a send event + int outReadInProgress; // frame type or -1 + QByteArray pathBeg; + QByteArray channelPrefix; + QList targets; + DomainMap::Target target; + QHostAddress clientAddress; + bool acceptGripMessages; + QByteArray messagePrefix; + bool detached; + QDateTime activityTime; + QByteArray publicCid; + RTimer *keepAliveTimer; + WsControl::KeepAliveMode keepAliveMode; + int keepAliveTimeout; + QList queuedInFrames; // frames to deliver after out read finishes + LogUtil::Config logConfig; + Callback> finishedByPassthroughCallback; + Connection keepAliveConneciton; + Connection aboutToSendRequestConnection; + map wsProxyConnectionMap; + map outWSConnectionMap; + map inWSConnectionMap; + + Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : + QObject(_q), + q(_q), + state(Idle), + zroutes(_zroutes), + zhttpManager(0), + connectionManager(_connectionManager), + statsManager(_statsManager), + wsControlManager(_wsControlManager), + wsControl(0), + debug(false), + passToUpstream(false), + acceptXForwardedProtocol(false), + useXForwardedProto(false), + useXForwardedProtocol(false), + acceptPushpinRoute(false), + trustedClient(false), + inSock(0), + outSock(0), + outReadInProgress(-1), + acceptGripMessages(false), + detached(false), + keepAliveTimer(0), + keepAliveMode(WsControl::NoKeepAlive), + keepAliveTimeout(0), + logConfig(_logConfig) + { + } + + ~Private() + { + cleanup(); + } + + void cleanup() + { + cleanupKeepAliveTimer(); + + cleanupInSock(); + + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + wsProxyConnectionMap.erase(wsControl); + delete wsControl; + wsControl = 0; + + if(zhttpManager) + { + zroutes->removeRef(zhttpManager); + zhttpManager = 0; + } + } + + void cleanupInSock() + { + if(inSock) + { + connectionManager->removeConnection(inSock); + inWSConnectionMap.erase(inSock); + delete inSock; + inSock = 0; + } + } + + void cleanupKeepAliveTimer() + { + if(keepAliveTimer) + { + keepAliveConneciton.disconnect(); + keepAliveTimer->setParent(0); + keepAliveTimer->deleteLater(); + keepAliveTimer = 0; + } + } + + void start(WebSocket *sock, const QByteArray &_publicCid, const DomainMap::Entry &entry) + { + assert(!inSock); + + state = Connecting; + + publicCid = _publicCid; + + if(statsManager) + activityTime = QDateTime::currentDateTimeUtc(); + + inSock = sock; + inSock->setParent(this); + inWSConnectionMap[inSock] = { + inSock->readyRead.connect(boost::bind(&Private::in_readyRead, this)), + inSock->framesWritten.connect(boost::bind(&Private::in_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), + inSock->writeBytesChanged.connect(boost::bind(&Private::in_writeBytesChanged, this)), + inSock->peerClosed.connect(boost::bind(&Private::in_peerClosed, this)), + inSock->closed.connect(boost::bind(&Private::in_closed, this)), + inSock->error.connect(boost::bind(&Private::in_error, this)) + }; + + requestData.uri = inSock->requestUri(); + requestData.headers = inSock->requestHeaders(); + + trustedClient = ProxyUtil::checkTrustedClient("wsproxysession", q, requestData, defaultUpstreamKey); + + logicalClientAddress = ProxyUtil::getLogicalAddress(requestData.headers, trustedClient ? xffTrustedRule : xffRule, inSock->peerAddress()); + + QString host = requestData.uri.host(); + + route = entry; + + log_debug("wsproxysession: %p %s has %d routes", q, qPrintable(host), route.targets.count()); + + if(route.isNull()) + { + reject(false, 502, "Bad Gateway", QString("No route for host: %1").arg(host)); + return; + } + + incCounter(Stats::ClientHeaderBytesReceived, ZhttpManager::estimateRequestHeaderBytes("GET", requestData.uri, requestData.headers)); + + if(!route.asHost.isEmpty()) + ProxyUtil::applyHost(&requestData.uri, route.asHost); + + QByteArray path = requestData.uri.path(QUrl::FullyEncoded).toUtf8(); + + if(route.pathRemove > 0) + path = path.mid(route.pathRemove); - requestData.uri.setPath(QString::fromUtf8(path), QUrl::StrictMode); - - sigIss = defaultSigIss; - sigKey = defaultSigKey; - - if(!route.sigIss.isEmpty()) - sigIss = route.sigIss; - - if(!route.sigKey.isNull()) - sigKey = route.sigKey; - - pathBeg = route.pathBeg; - channelPrefix = route.prefix; - targets = route.targets; - - foreach(const HttpHeader &h, route.headers) - { - requestData.headers.removeAll(h.first); - if(!h.second.isEmpty()) - requestData.headers += HttpHeader(h.first, h.second); - } - - clientAddress = inSock->peerAddress(); - - ProxyUtil::manipulateRequestHeaders("wsproxysession", q, &requestData, trustedClient, route, sigIss, sigKey, acceptXForwardedProtocol, useXForwardedProto, useXForwardedProtocol, xffTrustedRule, xffRule, origHeadersNeedMark, acceptPushpinRoute, cdnLoop, clientAddress, InspectData(), route.grip, false); - - // don't proxy extensions, as we may not know how to handle them - requestData.headers.removeAll("Sec-WebSocket-Extensions"); - - if(route.grip) - { - // send grip extension - requestData.headers += HttpHeader("Sec-WebSocket-Extensions", "grip"); - } - - if(trustedClient || !route.grip) - passToUpstream = true; - - tryNextTarget(); - } - - void writeInFrame(const WebSocket::Frame &frame, bool fromSendEvent = false) - { - inPendingFrames += fromSendEvent; - - inSock->writeFrame(frame); - - incCounter(Stats::ClientContentBytesSent, frame.data.size()); - if(!frame.more) - incCounter(Stats::ClientMessagesSent); - } - - void tryNextTarget() - { - if(targets.isEmpty()) - { - QString msg = "Error while proxying to origin."; - - QStringList targetStrs; - foreach(const DomainMap::Target &t, route.targets) - targetStrs += ProxyUtil::targetToString(t); - QString dmsg = QString("Unable to connect to any targets. Tried: %1").arg(targetStrs.join(", ")); - - reject(true, 502, "Bad Gateway", msg, dmsg); - return; - } + if(!route.pathPrepend.isEmpty()) + path = route.pathPrepend + path; + + requestData.uri.setPath(QString::fromUtf8(path), QUrl::StrictMode); + + sigIss = defaultSigIss; + sigKey = defaultSigKey; + + if(!route.sigIss.isEmpty()) + sigIss = route.sigIss; + + if(!route.sigKey.isNull()) + sigKey = route.sigKey; + + pathBeg = route.pathBeg; + channelPrefix = route.prefix; + targets = route.targets; + + foreach(const HttpHeader &h, route.headers) + { + requestData.headers.removeAll(h.first); + if(!h.second.isEmpty()) + requestData.headers += HttpHeader(h.first, h.second); + } + + clientAddress = inSock->peerAddress(); + + ProxyUtil::manipulateRequestHeaders("wsproxysession", q, &requestData, trustedClient, route, sigIss, sigKey, acceptXForwardedProtocol, useXForwardedProto, useXForwardedProtocol, xffTrustedRule, xffRule, origHeadersNeedMark, acceptPushpinRoute, cdnLoop, clientAddress, InspectData(), route.grip, false); + + // don't proxy extensions, as we may not know how to handle them + requestData.headers.removeAll("Sec-WebSocket-Extensions"); + + if(route.grip) + { + // send grip extension + requestData.headers += HttpHeader("Sec-WebSocket-Extensions", "grip"); + } + + if(trustedClient || !route.grip) + passToUpstream = true; + + tryNextTarget(); + } + + void writeInFrame(const WebSocket::Frame &frame, bool fromSendEvent = false) + { + inPendingFrames += fromSendEvent; + + inSock->writeFrame(frame); + + incCounter(Stats::ClientContentBytesSent, frame.data.size()); + if(!frame.more) + incCounter(Stats::ClientMessagesSent); + } + + void tryNextTarget() + { + if(targets.isEmpty()) + { + QString msg = "Error while proxying to origin."; + + QStringList targetStrs; + foreach(const DomainMap::Target &t, route.targets) + targetStrs += ProxyUtil::targetToString(t); + QString dmsg = QString("Unable to connect to any targets. Tried: %1").arg(targetStrs.join(", ")); + + reject(true, 502, "Bad Gateway", msg, dmsg); + return; + } - target = targets.takeFirst(); - - QUrl uri = requestData.uri; - if(target.ssl) - uri.setScheme("wss"); - else - uri.setScheme("ws"); - - if(!target.host.isEmpty()) - ProxyUtil::applyHost(&uri, target.host); - - if(zhttpManager) - { - zroutes->removeRef(zhttpManager); - zhttpManager = 0; - } - - if(target.type == DomainMap::Target::Test) - { - // for test route, auto-adjust path - if(!pathBeg.isEmpty()) - { - int pathRemove = pathBeg.length(); - if(pathBeg.endsWith('/')) - --pathRemove; - - if(pathRemove > 0) - uri.setPath(uri.path(QUrl::FullyEncoded).mid(pathRemove)); - } - - outSock = new TestWebSocket(this); - } - else - { - if(target.type == DomainMap::Target::Custom) - { - zhttpManager = zroutes->managerForRoute(target.zhttpRoute); - log_debug("wsproxysession: %p forwarding to %s", q, qPrintable(target.zhttpRoute.baseSpec)); - } - else // Default - { - zhttpManager = zroutes->defaultManager(); - log_debug("wsproxysession: %p forwarding to %s:%d", q, qPrintable(target.connectHost), target.connectPort); - } - - zroutes->addRef(zhttpManager); - - if(target.overHttp) - { - WebSocketOverHttp *woh = new WebSocketOverHttp(zhttpManager, this); - - woh->setConnectionId(publicCid); - - if(target.oneEvent) - woh->setMaxEventsPerRequest(1); - - aboutToSendRequestConnection = woh->aboutToSendRequest.connect(boost::bind(&Private::out_aboutToSendRequest, this, woh)); - outSock = woh; - } - else - { - // websockets don't work with zhttp req mode - if(zhttpManager->clientUsesReq()) - { - reject(false, 502, "Bad Gateway", "Error while proxying to origin.", "WebSockets cannot be used with zhttpreq target"); - return; - } - - outSock = zhttpManager->createSocket(); - outSock->setParent(this); - } - } - outWSConnectionMap[outSock] = { - outSock->connected.connect(boost::bind(&Private::out_connected, this)) - }; - connect(outSock, &WebSocket::readyRead, this, &Private::out_readyRead); - connect(outSock, &WebSocket::writeBytesChanged, this, &Private::out_writeBytesChanged); - connect(outSock, &WebSocket::peerClosed, this, &Private::out_peerClosed); - connect(outSock, &WebSocket::closed, this, &Private::out_closed); - connect(outSock, &WebSocket::error, this, &Private::out_error); - - if(target.trusted) - outSock->setIgnorePolicies(true); - - if(target.trustConnectHost) - outSock->setTrustConnectHost(true); - - if(target.insecure) - outSock->setIgnoreTlsErrors(true); - - if(target.type == DomainMap::Target::Default) - { - outSock->setConnectHost(target.connectHost); - outSock->setConnectPort(target.connectPort); - } - - ProxyUtil::applyHostHeader(&requestData.headers, uri); - - incCounter(Stats::ServerHeaderBytesSent, ZhttpManager::estimateRequestHeaderBytes("GET", uri, requestData.headers)); - - outSock->start(uri, requestData.headers); - } - - void reject(bool proxied, int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) - { - assert(state == Connecting); - - state = Closing; - inSock->respondError(code, reason, headers, body); - - incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(code, reason, headers)); - - logConnection(proxied, code, body.size()); - } - - void reject(bool proxied, int code, const QString &reason, const QString &errorMessage, const QString &debugErrorMessage) - { - QString msg = debug ? debugErrorMessage : errorMessage; - - reject(proxied, code, reason.toUtf8(), HttpHeaders(), (msg + '\n').toUtf8()); - } - - void reject(bool proxied, int code, const QString &reason, const QString &errorMessage) - { - reject(proxied, code, reason, errorMessage, errorMessage); - } - - void tryReadIn() - { - while(inSock->framesAvailable() > 0 && ((outSock && outSock->writeBytesAvailable() > 0) || detached)) - { - WebSocket::Frame f = inSock->readFrame(); - - tryLogActivity(); - - incCounter(Stats::ClientContentBytesReceived, f.data.size()); - if(!f.more) - incCounter(Stats::ClientMessagesReceived); - - if(detached) - continue; - - outSock->writeFrame(f); - - incCounter(Stats::ServerContentBytesSent, f.data.size()); - if(!f.more) - incCounter(Stats::ServerMessagesSent); - } - } - - void tryReadOut() - { - while(outSock->framesAvailable() > 0 && ((inSock && inSock->writeBytesAvailable() > 0) || detached)) - { - WebSocket::Frame f = outSock->readFrame(); - - tryLogActivity(); - - incCounter(Stats::ServerContentBytesReceived, f.data.size()); - if(!f.more) - incCounter(Stats::ServerMessagesReceived); - - if(detached && outReadInProgress == -1) - continue; - - if(f.type == WebSocket::Frame::Text || f.type == WebSocket::Frame::Binary || f.type == WebSocket::Frame::Continuation) - { - // we are skipping the rest of this message - if(f.type == WebSocket::Frame::Continuation && outReadInProgress == -1) - continue; - - if(f.type != WebSocket::Frame::Continuation) - outReadInProgress = (int)f.type; - - if(wsControl && acceptGripMessages) - { - if(f.type == WebSocket::Frame::Text && f.data.startsWith("c:")) - { - // grip messages must only be one frame - if(!f.more) - wsControl->sendGripMessage(f.data.mid(2)); // process - else - outReadInProgress = -1; // ignore rest of message - } - else if(f.type != WebSocket::Frame::Continuation) - { - if(f.data.startsWith(messagePrefix)) - { - f.data = f.data.mid(messagePrefix.size()); - writeInFrame(f); - - adjustKeepAlive(); - } - else - { - log_debug("wsproxysession: dropping unprefixed message"); - } - } - else if(f.type == WebSocket::Frame::Continuation) - { - assert(outReadInProgress != -1); - - writeInFrame(f); - - adjustKeepAlive(); - } - } - else - { - writeInFrame(f); - - adjustKeepAlive(); - } - - if(!f.more) - outReadInProgress = -1; - } - else - { - // always relay non-content frames - writeInFrame(f); - - adjustKeepAlive(); - } - - if(outReadInProgress == -1 && !queuedInFrames.isEmpty()) - { - foreach(const QueuedFrame &i, queuedInFrames) - writeInFrame(i.first, i.second); - } - } - } - - void tryFinish() - { - if(!inSock && !outSock) - { - cleanup(); - finishedByPassthroughCallback.call({q}); - } - } - - void tryLogActivity() - { - if(statsManager && !activityTime.isNull()) - { - QDateTime now = QDateTime::currentDateTimeUtc(); - if(now >= activityTime.addMSecs(ACTIVITY_TIMEOUT)) - { - statsManager->addActivity(route.id); - - activityTime = activityTime.addMSecs((activityTime.msecsTo(now) / ACTIVITY_TIMEOUT) * ACTIVITY_TIMEOUT); - } - } - } - - void logConnection(bool proxied, int responseCode, int responseBodySize) - { - LogUtil::RequestData rd; - - // only log route id if explicitly set - if(route.separateStats) - rd.routeId = route.id; - - if(responseCode != -1) - { - rd.status = LogUtil::Response; - rd.responseData.code = responseCode; - rd.responseBodySize = responseBodySize; - } - else - { - rd.status = LogUtil::Error; - } - - rd.requestData.method = "GET"; - rd.requestData.uri = inSock->requestUri(); - rd.requestData.headers = inSock->requestHeaders(); - - if(proxied) - { - rd.targetStr = ProxyUtil::targetToString(target); - rd.targetOverHttp = target.overHttp; - } - - rd.fromAddress = logicalClientAddress; - - LogUtil::logRequest(LOG_LEVEL_INFO, rd, logConfig); - } - - void setupKeepAlive() - { - if(keepAliveTimeout >= 0) - { - int timeout = keepAliveTimeout * 1000; - timeout = qMax(timeout - (int)(QRandomGenerator::global()->generate() % KEEPALIVE_RAND_MAX), 0); - keepAliveTimer->start(timeout); - } - } - - void adjustKeepAlive() - { - // if idle mode, restart the timer. else leave alone - if(keepAliveTimer && keepAliveMode == WsControl::Idle) - setupKeepAlive(); - } - - void incCounter(Stats::Counter c, int count = 1) - { - if(statsManager) - statsManager->incCounter(route.statsRoute(), c, count); - } + target = targets.takeFirst(); + + QUrl uri = requestData.uri; + if(target.ssl) + uri.setScheme("wss"); + else + uri.setScheme("ws"); + + if(!target.host.isEmpty()) + ProxyUtil::applyHost(&uri, target.host); + + if(zhttpManager) + { + zroutes->removeRef(zhttpManager); + zhttpManager = 0; + } + + if(target.type == DomainMap::Target::Test) + { + // for test route, auto-adjust path + if(!pathBeg.isEmpty()) + { + int pathRemove = pathBeg.length(); + if(pathBeg.endsWith('/')) + --pathRemove; + + if(pathRemove > 0) + uri.setPath(uri.path(QUrl::FullyEncoded).mid(pathRemove)); + } + + outSock = new TestWebSocket(this); + } + else + { + if(target.type == DomainMap::Target::Custom) + { + zhttpManager = zroutes->managerForRoute(target.zhttpRoute); + log_debug("wsproxysession: %p forwarding to %s", q, qPrintable(target.zhttpRoute.baseSpec)); + } + else // Default + { + zhttpManager = zroutes->defaultManager(); + log_debug("wsproxysession: %p forwarding to %s:%d", q, qPrintable(target.connectHost), target.connectPort); + } + + zroutes->addRef(zhttpManager); + + if(target.overHttp) + { + WebSocketOverHttp *woh = new WebSocketOverHttp(zhttpManager, this); + + woh->setConnectionId(publicCid); + + if(target.oneEvent) + woh->setMaxEventsPerRequest(1); + + aboutToSendRequestConnection = woh->aboutToSendRequest.connect(boost::bind(&Private::out_aboutToSendRequest, this, woh)); + outSock = woh; + } + else + { + // websockets don't work with zhttp req mode + if(zhttpManager->clientUsesReq()) + { + reject(false, 502, "Bad Gateway", "Error while proxying to origin.", "WebSockets cannot be used with zhttpreq target"); + return; + } + + outSock = zhttpManager->createSocket(); + outSock->setParent(this); + } + } + outWSConnectionMap[outSock] = { + outSock->connected.connect(boost::bind(&Private::out_connected, this)), + outSock->readyRead.connect(boost::bind(&Private::out_readyRead, this)), + outSock->writeBytesChanged.connect(boost::bind(&Private::out_writeBytesChanged, this)), + outSock->peerClosed.connect(boost::bind(&Private::out_peerClosed, this)), + outSock->closed.connect(boost::bind(&Private::out_closed, this)), + outSock->error.connect(boost::bind(&Private::out_error, this)) + }; + + if(target.trusted) + outSock->setIgnorePolicies(true); + + if(target.trustConnectHost) + outSock->setTrustConnectHost(true); + + if(target.insecure) + outSock->setIgnoreTlsErrors(true); + + if(target.type == DomainMap::Target::Default) + { + outSock->setConnectHost(target.connectHost); + outSock->setConnectPort(target.connectPort); + } + + ProxyUtil::applyHostHeader(&requestData.headers, uri); + + incCounter(Stats::ServerHeaderBytesSent, ZhttpManager::estimateRequestHeaderBytes("GET", uri, requestData.headers)); + + outSock->start(uri, requestData.headers); + } + + void reject(bool proxied, int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) + { + assert(state == Connecting); + + state = Closing; + inSock->respondError(code, reason, headers, body); + + incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(code, reason, headers)); + + logConnection(proxied, code, body.size()); + } + + void reject(bool proxied, int code, const QString &reason, const QString &errorMessage, const QString &debugErrorMessage) + { + QString msg = debug ? debugErrorMessage : errorMessage; + + reject(proxied, code, reason.toUtf8(), HttpHeaders(), (msg + '\n').toUtf8()); + } + + void reject(bool proxied, int code, const QString &reason, const QString &errorMessage) + { + reject(proxied, code, reason, errorMessage, errorMessage); + } + + void tryReadIn() + { + while(inSock->framesAvailable() > 0 && ((outSock && outSock->writeBytesAvailable() > 0) || detached)) + { + WebSocket::Frame f = inSock->readFrame(); + + tryLogActivity(); + + incCounter(Stats::ClientContentBytesReceived, f.data.size()); + if(!f.more) + incCounter(Stats::ClientMessagesReceived); + + if(detached) + continue; + + outSock->writeFrame(f); + + incCounter(Stats::ServerContentBytesSent, f.data.size()); + if(!f.more) + incCounter(Stats::ServerMessagesSent); + } + } + + void tryReadOut() + { + while(outSock->framesAvailable() > 0 && ((inSock && inSock->writeBytesAvailable() > 0) || detached)) + { + WebSocket::Frame f = outSock->readFrame(); + + tryLogActivity(); + + incCounter(Stats::ServerContentBytesReceived, f.data.size()); + if(!f.more) + incCounter(Stats::ServerMessagesReceived); + + if(detached && outReadInProgress == -1) + continue; + + if(f.type == WebSocket::Frame::Text || f.type == WebSocket::Frame::Binary || f.type == WebSocket::Frame::Continuation) + { + // we are skipping the rest of this message + if(f.type == WebSocket::Frame::Continuation && outReadInProgress == -1) + continue; + + if(f.type != WebSocket::Frame::Continuation) + outReadInProgress = (int)f.type; + + if(wsControl && acceptGripMessages) + { + if(f.type == WebSocket::Frame::Text && f.data.startsWith("c:")) + { + // grip messages must only be one frame + if(!f.more) + wsControl->sendGripMessage(f.data.mid(2)); // process + else + outReadInProgress = -1; // ignore rest of message + } + else if(f.type != WebSocket::Frame::Continuation) + { + if(f.data.startsWith(messagePrefix)) + { + f.data = f.data.mid(messagePrefix.size()); + writeInFrame(f); + + adjustKeepAlive(); + } + else + { + log_debug("wsproxysession: dropping unprefixed message"); + } + } + else if(f.type == WebSocket::Frame::Continuation) + { + assert(outReadInProgress != -1); + + writeInFrame(f); + + adjustKeepAlive(); + } + } + else + { + writeInFrame(f); + + adjustKeepAlive(); + } + + if(!f.more) + outReadInProgress = -1; + } + else + { + // always relay non-content frames + writeInFrame(f); + + adjustKeepAlive(); + } + + if(outReadInProgress == -1 && !queuedInFrames.isEmpty()) + { + foreach(const QueuedFrame &i, queuedInFrames) + writeInFrame(i.first, i.second); + } + } + } + + void tryFinish() + { + if(!inSock && !outSock) + { + cleanup(); + finishedByPassthroughCallback.call({q}); + } + } + + void tryLogActivity() + { + if(statsManager && !activityTime.isNull()) + { + QDateTime now = QDateTime::currentDateTimeUtc(); + if(now >= activityTime.addMSecs(ACTIVITY_TIMEOUT)) + { + statsManager->addActivity(route.id); + + activityTime = activityTime.addMSecs((activityTime.msecsTo(now) / ACTIVITY_TIMEOUT) * ACTIVITY_TIMEOUT); + } + } + } + + void logConnection(bool proxied, int responseCode, int responseBodySize) + { + LogUtil::RequestData rd; + + // only log route id if explicitly set + if(route.separateStats) + rd.routeId = route.id; + + if(responseCode != -1) + { + rd.status = LogUtil::Response; + rd.responseData.code = responseCode; + rd.responseBodySize = responseBodySize; + } + else + { + rd.status = LogUtil::Error; + } + + rd.requestData.method = "GET"; + rd.requestData.uri = inSock->requestUri(); + rd.requestData.headers = inSock->requestHeaders(); + + if(proxied) + { + rd.targetStr = ProxyUtil::targetToString(target); + rd.targetOverHttp = target.overHttp; + } + + rd.fromAddress = logicalClientAddress; + + LogUtil::logRequest(LOG_LEVEL_INFO, rd, logConfig); + } + + void setupKeepAlive() + { + if(keepAliveTimeout >= 0) + { + int timeout = keepAliveTimeout * 1000; + timeout = qMax(timeout - (int)(QRandomGenerator::global()->generate() % KEEPALIVE_RAND_MAX), 0); + keepAliveTimer->start(timeout); + } + } + + void adjustKeepAlive() + { + // if idle mode, restart the timer. else leave alone + if(keepAliveTimer && keepAliveMode == WsControl::Idle) + setupKeepAlive(); + } + + void incCounter(Stats::Counter c, int count = 1) + { + if(statsManager) + statsManager->incCounter(route.statsRoute(), c, count); + } private slots: - void in_readyRead() - { - if((outSock && outSock->state() == WebSocket::Connected) || detached) - tryReadIn(); - } - - void in_framesWritten(int count, int contentBytes) - { - Q_UNUSED(contentBytes); - - for(int n = 0; n < count; ++n) - { - bool fromSendEvent = inPendingFrames.takeFirst(); - if(fromSendEvent) - wsControl->sendEventWritten(); - } - } - - void in_writeBytesChanged() - { - if(!detached && outSock) - tryReadOut(); - } - - void in_peerClosed() - { - if(detached) - { - inSock->close(); - } - else - { - if(outSock) - { - if(outSock->state() == WebSocket::Connecting) - { - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - - inSock->close(); - } - else if(outSock->state() == WebSocket::Connected) - { - outSock->close(inSock->peerCloseCode(), inSock->peerCloseReason()); - } - } - } - } - - void in_closed() - { - int code = inSock->peerCloseCode(); - QString reason = inSock->peerCloseReason(); - cleanupInSock(); - - if(!detached && outSock && outSock->state() != WebSocket::Closing) - outSock->close(code, reason); - - tryFinish(); - } - - void in_error() - { - cleanupInSock(); - - if(!detached) - { - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - } - - tryFinish(); - } - - void out_connected() - { - log_debug("wsproxysession: %p connected", q); - - state = Connected; - - HttpHeaders headers = outSock->responseHeaders(); - - incCounter(Stats::ServerHeaderBytesReceived, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); - - // don't proxy extensions, as we may not know how to handle them - QList wsExtensions = headers.takeAll("Sec-WebSocket-Extensions"); - - HttpExtension grip = getExtension(wsExtensions, "grip"); - if(!grip.isNull() || !target.subscriptions.isEmpty()) - { - if(!grip.isNull()) - { - if(!passToUpstream) - { - if(grip.params.contains("message-prefix")) - messagePrefix = grip.params.value("message-prefix"); - else - messagePrefix = "m:"; - - acceptGripMessages = true; - log_debug("wsproxysession: %p grip enabled, message-prefix=[%s]", q, messagePrefix.data()); - } - else - { - // tell upstream to do the grip stuff - headers += HttpHeader("Sec-WebSocket-Extensions", getExtensionRaw(wsExtensions, "grip")); - } - } - - if(wsControlManager) - { - wsControl = wsControlManager->createSession(publicCid); - wsProxyConnectionMap[wsControl] = { - wsControl->sendEventReceived.connect(boost::bind(&Private::wsControl_sendEventReceived, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3)), - wsControl->keepAliveSetupEventReceived.connect(boost::bind(&Private::wsControl_keepAliveSetupEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), - wsControl->closeEventReceived.connect(boost::bind(&Private::wsControl_closeEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), - wsControl->detachEventReceived.connect(boost::bind(&Private::wsControl_detachEventReceived, this)), - wsControl->cancelEventReceived.connect(boost::bind(&Private::wsControl_cancelEventReceived, this)), - wsControl->error.connect(boost::bind(&Private::wsControl_error, this)) - }; - wsControl->start(route.id, route.separateStats, channelPrefix, inSock->requestUri()); - - foreach(const QString &subChannel, target.subscriptions) - { - log_debug("wsproxysession: %p implicit subscription to [%s]", q, qPrintable(subChannel)); - - wsControl->sendSubscribe(subChannel.toUtf8()); - } - } - } - - inSock->respondSuccess(outSock->responseReason(), headers); - - incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); - - logConnection(true, 101, 0); - - // send any pending frames - tryReadIn(); - } - - void out_readyRead() - { - tryReadOut(); - } - - void out_writeBytesChanged() - { - if(!detached && inSock) - tryReadIn(); - } - - void out_peerClosed() - { - if(!detached && inSock && inSock->state() != WebSocket::Closing) - inSock->close(outSock->peerCloseCode(), outSock->peerCloseReason()); - } - - void out_closed() - { - int code = outSock->peerCloseCode(); - QString reason = outSock->peerCloseReason(); - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - - if(!detached && inSock && inSock->state() != WebSocket::Closing) - inSock->close(code, reason); - - tryFinish(); - } - - void out_error() - { - WebSocket::ErrorCondition e = outSock->errorCondition(); - log_debug("wsproxysession: %p target error state=%d, condition=%d", q, (int)state, (int)e); - - if(detached) - { - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - - tryFinish(); - return; - } - - if(state == Connecting) - { - bool tryAgain = false; - - switch(e) - { - case WebSocket::ErrorConnect: - case WebSocket::ErrorConnectTimeout: - case WebSocket::ErrorTls: - tryAgain = true; - break; - case WebSocket::ErrorRejected: - reject(true, outSock->responseCode(), outSock->responseReason(), outSock->responseHeaders(), outSock->responseBody()); - break; - default: - reject(true, 502, "Bad Gateway", "Error while proxying to origin."); - break; - } - - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - - if(tryAgain) - tryNextTarget(); - } - else - { - cleanupInSock(); - - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - - tryFinish(); - } - } - - void out_aboutToSendRequest(WebSocketOverHttp *woh) - { - ProxyUtil::applyGripSig("wsproxysession", q, &requestData.headers, sigIss, sigKey); - - woh->setHeaders(requestData.headers); - } + void in_readyRead() + { + if((outSock && outSock->state() == WebSocket::Connected) || detached) + tryReadIn(); + } + + void in_framesWritten(int count, int contentBytes) + { + Q_UNUSED(contentBytes); + + for(int n = 0; n < count; ++n) + { + bool fromSendEvent = inPendingFrames.takeFirst(); + if(fromSendEvent) + wsControl->sendEventWritten(); + } + } + + void in_writeBytesChanged() + { + if(!detached && outSock) + tryReadOut(); + } + + void in_peerClosed() + { + if(detached) + { + inSock->close(); + } + else + { + if(outSock) + { + if(outSock->state() == WebSocket::Connecting) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + inSock->close(); + } + else if(outSock->state() == WebSocket::Connected) + { + outSock->close(inSock->peerCloseCode(), inSock->peerCloseReason()); + } + } + } + } + + void in_closed() + { + int code = inSock->peerCloseCode(); + QString reason = inSock->peerCloseReason(); + cleanupInSock(); + + if(!detached && outSock && outSock->state() != WebSocket::Closing) + outSock->close(code, reason); + + tryFinish(); + } + + void in_error() + { + cleanupInSock(); + + if(!detached) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + } + + tryFinish(); + } + + void out_connected() + { + log_debug("wsproxysession: %p connected", q); + + state = Connected; + + HttpHeaders headers = outSock->responseHeaders(); + + incCounter(Stats::ServerHeaderBytesReceived, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); + + // don't proxy extensions, as we may not know how to handle them + QList wsExtensions = headers.takeAll("Sec-WebSocket-Extensions"); + + HttpExtension grip = getExtension(wsExtensions, "grip"); + if(!grip.isNull() || !target.subscriptions.isEmpty()) + { + if(!grip.isNull()) + { + if(!passToUpstream) + { + if(grip.params.contains("message-prefix")) + messagePrefix = grip.params.value("message-prefix"); + else + messagePrefix = "m:"; + + acceptGripMessages = true; + log_debug("wsproxysession: %p grip enabled, message-prefix=[%s]", q, messagePrefix.data()); + } + else + { + // tell upstream to do the grip stuff + headers += HttpHeader("Sec-WebSocket-Extensions", getExtensionRaw(wsExtensions, "grip")); + } + } + + if(wsControlManager) + { + wsControl = wsControlManager->createSession(publicCid); + wsProxyConnectionMap[wsControl] = { + wsControl->sendEventReceived.connect(boost::bind(&Private::wsControl_sendEventReceived, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3)), + wsControl->keepAliveSetupEventReceived.connect(boost::bind(&Private::wsControl_keepAliveSetupEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), + wsControl->closeEventReceived.connect(boost::bind(&Private::wsControl_closeEventReceived, this, boost::placeholders::_1, boost::placeholders::_2)), + wsControl->detachEventReceived.connect(boost::bind(&Private::wsControl_detachEventReceived, this)), + wsControl->cancelEventReceived.connect(boost::bind(&Private::wsControl_cancelEventReceived, this)), + wsControl->error.connect(boost::bind(&Private::wsControl_error, this)) + }; + wsControl->start(route.id, route.separateStats, channelPrefix, inSock->requestUri()); + + foreach(const QString &subChannel, target.subscriptions) + { + log_debug("wsproxysession: %p implicit subscription to [%s]", q, qPrintable(subChannel)); + + wsControl->sendSubscribe(subChannel.toUtf8()); + } + } + } + + inSock->respondSuccess(outSock->responseReason(), headers); + + incCounter(Stats::ClientHeaderBytesSent, ZhttpManager::estimateResponseHeaderBytes(101, outSock->responseReason(), headers)); + + logConnection(true, 101, 0); + + // send any pending frames + tryReadIn(); + } + + void out_readyRead() + { + tryReadOut(); + } + + void out_writeBytesChanged() + { + if(!detached && inSock) + tryReadIn(); + } + + void out_peerClosed() + { + if(!detached && inSock && inSock->state() != WebSocket::Closing) + inSock->close(outSock->peerCloseCode(), outSock->peerCloseReason()); + } + + void out_closed() + { + int code = outSock->peerCloseCode(); + QString reason = outSock->peerCloseReason(); + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + if(!detached && inSock && inSock->state() != WebSocket::Closing) + inSock->close(code, reason); + + tryFinish(); + } + + void out_error() + { + WebSocket::ErrorCondition e = outSock->errorCondition(); + log_debug("wsproxysession: %p target error state=%d, condition=%d", q, (int)state, (int)e); + + if(detached) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + tryFinish(); + return; + } + + if(state == Connecting) + { + bool tryAgain = false; + + switch(e) + { + case WebSocket::ErrorConnect: + case WebSocket::ErrorConnectTimeout: + case WebSocket::ErrorTls: + tryAgain = true; + break; + case WebSocket::ErrorRejected: + reject(true, outSock->responseCode(), outSock->responseReason(), outSock->responseHeaders(), outSock->responseBody()); + break; + default: + reject(true, 502, "Bad Gateway", "Error while proxying to origin."); + break; + } + + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + if(tryAgain) + tryNextTarget(); + } + else + { + cleanupInSock(); + + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + + tryFinish(); + } + } + + void out_aboutToSendRequest(WebSocketOverHttp *woh) + { + ProxyUtil::applyGripSig("wsproxysession", q, &requestData.headers, sigIss, sigKey); + + woh->setHeaders(requestData.headers); + } private: - void wsControl_sendEventReceived(WebSocket::Frame::Type type, const QByteArray &message, bool queue) - { - // this method accepts a full message, which must be typed - if(type == WebSocket::Frame::Continuation) - return; - - // if we have no socket to write to, say the data was written anyway. - // this is not quite correct but better than leaving the send event - // dangling - if(!inSock || inSock->state() != WebSocket::Connected) - { - wsControl->sendEventWritten(); - return; - } - - // if queue == false, drop if we can't send right now - if(!queue && (inSock->writeBytesAvailable() == 0 || outReadInProgress != -1)) - { - // if drop is allowed, drop is success :) - wsControl->sendEventWritten(); - return; - } - - WebSocket::Frame f(type, message, false); - - if(outReadInProgress != -1) - { - queuedInFrames += QueuedFrame(f, true); - } - else - { - writeInFrame(f, true); - } - - adjustKeepAlive(); - } - - void wsControl_keepAliveSetupEventReceived(WsControl::KeepAliveMode mode, int timeout) - { - keepAliveMode = mode; - - if(keepAliveMode != WsControl::NoKeepAlive && timeout > 0) - { - keepAliveTimeout = timeout; - - if(!keepAliveTimer) - { - keepAliveTimer = new RTimer(this); - keepAliveConneciton = keepAliveTimer->timeout.connect(boost::bind(&Private::keepAliveTimer_timeout, this)); - keepAliveTimer->setSingleShot(true); - } - - setupKeepAlive(); - } - else - { - cleanupKeepAliveTimer(); - } - } - - void wsControl_closeEventReceived(int code, const QByteArray &reason) - { - if(!detached && outSock && outSock->state() != WebSocket::Closing) - outSock->close(); - - if(inSock && inSock->state() != WebSocket::Closing) - inSock->close(code, reason); - } - - void wsControl_detachEventReceived() - { - // if already detached, do nothing - if(detached) - return; - - detached = true; - - if(outSock && outSock->state() != WebSocket::Closing) - outSock->close(); - } - - void wsControl_cancelEventReceived() - { - if(outSock) - { - outWSConnectionMap.erase(outSock); - delete outSock; - outSock = 0; - } - - cleanupInSock(); - - tryFinish(); - } - - void wsControl_error() - { - log_debug("wsproxysession: %p wscontrol session error", q); - wsControl_cancelEventReceived(); - } - - void keepAliveTimer_timeout() - { - wsControl->sendNeedKeepAlive(); - - if(keepAliveMode == WsControl::Interval) - setupKeepAlive(); - } + void wsControl_sendEventReceived(WebSocket::Frame::Type type, const QByteArray &message, bool queue) + { + // this method accepts a full message, which must be typed + if(type == WebSocket::Frame::Continuation) + return; + + // if we have no socket to write to, say the data was written anyway. + // this is not quite correct but better than leaving the send event + // dangling + if(!inSock || inSock->state() != WebSocket::Connected) + { + wsControl->sendEventWritten(); + return; + } + + // if queue == false, drop if we can't send right now + if(!queue && (inSock->writeBytesAvailable() == 0 || outReadInProgress != -1)) + { + // if drop is allowed, drop is success :) + wsControl->sendEventWritten(); + return; + } + + WebSocket::Frame f(type, message, false); + + if(outReadInProgress != -1) + { + queuedInFrames += QueuedFrame(f, true); + } + else + { + writeInFrame(f, true); + } + + adjustKeepAlive(); + } + + void wsControl_keepAliveSetupEventReceived(WsControl::KeepAliveMode mode, int timeout) + { + keepAliveMode = mode; + + if(keepAliveMode != WsControl::NoKeepAlive && timeout > 0) + { + keepAliveTimeout = timeout; + + if(!keepAliveTimer) + { + keepAliveTimer = new RTimer(this); + keepAliveConneciton = keepAliveTimer->timeout.connect(boost::bind(&Private::keepAliveTimer_timeout, this)); + keepAliveTimer->setSingleShot(true); + } + + setupKeepAlive(); + } + else + { + cleanupKeepAliveTimer(); + } + } + + void wsControl_closeEventReceived(int code, const QByteArray &reason) + { + if(!detached && outSock && outSock->state() != WebSocket::Closing) + outSock->close(); + + if(inSock && inSock->state() != WebSocket::Closing) + inSock->close(code, reason); + } + + void wsControl_detachEventReceived() + { + // if already detached, do nothing + if(detached) + return; + + detached = true; + + if(outSock && outSock->state() != WebSocket::Closing) + outSock->close(); + } + + void wsControl_cancelEventReceived() + { + if(outSock) + { + outWSConnectionMap.erase(outSock); + delete outSock; + outSock = 0; + } + + cleanupInSock(); + + tryFinish(); + } + + void wsControl_error() + { + log_debug("wsproxysession: %p wscontrol session error", q); + wsControl_cancelEventReceived(); + } + + void keepAliveTimer_timeout() + { + wsControl->sendNeedKeepAlive(); + + if(keepAliveMode == WsControl::Interval) + setupKeepAlive(); + } }; WsProxySession::WsProxySession(ZRoutes *zroutes, ConnectionManager *connectionManager, const LogUtil::Config &logConfig, StatsManager *statsManager, WsControlManager *wsControlManager, QObject *parent) : - QObject(parent) + QObject(parent) { - d = new Private(this, zroutes, connectionManager, logConfig, statsManager, wsControlManager); + d = new Private(this, zroutes, connectionManager, logConfig, statsManager, wsControlManager); } WsProxySession::~WsProxySession() { - delete d; + delete d; } QHostAddress WsProxySession::logicalClientAddress() const { - return d->logicalClientAddress; + return d->logicalClientAddress; } QByteArray WsProxySession::statsRoute() const { - return d->route.statsRoute(); + return d->route.statsRoute(); } QByteArray WsProxySession::cid() const { - return d->publicCid; + return d->publicCid; } WebSocket *WsProxySession::inSocket() const { - return d->inSock; + return d->inSock; } WebSocket *WsProxySession::outSocket() const { - return d->outSock; + return d->outSock; } void WsProxySession::setDebugEnabled(bool enabled) { - d->debug = enabled; + d->debug = enabled; } void WsProxySession::setDefaultSigKey(const QByteArray &iss, const Jwt::EncodingKey &key) { - d->defaultSigIss = iss; - d->defaultSigKey = key; + d->defaultSigIss = iss; + d->defaultSigKey = key; } void WsProxySession::setDefaultUpstreamKey(const Jwt::DecodingKey &key) { - d->defaultUpstreamKey = key; + d->defaultUpstreamKey = key; } void WsProxySession::setAcceptXForwardedProtocol(bool enabled) { - d->acceptXForwardedProtocol = enabled; + d->acceptXForwardedProtocol = enabled; } void WsProxySession::setUseXForwardedProtocol(bool protoEnabled, bool protocolEnabled) { - d->useXForwardedProto = protoEnabled; - d->useXForwardedProtocol = protocolEnabled; + d->useXForwardedProto = protoEnabled; + d->useXForwardedProtocol = protocolEnabled; } void WsProxySession::setXffRules(const XffRule &untrusted, const XffRule &trusted) { - d->xffRule = untrusted; - d->xffTrustedRule = trusted; + d->xffRule = untrusted; + d->xffTrustedRule = trusted; } void WsProxySession::setOrigHeadersNeedMark(const QList &names) { - d->origHeadersNeedMark = names; + d->origHeadersNeedMark = names; } void WsProxySession::setAcceptPushpinRoute(bool enabled) { - d->acceptPushpinRoute = enabled; + d->acceptPushpinRoute = enabled; } void WsProxySession::setCdnLoop(const QByteArray &value) { - d->cdnLoop = value; + d->cdnLoop = value; } void WsProxySession::start(WebSocket *sock, const QByteArray &publicCid, const DomainMap::Entry &route) { - d->start(sock, publicCid, route); + d->start(sock, publicCid, route); } Callback> & WsProxySession::finishedByPassthroughCallback() { - return d->finishedByPassthroughCallback; + return d->finishedByPassthroughCallback; } #include "wsproxysession.moc" diff --git a/src/cpp/simplehttpserver.cpp b/src/cpp/simplehttpserver.cpp index bec9ba51..bf395dbe 100644 --- a/src/cpp/simplehttpserver.cpp +++ b/src/cpp/simplehttpserver.cpp @@ -91,9 +91,15 @@ class SimpleHttpRequest::Private : public QObject void start(QTcpSocket *_sock) { - connect(_sock, &QTcpSocket::readyRead, this, &Private::sock_readyRead); - connect(_sock, &QTcpSocket::bytesWritten, this, &Private::sock_bytesWritten); - connect(_sock, &QTcpSocket::disconnected, this, &Private::sock_disconnected); + QObject::connect(_sock, &QTcpSocket::readyRead, [this]() { + this->sock_readyRead(); + }); + QObject::connect(_sock, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) { + this->sock_bytesWritten(bytes); + }); + QObject::connect(_sock, &QTcpSocket::disconnected, [this]() { + this->sock_disconnected(); + }); sock = _sock; sock->setParent(this); @@ -103,9 +109,15 @@ class SimpleHttpRequest::Private : public QObject void start(QLocalSocket *_sock) { - connect(_sock, &QLocalSocket::readyRead, this, &Private::sock_readyRead); - connect(_sock, &QLocalSocket::bytesWritten, this, &Private::sock_bytesWritten); - connect(_sock, &QLocalSocket::disconnected, this, &Private::sock_disconnected); + QObject::connect(_sock, &QLocalSocket::readyRead, [this]() { + this->sock_readyRead(); + }); + QObject::connect(_sock, &QLocalSocket::bytesWritten, this, [this](qint64 bytes) { + this->sock_bytesWritten(bytes); + }); + QObject::connect(_sock, &QLocalSocket::disconnected, [this]() { + this->sock_disconnected(); + }); sock = _sock; sock->setParent(this); @@ -342,7 +354,6 @@ class SimpleHttpRequest::Private : public QObject } } -private slots: void sock_readyRead() { if(state == ReadHeader || state == ReadBody) diff --git a/src/cpp/websocket.h b/src/cpp/websocket.h index 5274ad70..312c715d 100644 --- a/src/cpp/websocket.h +++ b/src/cpp/websocket.h @@ -114,14 +114,12 @@ class WebSocket : public QObject virtual void close(int code = -1, const QString &reason = QString()) = 0; Signal connected; - -signals: - void readyRead(); - void framesWritten(int count, int contentBytes); - void writeBytesChanged(); - void peerClosed(); // emitted only if peer closes before we do - void closed(); // emitted after peer acks our close, or immediately if we were acking - void error(); + Signal readyRead; + boost::signals2::signal framesWritten; + Signal writeBytesChanged; + Signal peerClosed; // emitted only if peer closes before we do + Signal closed; // emitted after peer acks our close, or immediately if we were acking + Signal error; }; #endif diff --git a/src/cpp/zwebsocket.cpp b/src/cpp/zwebsocket.cpp index 1cce3c84..4f631b77 100644 --- a/src/cpp/zwebsocket.cpp +++ b/src/cpp/zwebsocket.cpp @@ -297,7 +297,7 @@ class ZWebSocket::Private : public QObject state = Idle; cleanup(); - QMetaObject::invokeMethod(q, "closed", Qt::QueuedConnection); + QMetaObject::invokeMethod(q, "doClosed", Qt::QueuedConnection); } Frame readFrame() @@ -337,7 +337,7 @@ class ZWebSocket::Private : public QObject // if peer was already closed, then we're done! state = Idle; cleanup(); - QMetaObject::invokeMethod(q, "closed", Qt::QueuedConnection); + QMetaObject::invokeMethod(q, "doClosed", Qt::QueuedConnection); } else { @@ -414,7 +414,7 @@ class ZWebSocket::Private : public QObject if(written > 0 || contentBytesWritten > 0) { - emit q->framesWritten(written, contentBytesWritten); + q->framesWritten(written, contentBytesWritten); if(!self) return; } @@ -428,7 +428,7 @@ class ZWebSocket::Private : public QObject // if peer was already closed, then we're done! state = Idle; cleanup(); - emit q->closed(); + q->closed(); return; } else @@ -483,7 +483,7 @@ class ZWebSocket::Private : public QObject state = Idle; cleanup(); - emit q->error(); + q->error(); return; } else if(packet.type == ZhttpRequestPacket::Cancel) @@ -493,7 +493,7 @@ class ZWebSocket::Private : public QObject errorCondition = ErrorGeneric; state = Idle; cleanup(); - emit q->error(); + q->error(); return; } @@ -506,7 +506,7 @@ class ZWebSocket::Private : public QObject state = Idle; errorCondition = ErrorGeneric; cleanup(); - emit q->error(); + q->error(); return; } @@ -588,7 +588,7 @@ class ZWebSocket::Private : public QObject state = Idle; cleanup(); - emit q->error(); + q->error(); return; } else if(packet.type == ZhttpResponsePacket::Cancel) @@ -598,7 +598,7 @@ class ZWebSocket::Private : public QObject errorCondition = ErrorGeneric; state = Idle; cleanup(); - emit q->error(); + q->error(); return; } @@ -614,7 +614,7 @@ class ZWebSocket::Private : public QObject state = Idle; errorCondition = ErrorGeneric; cleanup(); - emit q->error(); + q->error(); return; } @@ -640,7 +640,7 @@ class ZWebSocket::Private : public QObject errorCondition = ErrorGeneric; cleanup(); log_warning("zws client: error id=%s initial response wrong type", id.data()); - emit q->error(); + q->error(); return; } @@ -650,7 +650,7 @@ class ZWebSocket::Private : public QObject errorCondition = ErrorGeneric; cleanup(); log_warning("zws client: error id=%s initial ack did not contain from field", id.data()); - emit q->error(); + q->error(); return; } } @@ -740,12 +740,12 @@ class ZWebSocket::Private : public QObject { state = Idle; cleanup(); - emit q->closed(); + q->closed(); } else { state = ConnectedPeerClosed; - emit q->peerClosed(); + q->peerClosed(); } } } @@ -977,6 +977,11 @@ class ZWebSocket::Private : public QObject } public slots: + void doClosed() + { + q->closed(); + } + void doUpdate() { pendingUpdate = false; @@ -989,14 +994,14 @@ public slots: { state = Idle; cleanup(); - emit q->closed(); + q->closed(); return; } else { QPointer self = this; state = ConnectedPeerClosed; - emit q->peerClosed(); + q->peerClosed(); if(!self) return; } @@ -1008,7 +1013,7 @@ public slots: readableChanged = false; QPointer self = this; - emit q->readyRead(); + q->readyRead(); if(!self) return; } @@ -1021,7 +1026,7 @@ public slots: { state = Idle; errorCondition = ErrorUnavailable; - emit q->error(); + q->error(); cleanup(); return; } @@ -1055,7 +1060,7 @@ public slots: { writableChanged = false; - emit q->writeBytesChanged(); + q->writeBytesChanged(); } } } @@ -1066,7 +1071,7 @@ public slots: state = Idle; errorCondition = ErrorTimeout; cleanup(); - emit q->error(); + q->error(); } void keepAlive_timeout() From f985a05aa61ab192ebe6f2075aec428fc8071cca Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 31 Jan 2024 12:59:38 -0800 Subject: [PATCH 3/5] addressed PR comments --- src/cpp/proxy/sockjssession.cpp | 7 +++---- src/cpp/proxy/websocketoverhttp.cpp | 6 +++--- src/cpp/proxy/wsproxysession.cpp | 17 ++++------------- 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/src/cpp/proxy/sockjssession.cpp b/src/cpp/proxy/sockjssession.cpp index d416c4ba..fb0de5a1 100644 --- a/src/cpp/proxy/sockjssession.cpp +++ b/src/cpp/proxy/sockjssession.cpp @@ -44,7 +44,7 @@ using std::map; #define UNCONNECTED_TIMEOUT 5 struct WSConnections { - Connection connectedConnection; + Connection connectedConnection; Connection readyReadConnection; Connection framesWrittenConnection; Connection writeBytesChangedConnection; @@ -160,7 +160,7 @@ class SockJsSession::Private : public QObject bool updating; Connection bytesWrittenConnection; Connection errorConnection; - map wsConnectionMap; + WSConnections wsConnection; Private(SockJsSession *_q) : QObject(_q), @@ -244,7 +244,6 @@ class SockJsSession::Private : public QObject } requests.clear(); - wsConnectionMap.erase(sock); delete sock; sock = 0; @@ -275,7 +274,7 @@ class SockJsSession::Private : public QObject } else { - wsConnectionMap[sock] = { + wsConnection = { sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)), sock->framesWritten.connect(boost::bind(&Private::sock_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), sock->writeBytesChanged.connect(boost::bind(&Private::sock_writeBytesChanged, this)), diff --git a/src/cpp/proxy/websocketoverhttp.cpp b/src/cpp/proxy/websocketoverhttp.cpp index f7342d06..1792d39b 100644 --- a/src/cpp/proxy/websocketoverhttp.cpp +++ b/src/cpp/proxy/websocketoverhttp.cpp @@ -47,9 +47,9 @@ namespace { struct WSConnections { - Connection disconnectedConnection; - Connection closedConnection; - Connection errorConnection; + Connection disconnectedConnection; + Connection closedConnection; + Connection errorConnection; }; class WsEvent diff --git a/src/cpp/proxy/wsproxysession.cpp b/src/cpp/proxy/wsproxysession.cpp index 82ac9638..71c011e9 100644 --- a/src/cpp/proxy/wsproxysession.cpp +++ b/src/cpp/proxy/wsproxysession.cpp @@ -309,8 +309,8 @@ class WsProxySession::Private : public QObject Connection keepAliveConneciton; Connection aboutToSendRequestConnection; map wsProxyConnectionMap; - map outWSConnectionMap; - map inWSConnectionMap; + WSConnections outWSConnection; + InWSConnections inWSConnection; Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : QObject(_q), @@ -352,7 +352,6 @@ class WsProxySession::Private : public QObject cleanupInSock(); - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; @@ -372,7 +371,6 @@ class WsProxySession::Private : public QObject if(inSock) { connectionManager->removeConnection(inSock); - inWSConnectionMap.erase(inSock); delete inSock; inSock = 0; } @@ -402,7 +400,7 @@ class WsProxySession::Private : public QObject inSock = sock; inSock->setParent(this); - inWSConnectionMap[inSock] = { + inWSConnection = { inSock->readyRead.connect(boost::bind(&Private::in_readyRead, this)), inSock->framesWritten.connect(boost::bind(&Private::in_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), inSock->writeBytesChanged.connect(boost::bind(&Private::in_writeBytesChanged, this)), @@ -582,7 +580,7 @@ class WsProxySession::Private : public QObject outSock->setParent(this); } } - outWSConnectionMap[outSock] = { + outWSConnection = { outSock->connected.connect(boost::bind(&Private::out_connected, this)), outSock->readyRead.connect(boost::bind(&Private::out_readyRead, this)), outSock->writeBytesChanged.connect(boost::bind(&Private::out_writeBytesChanged, this)), @@ -860,7 +858,6 @@ private slots: { if(outSock->state() == WebSocket::Connecting) { - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; @@ -892,7 +889,6 @@ private slots: if(!detached) { - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; } @@ -988,7 +984,6 @@ private slots: { int code = outSock->peerCloseCode(); QString reason = outSock->peerCloseReason(); - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; @@ -1005,7 +1000,6 @@ private slots: if(detached) { - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; @@ -1032,7 +1026,6 @@ private slots: break; } - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; @@ -1043,7 +1036,6 @@ private slots: { cleanupInSock(); - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; @@ -1144,7 +1136,6 @@ private slots: { if(outSock) { - outWSConnectionMap.erase(outSock); delete outSock; outSock = 0; } From abdd54cfc212a9804259758af4fd5ea6492065e4 Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 31 Jan 2024 13:56:12 -0800 Subject: [PATCH 4/5] disconnect issue addressed --- src/cpp/proxy/sockjssession.cpp | 5 +++-- src/cpp/proxy/wsproxysession.cpp | 19 ++++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/cpp/proxy/sockjssession.cpp b/src/cpp/proxy/sockjssession.cpp index fb0de5a1..31849464 100644 --- a/src/cpp/proxy/sockjssession.cpp +++ b/src/cpp/proxy/sockjssession.cpp @@ -160,7 +160,7 @@ class SockJsSession::Private : public QObject bool updating; Connection bytesWrittenConnection; Connection errorConnection; - WSConnections wsConnection; + WSConnections* wsConnection; Private(SockJsSession *_q) : QObject(_q), @@ -244,6 +244,7 @@ class SockJsSession::Private : public QObject } requests.clear(); + delete wsConnection; delete sock; sock = 0; @@ -274,7 +275,7 @@ class SockJsSession::Private : public QObject } else { - wsConnection = { + wsConnection = new WSConnections{ sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)), sock->framesWritten.connect(boost::bind(&Private::sock_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), sock->writeBytesChanged.connect(boost::bind(&Private::sock_writeBytesChanged, this)), diff --git a/src/cpp/proxy/wsproxysession.cpp b/src/cpp/proxy/wsproxysession.cpp index 71c011e9..bd499a02 100644 --- a/src/cpp/proxy/wsproxysession.cpp +++ b/src/cpp/proxy/wsproxysession.cpp @@ -309,8 +309,8 @@ class WsProxySession::Private : public QObject Connection keepAliveConneciton; Connection aboutToSendRequestConnection; map wsProxyConnectionMap; - WSConnections outWSConnection; - InWSConnections inWSConnection; + WSConnections* outWSConnection; + InWSConnections* inWSConnection; Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : QObject(_q), @@ -351,7 +351,8 @@ class WsProxySession::Private : public QObject cleanupKeepAliveTimer(); cleanupInSock(); - + + delete outWSConnection; delete outSock; outSock = 0; @@ -371,6 +372,7 @@ class WsProxySession::Private : public QObject if(inSock) { connectionManager->removeConnection(inSock); + delete inWSConnection; delete inSock; inSock = 0; } @@ -400,7 +402,7 @@ class WsProxySession::Private : public QObject inSock = sock; inSock->setParent(this); - inWSConnection = { + inWSConnection = new InWSConnections{ inSock->readyRead.connect(boost::bind(&Private::in_readyRead, this)), inSock->framesWritten.connect(boost::bind(&Private::in_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), inSock->writeBytesChanged.connect(boost::bind(&Private::in_writeBytesChanged, this)), @@ -580,7 +582,7 @@ class WsProxySession::Private : public QObject outSock->setParent(this); } } - outWSConnection = { + outWSConnection = new WSConnections{ outSock->connected.connect(boost::bind(&Private::out_connected, this)), outSock->readyRead.connect(boost::bind(&Private::out_readyRead, this)), outSock->writeBytesChanged.connect(boost::bind(&Private::out_writeBytesChanged, this)), @@ -858,6 +860,7 @@ private slots: { if(outSock->state() == WebSocket::Connecting) { + delete outWSConnection; delete outSock; outSock = 0; @@ -889,6 +892,7 @@ private slots: if(!detached) { + delete outWSConnection; delete outSock; outSock = 0; } @@ -984,6 +988,7 @@ private slots: { int code = outSock->peerCloseCode(); QString reason = outSock->peerCloseReason(); + delete outWSConnection; delete outSock; outSock = 0; @@ -1000,6 +1005,7 @@ private slots: if(detached) { + delete outWSConnection; delete outSock; outSock = 0; @@ -1026,6 +1032,7 @@ private slots: break; } + delete outWSConnection; delete outSock; outSock = 0; @@ -1036,6 +1043,7 @@ private slots: { cleanupInSock(); + delete outWSConnection; delete outSock; outSock = 0; @@ -1136,6 +1144,7 @@ private slots: { if(outSock) { + delete outWSConnection; delete outSock; outSock = 0; } From c88342bfffc36b779f2e711305ea9504ed48cac3 Mon Sep 17 00:00:00 2001 From: sima Date: Thu, 1 Feb 2024 05:07:54 -0800 Subject: [PATCH 5/5] PR comment addressed --- src/cpp/proxy/sockjssession.cpp | 6 +++--- src/cpp/proxy/wsproxysession.cpp | 26 +++++++++++++------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/cpp/proxy/sockjssession.cpp b/src/cpp/proxy/sockjssession.cpp index 31849464..15819ce2 100644 --- a/src/cpp/proxy/sockjssession.cpp +++ b/src/cpp/proxy/sockjssession.cpp @@ -160,7 +160,7 @@ class SockJsSession::Private : public QObject bool updating; Connection bytesWrittenConnection; Connection errorConnection; - WSConnections* wsConnection; + WSConnections wsConnection; Private(SockJsSession *_q) : QObject(_q), @@ -244,7 +244,7 @@ class SockJsSession::Private : public QObject } requests.clear(); - delete wsConnection; + wsConnection = WSConnections(); delete sock; sock = 0; @@ -275,7 +275,7 @@ class SockJsSession::Private : public QObject } else { - wsConnection = new WSConnections{ + wsConnection = WSConnections{ sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)), sock->framesWritten.connect(boost::bind(&Private::sock_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), sock->writeBytesChanged.connect(boost::bind(&Private::sock_writeBytesChanged, this)), diff --git a/src/cpp/proxy/wsproxysession.cpp b/src/cpp/proxy/wsproxysession.cpp index bd499a02..57f139a9 100644 --- a/src/cpp/proxy/wsproxysession.cpp +++ b/src/cpp/proxy/wsproxysession.cpp @@ -309,8 +309,8 @@ class WsProxySession::Private : public QObject Connection keepAliveConneciton; Connection aboutToSendRequestConnection; map wsProxyConnectionMap; - WSConnections* outWSConnection; - InWSConnections* inWSConnection; + WSConnections outWSConnection; + InWSConnections inWSConnection; Private(WsProxySession *_q, ZRoutes *_zroutes, ConnectionManager *_connectionManager, const LogUtil::Config &_logConfig, StatsManager *_statsManager, WsControlManager *_wsControlManager) : QObject(_q), @@ -352,7 +352,7 @@ class WsProxySession::Private : public QObject cleanupInSock(); - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; @@ -372,7 +372,7 @@ class WsProxySession::Private : public QObject if(inSock) { connectionManager->removeConnection(inSock); - delete inWSConnection; + inWSConnection = InWSConnections(); delete inSock; inSock = 0; } @@ -402,7 +402,7 @@ class WsProxySession::Private : public QObject inSock = sock; inSock->setParent(this); - inWSConnection = new InWSConnections{ + inWSConnection = InWSConnections{ inSock->readyRead.connect(boost::bind(&Private::in_readyRead, this)), inSock->framesWritten.connect(boost::bind(&Private::in_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)), inSock->writeBytesChanged.connect(boost::bind(&Private::in_writeBytesChanged, this)), @@ -582,7 +582,7 @@ class WsProxySession::Private : public QObject outSock->setParent(this); } } - outWSConnection = new WSConnections{ + outWSConnection = { outSock->connected.connect(boost::bind(&Private::out_connected, this)), outSock->readyRead.connect(boost::bind(&Private::out_readyRead, this)), outSock->writeBytesChanged.connect(boost::bind(&Private::out_writeBytesChanged, this)), @@ -860,7 +860,7 @@ private slots: { if(outSock->state() == WebSocket::Connecting) { - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; @@ -892,7 +892,7 @@ private slots: if(!detached) { - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; } @@ -988,7 +988,7 @@ private slots: { int code = outSock->peerCloseCode(); QString reason = outSock->peerCloseReason(); - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; @@ -1005,7 +1005,7 @@ private slots: if(detached) { - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; @@ -1032,7 +1032,7 @@ private slots: break; } - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; @@ -1043,7 +1043,7 @@ private slots: { cleanupInSock(); - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; @@ -1144,7 +1144,7 @@ private slots: { if(outSock) { - delete outWSConnection; + outWSConnection = WSConnections(); delete outSock; outSock = 0; }