Skip to content

Commit

Permalink
boostification of &QZmq::Valve::readyRead (#47888)
Browse files Browse the repository at this point in the history
  • Loading branch information
sima-fastly authored Jan 24, 2024
1 parent 42ac17c commit b7f2c7f
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 36 deletions.
14 changes: 9 additions & 5 deletions src/cpp/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,10 @@ class HandlerEngine::Private : public QObject
Connection connectionsRefreshedConnection;
Connection unsubscribedConnection;
Connection reportedConnection;
Connection pullConnection;
Connection controlValveConnection;
Connection inSubValveConnection;
Connection proxyStatConnection;

Private(HandlerEngine *_q) :
QObject(_q),
Expand Down Expand Up @@ -1410,7 +1414,7 @@ class HandlerEngine::Private : public QObject
}

inPullValve = new QZmq::Valve(inPullSock, this);
connect(inPullValve, &QZmq::Valve::readyRead, this, &Private::inPull_readyRead);
pullConnection = inPullValve->readyRead.connect(boost::bind(&Private::inPull_readyRead, this, boost::placeholders::_1));

log_info("in pull: %s", qPrintable(config.pushInSpec));
}
Expand All @@ -1437,7 +1441,7 @@ class HandlerEngine::Private : public QObject
}

inSubValve = new QZmq::Valve(inSubSock, this);
connect(inSubValve, &QZmq::Valve::readyRead, this, &Private::inSub_readyRead);
inSubValveConnection = inSubValve->readyRead.connect(boost::bind(&Private::inSub_readyRead, this, boost::placeholders::_1));

log_info("in sub: %s", qPrintable(config.pushInSubSpecs.join(", ")));
}
Expand Down Expand Up @@ -1471,7 +1475,7 @@ class HandlerEngine::Private : public QObject
}

wsControlInValve = new QZmq::Valve(wsControlInSock, this);
connect(wsControlInValve, &QZmq::Valve::readyRead, this, &Private::wsControlIn_readyRead);
controlValveConnection = wsControlInValve->readyRead.connect(boost::bind(&Private::wsControlIn_readyRead, this, boost::placeholders::_1));

log_info("ws control in: %s", qPrintable(config.wsControlInSpec));

Expand Down Expand Up @@ -1551,7 +1555,7 @@ class HandlerEngine::Private : public QObject
}

proxyStatsValve = new QZmq::Valve(proxyStatsSock, this);
connect(proxyStatsValve, &QZmq::Valve::readyRead, this, &Private::proxyStats_readyRead);
proxyStatConnection = proxyStatsValve->readyRead.connect(boost::bind(&Private::proxyStats_readyRead, this, boost::placeholders::_1));

log_info("proxy stats: %s", qPrintable(config.proxyStatsSpecs.join(", ")));
}
Expand Down Expand Up @@ -2436,7 +2440,6 @@ class HandlerEngine::Private : public QObject
deferreds += report;
}

private slots:
QVariant parseJsonOrTnetstring(const QByteArray &message, bool *ok = 0, QString *errorMessage = 0) {
QVariant data;
bool ok_;
Expand Down Expand Up @@ -3070,6 +3073,7 @@ private slots:
}
}

private slots:
void hs_subscribe(HttpSession *hs, const QString &channel)
{
Instruct::HoldMode mode = hs->holdMode();
Expand Down
16 changes: 9 additions & 7 deletions src/cpp/m2adapter/m2adapterapp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ class M2AdapterApp::Private : public QObject
Connection quitConnection;
Connection hupConnection;
map<QZmq::Socket*, Connection> rrConnection;
Connection m2InValveConnection;
Connection zhttpInValveConnection;
Connection zwsInValveConnection;

Private(M2AdapterApp *_q) :
QObject(_q),
Expand Down Expand Up @@ -654,7 +657,7 @@ class M2AdapterApp::Private : public QObject
}

m2_in_valve = new QZmq::Valve(m2_in_sock, this);
connect(m2_in_valve, &QZmq::Valve::readyRead, this, &Private::m2_in_readyRead);
m2InValveConnection = m2_in_valve->readyRead.connect(boost::bind(&Private::m2_in_readyRead, this, boost::placeholders::_1));

m2_out_sock = new QZmq::Socket(QZmq::Socket::Pub, this);
m2_out_sock->setShutdownWaitTime(0);
Expand All @@ -674,7 +677,7 @@ class M2AdapterApp::Private : public QObject
sock->setShutdownWaitTime(0);
sock->setHwm(1); // queue up 1 outstanding request at most
sock->setWriteQueueEnabled(false);
rrConnection[sock] = sock->readyRead.connect(boost::bind(&Private::m2_control_readyRead, this));
rrConnection[sock] = sock->readyRead.connect(boost::bind(&Private::m2_control_readyRead, this, sock));

