Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boostification of all websocket signals #47899

Merged
merged 6 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/cpp/handler/httpsessionupdatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ class HttpSessionUpdateManager::Private : public QObject
bucket->key = key;
bucket->sessions += hs;
bucket->timer = new QTimer(this);
connect(bucket->timer, &QTimer::timeout, this, &Private::timer_timeout);
QObject::connect(bucket->timer, &QTimer::timeout, [this, timer=bucket->timer]() {
this->timer_timeout(timer);
});

buckets[key] = bucket;
bucketsByTimer[bucket->timer] = bucket;
Expand All @@ -137,10 +139,9 @@ class HttpSessionUpdateManager::Private : public QObject
removeBucket(bucket);
}

private slots:
void timer_timeout()
private:
void timer_timeout(QTimer *timer)
{
QTimer *timer = (QTimer *)sender();
Bucket *bucket = bucketsByTimer.value(timer);
if(!bucket)
return;
Expand Down
30 changes: 20 additions & 10 deletions src/cpp/proxy/sockjsmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ static QByteArray serializeJsonString(const QString &s)
return tmp.mid(1, tmp.length() - 2);
}

struct WSConnections {
Connection closedConnection;
Connection errorConnection;
};

struct ZhttpReqConnections{
Connection readyReadConnection;
Connection bytesWrittenConnection;
Expand Down Expand Up @@ -123,6 +128,7 @@ class SockJsManager::Private : public QObject
owner->reqConnectionMap.erase(req);
delete req;
}
owner->wsConnectionMap.erase(sock);
delete sock;

if(timer)
Expand All @@ -146,6 +152,7 @@ class SockJsManager::Private : public QObject
QByteArray iframeHtmlEtag;
QSet<ZhttpRequest*> discardedRequests;
map<ZhttpRequest*, ZhttpReqConnections> reqConnectionMap;
map<ZWebSocket*, WSConnections> wsConnectionMap;

Private(SockJsManager *_q, const QString &sockJsUrl) :
QObject(_q),
Expand Down Expand Up @@ -198,7 +205,10 @@ class SockJsManager::Private : public QObject
{
// if there's a close value, hang around for a little bit
s->timer = new QTimer(this);
connect(s->timer, &QTimer::timeout, this, &Private::timer_timeout);
QObject::connect(s->timer, &QTimer::timeout, [this, timer=s->timer]() {
this->timer_timeout(timer);
});

s->timer->setSingleShot(true);
sessionsByTimer.insert(s->timer, s);
s->timer->start(5000);
Expand Down Expand Up @@ -277,8 +287,10 @@ class SockJsManager::Private : public QObject
s->asUri.setPath(QString::fromUtf8(encPath.mid(0, basePathStart) + "/websocket"), QUrl::StrictMode);
s->route = route;

connect(sock, &ZWebSocket::closed, this, &Private::sock_closed);
connect(sock, &ZWebSocket::error, this, &Private::sock_error);
wsConnectionMap[sock] = {
sock->closed.connect(boost::bind(&Private::sock_closed, this, sock)),
sock->error.connect(boost::bind(&Private::sock_error, this, sock))
};

sessions += s;
sessionsBySocket.insert(s->sock, s);
Expand Down Expand Up @@ -589,6 +601,7 @@ class SockJsManager::Private : public QObject
return s->ext;
}

private:
void req_readyRead(ZhttpRequest *req)
{
// for a request to have been discardable, we must have read the
Expand Down Expand Up @@ -646,10 +659,8 @@ class SockJsManager::Private : public QObject
removeSession(s);
}

private slots:
void sock_closed()
void sock_closed(ZWebSocket *sock)
{
ZWebSocket *sock = (ZWebSocket *)sender();
Session *s = sessionsBySocket.value(sock);
assert(s);

Expand All @@ -659,9 +670,8 @@ private slots:
removeSession(s);
}

void sock_error()
void sock_error(ZWebSocket *sock)
{
ZWebSocket *sock = (ZWebSocket *)sender();
Session *s = sessionsBySocket.value(sock);
assert(s);

Expand All @@ -671,9 +681,9 @@ private slots:
removeSession(s);
}