log_info("m2_control connect %s:%s", m2_send_idents[n].data(), qPrintable(spec));
sock->connectToAddress(spec);
Expand Down Expand Up @@ -709,7 +712,7 @@ class M2AdapterApp::Private : public QObject
}

zhttp_in_valve = new QZmq::Valve(zhttp_in_sock, this);
connect(zhttp_in_valve, &QZmq::Valve::readyRead, this, &Private::zhttp_in_readyRead);
zhttpInValveConnection = zhttp_in_valve->readyRead.connect(boost::bind(&Private::zhttp_in_readyRead, this, boost::placeholders::_1));

zhttp_out_sock = new QZmq::Socket(QZmq::Socket::Push, this);
zhttp_out_sock->setShutdownWaitTime(0);
Expand Down Expand Up @@ -778,7 +781,7 @@ class M2AdapterApp::Private : public QObject
}

zws_in_valve = new QZmq::Valve(zws_in_sock, this);
connect(zws_in_valve, &QZmq::Valve::readyRead, this, &Private::zws_in_readyRead);
zwsInValveConnection = zws_in_valve->readyRead.connect(boost::bind(&Private::zws_in_readyRead, this, boost::placeholders::_1));

zws_out_sock = new QZmq::Socket(QZmq::Socket::Push, this);
zws_out_sock->setShutdownWaitTime(0);
Expand Down Expand Up @@ -2314,7 +2317,6 @@ class M2AdapterApp::Private : public QObject
}
}

private slots:
void m2_in_readyRead(const QList<QByteArray> &message)
{
if(message.count() != 1)
Expand Down Expand Up @@ -2734,9 +2736,8 @@ private slots:
}
}

void m2_control_readyRead()
void m2_control_readyRead(QZmq::Socket *sock)
{
QZmq::Socket *sock = (QZmq::Socket *)sender();
int index = -1;
for(int n = 0; n < controlPorts.count(); ++n)
{
Expand Down Expand Up @@ -2821,6 +2822,7 @@ private slots:
handleZhttpIn(WebSocket, message);
}

private slots:
void status_timeout()
{
int now = time.elapsed();
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/proxy/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class Engine::Private : public QObject
map<ProxySession*, Connection> finishedConnection;
map<ProxySession*, Connection> reqSessionDestroyedConnection;
Connection connMaxConnection;
Connection rrConnection;

Private(Engine *_q) :
QObject(_q),
Expand Down Expand Up @@ -282,7 +283,7 @@ class Engine::Private : public QObject
}

handler_retry_in_valve = new QZmq::Valve(handler_retry_in_sock, this);
connect(handler_retry_in_valve, &QZmq::Valve::readyRead, this, &Private::handler_retry_in_readyRead);
rrConnection = handler_retry_in_valve->readyRead.connect(boost::bind(&Private::handler_retry_in_readyRead, this, boost::placeholders::_1));
}

if(handler_retry_in_valve)
Expand Down Expand Up @@ -832,6 +833,7 @@ private slots:
tryTakeNext();
}

private:
void handler_retry_in_readyRead(const QList<QByteArray> &message)
{
if(message.count() != 1)
Expand Down Expand Up @@ -910,7 +912,6 @@ private slots:
}
}

private:
void stats_connMax(const StatsPacket &packet)
{
if(accept->canWriteImmediately())
Expand Down
6 changes: 4 additions & 2 deletions src/cpp/proxy/wscontrolmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class WsControlManager::Private : public QObject
QMap<QPair<qint64, KeepAliveRegistration*>, KeepAliveRegistration*> sessionsByLastRefresh;
QSet<KeepAliveRegistration*> sessionRefreshBuckets[SESSION_REFRESH_BUCKETS];
int currentSessionRefreshBucket;
Connection inValveConnection;

Private(WsControlManager *_q) :
QObject(_q),
Expand Down Expand Up @@ -111,7 +112,7 @@ class WsControlManager::Private : public QObject
}

inValve = new QZmq::Valve(inSock, this);
connect(inValve, &QZmq::Valve::readyRead, this, &Private::in_readyRead);
inValveConnection = inValve->readyRead.connect(boost::bind(&Private::in_readyRead, this, boost::placeholders::_1));

inValve->open();

Expand Down Expand Up @@ -219,7 +220,7 @@ class WsControlManager::Private : public QObject
refreshTimer->stop();
}

private slots:
private:
void in_readyRead(const QList<QByteArray> &message)
{
if(message.count() != 1)
Expand Down Expand Up @@ -273,6 +274,7 @@ private slots:
}
}

private slots:
void refresh_timeout()
{
qint64 now = QDateTime::currentMSecsSinceEpoch();
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/qzmq/src/qzmqvalve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Valve::Private : public QObject

if(!msg.isEmpty())
{
emit q->readyRead(msg);
q->readyRead(msg);
if(!self)
return;
}
Expand Down
7 changes: 5 additions & 2 deletions src/cpp/qzmq/src/qzmqvalve.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#define QZMQVALVE_H

#include <QObject>
#include <boost/signals2.hpp>

using SignalList = boost::signals2::signal<void(const QList<QByteArray>&)>;
using Connection = boost::signals2::scoped_connection;

namespace QZmq {

Expand All @@ -45,8 +49,7 @@ class Valve : public QObject
void open();
void close();

signals:
void readyRead(const QList<QByteArray> &message);
SignalList readyRead;

private:
class Private;
Expand Down
12 changes: 8 additions & 4 deletions src/cpp/tests/handlerenginetest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class Wrapper : public QObject
bool serverFailed;
int serverOutSeq;
QByteArray requestBody;
Connection zhttpClientInValveConnection;
Connection zhttpServerInValveConnection;
Connection zhttpServerInStreamValveConnection;
Connection proxyAcceptValveConnection;

Wrapper(QObject *parent, QDir _workDir) :
QObject(parent),
Expand All @@ -77,24 +81,24 @@ class Wrapper : public QObject

zhttpClientInSock = new QZmq::Socket(QZmq::Socket::Sub, this);
zhttpClientInValve = new QZmq::Valve(zhttpClientInSock, this);
connect(zhttpClientInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpClientIn_readyRead);
zhttpClientInValveConnection = zhttpClientInValve->readyRead.connect(boost::bind(&Wrapper::zhttpClientIn_readyRead, this, boost::placeholders::_1));

zhttpServerInSock = new QZmq::Socket(QZmq::Socket::Pull, this);
zhttpServerInValve = new QZmq::Valve(zhttpServerInSock, this);
connect(zhttpServerInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerIn_readyRead);
zhttpServerInValveConnection = zhttpServerInValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerIn_readyRead, this, boost::placeholders::_1));

zhttpServerInStreamSock = new QZmq::Socket(QZmq::Socket::Router, this);
zhttpServerInStreamSock->setIdentity("test-server");
zhttpServerInStreamValve = new QZmq::Valve(zhttpServerInStreamSock, this);
connect(zhttpServerInStreamValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerInStream_readyRead);
zhttpServerInStreamValveConnection = zhttpServerInStreamValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerInStream_readyRead, this, boost::placeholders::_1));

zhttpServerOutSock = new QZmq::Socket(QZmq::Socket::Pub, this);

// proxy sockets

proxyAcceptSock = new QZmq::Socket(QZmq::Socket::Dealer, this);
proxyAcceptValve = new QZmq::Valve(proxyAcceptSock, this);
connect(proxyAcceptValve, &QZmq::Valve::readyRead, this, &Wrapper::proxyAccept_readyRead);
proxyAcceptValveConnection = proxyAcceptValve->readyRead.connect(boost::bind(&Wrapper::proxyAccept_readyRead, this, boost::placeholders::_1));

// publish sockets

Expand Down
17 changes: 11 additions & 6 deletions src/cpp/tests/proxyenginetest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ class Wrapper : public QObject
int clientReqsFinished;
QByteArray requestBody;
QHash<QByteArray, HttpResponseData> responses;
Connection zhttpClientInValveConnection;
Connection zhttpServerInValveConnection;
Connection zhttpServerInStreamValveConnection;
Connection handlerAcceptValveConnection;
Connection handlerInspectValveConnection;

Wrapper(QObject *parent, QDir _workDir) :
QObject(parent),
Expand All @@ -106,16 +111,16 @@ class Wrapper : public QObject

zhttpClientInSock = new QZmq::Socket(QZmq::Socket::Sub, this);
zhttpClientInValve = new QZmq::Valve(zhttpClientInSock, this);
connect(zhttpClientInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpClientIn_readyRead);
zhttpClientInValveConnection = zhttpClientInValve->readyRead.connect(boost::bind(&Wrapper::zhttpClientIn_readyRead, this, boost::placeholders::_1));