void timer_timeout()
private:
void timer_timeout(QTimer *timer)
{
QTimer *timer = (QTimer *)sender();
Session *s = sessionsByTimer.value(timer);
assert(s);

Expand Down
72 changes: 46 additions & 26 deletions src/cpp/proxy/sockjssession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,22 @@
#include "zwebsocket.h"
#include "sockjsmanager.h"

using std::map;

#define BUFFER_SIZE 200000
#define KEEPALIVE_TIMEOUT 25
#define UNCONNECTED_TIMEOUT 5

struct WSConnections {
Connection connectedConnection;
Connection readyReadConnection;
Connection framesWrittenConnection;
Connection writeBytesChangedConnection;
Connection closedConnection;
Connection peerClosedConnection;
Connection sockErrorConnection;
};

class SockJsSession::Private : public QObject
{
Q_OBJECT
Expand Down Expand Up @@ -148,6 +160,7 @@ class SockJsSession::Private : public QObject
bool updating;
Connection bytesWrittenConnection;
Connection errorConnection;
WSConnections wsConnection;

Private(SockJsSession *_q) :
QObject(_q),
Expand Down Expand Up @@ -231,6 +244,7 @@ class SockJsSession::Private : public QObject
}
requests.clear();

wsConnection = WSConnections();
delete sock;
sock = 0;

Expand Down Expand Up @@ -261,12 +275,14 @@ class SockJsSession::Private : public QObject
}
else
{
connect(sock, &ZWebSocket::readyRead, this, &Private::sock_readyRead);
connect(sock, &ZWebSocket::framesWritten, this, &Private::sock_framesWritten);
connect(sock, &ZWebSocket::writeBytesChanged, this, &Private::sock_writeBytesChanged);
connect(sock, &ZWebSocket::closed, this, &Private::sock_closed);
connect(sock, &ZWebSocket::peerClosed, this, &Private::sock_peerClosed);
connect(sock, &ZWebSocket::error, this, &Private::sock_error);
wsConnection = WSConnections{
sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this)),
sock->framesWritten.connect(boost::bind(&Private::sock_framesWritten, this, boost::placeholders::_1, boost::placeholders::_2)),
sock->writeBytesChanged.connect(boost::bind(&Private::sock_writeBytesChanged, this)),
sock->closed.connect(boost::bind(&Private::sock_closed, this)),
sock->peerClosed.connect(boost::bind(&Private::sock_peerClosed, this)),
sock->error.connect(boost::bind(&Private::sock_error, this))
};
}
}

Expand Down Expand Up @@ -560,7 +576,7 @@ class SockJsSession::Private : public QObject
state = Idle;
applyLinger();
cleanup();
QMetaObject::invokeMethod(q, "closed", Qt::QueuedConnection);
QMetaObject::invokeMethod(q, "doClosed", Qt::QueuedConnection);
}
else
tryWrite();
Expand Down Expand Up @@ -622,7 +638,7 @@ class SockJsSession::Private : public QObject
if(bytes > 0)
{
QPointer<QObject> self = this;
emit q->writeBytesChanged();
q->writeBytesChanged();
if(!self)
return;
}
Expand Down Expand Up @@ -692,7 +708,7 @@ class SockJsSession::Private : public QObject

if(emitReadyRead)
{
emit q->readyRead();
q->readyRead();
if(!self)
return false;
}
Expand Down Expand Up @@ -799,15 +815,15 @@ class SockJsSession::Private : public QObject
{
state = Idle;
cleanup();
emit q->error();
q->error();

// stop signals
return false;
}

if(emitReadyRead)
{
emit q->readyRead();
q->readyRead();
if(!self)
return false;
}
Expand Down Expand Up @@ -850,7 +866,7 @@ class SockJsSession::Private : public QObject
pendingWrittenBytes = 0;
}

emit q->framesWritten(count, contentBytes);
q->framesWritten(count, contentBytes);
}

QVariant applyLinger()
Expand Down Expand Up @@ -901,7 +917,7 @@ class SockJsSession::Private : public QObject
state = Idle;
removeRequestItem(ri);
cleanup();
emit q->closed();
q->closed();
return;
}
else if(ri->type == RequestItem::Receive)
Expand All @@ -918,7 +934,7 @@ class SockJsSession::Private : public QObject
state = Idle;
removeRequestItem(ri);
cleanup();
emit q->closed();
q->closed();
return;
}
}
Expand Down Expand Up @@ -951,25 +967,24 @@ class SockJsSession::Private : public QObject
if(close && !peerClosed)
{
peerClosed = true;
emit q->peerClosed();
q->peerClosed();
return;
}

state = Idle;
cleanup();

if(close)
emit q->closed();
q->closed();
else
emit q->error();
q->error();
}
else
{
removeRequestItem(ri);
}
}

private slots:
void sock_readyRead()
{
if(mode == WebSocketFramed)
Expand All @@ -978,7 +993,7 @@ private slots:
}
else // WebSocketPassthrough
{
emit q->readyRead();
q->readyRead();
}
}

Expand All @@ -989,14 +1004,14 @@ private slots:

void sock_writeBytesChanged()
{
emit q->writeBytesChanged();
q->writeBytesChanged();
}

void sock_peerClosed()
{
peerCloseCode = sock->peerCloseCode();
peerCloseReason = sock->peerCloseReason();
emit q->peerClosed();
q->peerClosed();
}

void sock_closed()
Expand All @@ -1005,15 +1020,15 @@ private slots:
peerCloseReason = sock->peerCloseReason();
state = Idle;
cleanup();
emit q->closed();
q->closed();
}

void sock_error()
{
state = Idle;
errorCondition = sock->errorCondition();
cleanup();
emit q->error();
q->error();
}

void doUpdate()
Expand All @@ -1024,7 +1039,7 @@ private slots:
{
state = Idle;
cleanup();
emit q->error();
q->error();
return;
}

Expand All @@ -1040,11 +1055,16 @@ private slots:
pendingWrittenFrames = 0;
pendingWrittenBytes = 0;

emit q->framesWritten(count, contentBytes);
q->framesWritten(count, contentBytes);
}
}
}

private slots:
void doClosed(){
q->closed();
}

void keepAliveTimer_timeout()
{
assert(mode != WebSocketPassthrough);
Expand All @@ -1064,7 +1084,7 @@ private slots:
// timeout while unconnected
state = Idle;
cleanup();
emit q->error();
q->error();
}
}
else
Expand Down
Loading
Loading