zhttpServerInSock = new QZmq::Socket(QZmq::Socket::Pull, this);
zhttpServerInValve = new QZmq::Valve(zhttpServerInSock, this);
connect(zhttpServerInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerIn_readyRead);
zhttpServerInValveConnection = zhttpServerInValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerIn_readyRead, this, boost::placeholders::_1));

zhttpServerInStreamSock = new QZmq::Socket(QZmq::Socket::Router, this);
zhttpServerInStreamSock->setIdentity("test-server");
zhttpServerInStreamValve = new QZmq::Valve(zhttpServerInStreamSock, this);
connect(zhttpServerInStreamValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerInStream_readyRead);
zhttpServerInStreamValveConnection = zhttpServerInStreamValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerInStream_readyRead, this, boost::placeholders::_1));

zhttpServerOutSock = new QZmq::Socket(QZmq::Socket::Pub, this);

Expand All @@ -125,10 +130,10 @@ class Wrapper : public QObject

handlerAcceptSock = new QZmq::Socket(QZmq::Socket::Router, this);
handlerAcceptValve = new QZmq::Valve(handlerAcceptSock, this);
connect(handlerAcceptValve, &QZmq::Valve::readyRead, this, &Wrapper::handlerAccept_readyRead);
handlerAcceptValveConnection = handlerAcceptValve->readyRead.connect(boost::bind(&Wrapper::handlerAccept_readyRead, this, boost::placeholders::_1));

handlerInspectValve = new QZmq::Valve(handlerInspectSock, this);
connect(handlerInspectValve, &QZmq::Valve::readyRead, this, &Wrapper::handlerInspect_readyRead);
handlerInspectValveConnection = handlerInspectValve->readyRead.connect(boost::bind(&Wrapper::handlerInspect_readyRead, this, boost::placeholders::_1));

handlerRetryOutSock = new QZmq::Socket(QZmq::Socket::Push, this);
}
Expand Down Expand Up @@ -178,7 +183,7 @@ class Wrapper : public QObject
responses.clear();
}

private slots:
private:
void zhttpClientIn_readyRead(const QList<QByteArray> &message)
{
log_debug("client in");
Expand Down
11 changes: 7 additions & 4 deletions src/cpp/zhttpmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class ZhttpManager::Private : public QObject
Connection cossConnection;
Connection sosConnection;
Connection rrConnection;
Connection clientConnection;
Connection serverConnection;
Connection serverStreamConnection;

Private(ZhttpManager *_q) :
QObject(_q),
Expand Down Expand Up @@ -221,7 +224,7 @@ class ZhttpManager::Private : public QObject
}

client_in_valve = new QZmq::Valve(client_in_sock, this);
connect(client_in_valve, &QZmq::Valve::readyRead, this, &Private::client_in_readyRead);
clientConnection = client_in_valve->readyRead.connect(boost::bind(&Private::client_in_readyRead, this, boost::placeholders::_1));

client_in_valve->open();

Expand Down Expand Up @@ -266,7 +269,7 @@ class ZhttpManager::Private : public QObject
}

server_in_valve = new QZmq::Valve(server_in_sock, this);
connect(server_in_valve, &QZmq::Valve::readyRead, this, &Private::server_in_readyRead);
serverConnection = server_in_valve->readyRead.connect(boost::bind(&Private::server_in_readyRead, this, boost::placeholders::_1));

server_in_valve->open();

Expand All @@ -290,7 +293,7 @@ class ZhttpManager::Private : public QObject
}

server_in_stream_valve = new QZmq::Valve(server_in_stream_sock, this);
connect(server_in_stream_valve, &QZmq::Valve::readyRead, this, &Private::server_in_stream_readyRead);
serverStreamConnection = server_in_stream_valve->readyRead.connect(boost::bind(&Private::server_in_stream_readyRead, this, boost::placeholders::_1));

server_in_stream_valve->open();

Expand Down Expand Up @@ -554,7 +557,6 @@ class ZhttpManager::Private : public QObject
}
}

public slots:
void client_in_readyRead(const QList<QByteArray> &msg)
{
if(msg.count() != 1)
Expand Down Expand Up @@ -793,6 +795,7 @@ public slots:
}
}

public slots:
void refresh_timeout()
{
QHash<QByteArray, QList<KeepAliveRegistration*> > clientSessionsBySender[2]; // index corresponds to type
Expand Down
Loading

0 comments on commit b7f2c7f

Please sign in to comment.