From b7f2c7fc16fbe419c845998b171adbf780c3efb9 Mon Sep 17 00:00:00 2001 From: Sima <64804941+sima-fastly@users.noreply.github.com> Date: Tue, 23 Jan 2024 18:46:34 -0800 Subject: [PATCH] boostification of &QZmq::Valve::readyRead (#47888) --- src/cpp/handler/handlerengine.cpp | 14 +++++++++----- src/cpp/m2adapter/m2adapterapp.cpp | 16 +++++++++------- src/cpp/proxy/engine.cpp | 5 +++-- src/cpp/proxy/wscontrolmanager.cpp | 6 ++++-- src/cpp/qzmq/src/qzmqvalve.cpp | 2 +- src/cpp/qzmq/src/qzmqvalve.h | 7 +++++-- src/cpp/tests/handlerenginetest.cpp | 12 ++++++++---- src/cpp/tests/proxyenginetest.cpp | 17 +++++++++++------ src/cpp/zhttpmanager.cpp | 11 +++++++---- src/cpp/zrpcmanager.cpp | 7 ++++--- 10 files changed, 61 insertions(+), 36 deletions(-) diff --git a/src/cpp/handler/handlerengine.cpp b/src/cpp/handler/handlerengine.cpp index e4294ec4..1571fa3e 100644 --- a/src/cpp/handler/handlerengine.cpp +++ b/src/cpp/handler/handlerengine.cpp @@ -1256,6 +1256,10 @@ class HandlerEngine::Private : public QObject Connection connectionsRefreshedConnection; Connection unsubscribedConnection; Connection reportedConnection; + Connection pullConnection; + Connection controlValveConnection; + Connection inSubValveConnection; + Connection proxyStatConnection; Private(HandlerEngine *_q) : QObject(_q), @@ -1410,7 +1414,7 @@ class HandlerEngine::Private : public QObject } inPullValve = new QZmq::Valve(inPullSock, this); - connect(inPullValve, &QZmq::Valve::readyRead, this, &Private::inPull_readyRead); + pullConnection = inPullValve->readyRead.connect(boost::bind(&Private::inPull_readyRead, this, boost::placeholders::_1)); log_info("in pull: %s", qPrintable(config.pushInSpec)); } @@ -1437,7 +1441,7 @@ class HandlerEngine::Private : public QObject } inSubValve = new QZmq::Valve(inSubSock, this); - connect(inSubValve, &QZmq::Valve::readyRead, this, &Private::inSub_readyRead); + inSubValveConnection = inSubValve->readyRead.connect(boost::bind(&Private::inSub_readyRead, this, boost::placeholders::_1)); log_info("in sub: %s", qPrintable(config.pushInSubSpecs.join(", "))); } @@ -1471,7 +1475,7 @@ class HandlerEngine::Private : public QObject } wsControlInValve = new QZmq::Valve(wsControlInSock, this); - connect(wsControlInValve, &QZmq::Valve::readyRead, this, &Private::wsControlIn_readyRead); + controlValveConnection = wsControlInValve->readyRead.connect(boost::bind(&Private::wsControlIn_readyRead, this, boost::placeholders::_1)); log_info("ws control in: %s", qPrintable(config.wsControlInSpec)); @@ -1551,7 +1555,7 @@ class HandlerEngine::Private : public QObject } proxyStatsValve = new QZmq::Valve(proxyStatsSock, this); - connect(proxyStatsValve, &QZmq::Valve::readyRead, this, &Private::proxyStats_readyRead); + proxyStatConnection = proxyStatsValve->readyRead.connect(boost::bind(&Private::proxyStats_readyRead, this, boost::placeholders::_1)); log_info("proxy stats: %s", qPrintable(config.proxyStatsSpecs.join(", "))); } @@ -2436,7 +2440,6 @@ class HandlerEngine::Private : public QObject deferreds += report; } -private slots: QVariant parseJsonOrTnetstring(const QByteArray &message, bool *ok = 0, QString *errorMessage = 0) { QVariant data; bool ok_; @@ -3070,6 +3073,7 @@ private slots: } } +private slots: void hs_subscribe(HttpSession *hs, const QString &channel) { Instruct::HoldMode mode = hs->holdMode(); diff --git a/src/cpp/m2adapter/m2adapterapp.cpp b/src/cpp/m2adapter/m2adapterapp.cpp index 478c649c..78cb883a 100644 --- a/src/cpp/m2adapter/m2adapterapp.cpp +++ b/src/cpp/m2adapter/m2adapterapp.cpp @@ -453,6 +453,9 @@ class M2AdapterApp::Private : public QObject Connection quitConnection; Connection hupConnection; map rrConnection; + Connection m2InValveConnection; + Connection zhttpInValveConnection; + Connection zwsInValveConnection; Private(M2AdapterApp *_q) : QObject(_q), @@ -654,7 +657,7 @@ class M2AdapterApp::Private : public QObject } m2_in_valve = new QZmq::Valve(m2_in_sock, this); - connect(m2_in_valve, &QZmq::Valve::readyRead, this, &Private::m2_in_readyRead); + m2InValveConnection = m2_in_valve->readyRead.connect(boost::bind(&Private::m2_in_readyRead, this, boost::placeholders::_1)); m2_out_sock = new QZmq::Socket(QZmq::Socket::Pub, this); m2_out_sock->setShutdownWaitTime(0); @@ -674,7 +677,7 @@ class M2AdapterApp::Private : public QObject sock->setShutdownWaitTime(0); sock->setHwm(1); // queue up 1 outstanding request at most sock->setWriteQueueEnabled(false); - rrConnection[sock] = sock->readyRead.connect(boost::bind(&Private::m2_control_readyRead, this)); + rrConnection[sock] = sock->readyRead.connect(boost::bind(&Private::m2_control_readyRead, this, sock)); log_info("m2_control connect %s:%s", m2_send_idents[n].data(), qPrintable(spec)); sock->connectToAddress(spec); @@ -709,7 +712,7 @@ class M2AdapterApp::Private : public QObject } zhttp_in_valve = new QZmq::Valve(zhttp_in_sock, this); - connect(zhttp_in_valve, &QZmq::Valve::readyRead, this, &Private::zhttp_in_readyRead); + zhttpInValveConnection = zhttp_in_valve->readyRead.connect(boost::bind(&Private::zhttp_in_readyRead, this, boost::placeholders::_1)); zhttp_out_sock = new QZmq::Socket(QZmq::Socket::Push, this); zhttp_out_sock->setShutdownWaitTime(0); @@ -778,7 +781,7 @@ class M2AdapterApp::Private : public QObject } zws_in_valve = new QZmq::Valve(zws_in_sock, this); - connect(zws_in_valve, &QZmq::Valve::readyRead, this, &Private::zws_in_readyRead); + zwsInValveConnection = zws_in_valve->readyRead.connect(boost::bind(&Private::zws_in_readyRead, this, boost::placeholders::_1)); zws_out_sock = new QZmq::Socket(QZmq::Socket::Push, this); zws_out_sock->setShutdownWaitTime(0); @@ -2314,7 +2317,6 @@ class M2AdapterApp::Private : public QObject } } -private slots: void m2_in_readyRead(const QList &message) { if(message.count() != 1) @@ -2734,9 +2736,8 @@ private slots: } } - void m2_control_readyRead() + void m2_control_readyRead(QZmq::Socket *sock) { - QZmq::Socket *sock = (QZmq::Socket *)sender(); int index = -1; for(int n = 0; n < controlPorts.count(); ++n) { @@ -2821,6 +2822,7 @@ private slots: handleZhttpIn(WebSocket, message); } +private slots: void status_timeout() { int now = time.elapsed(); diff --git a/src/cpp/proxy/engine.cpp b/src/cpp/proxy/engine.cpp index 72329125..8ff4ebc4 100644 --- a/src/cpp/proxy/engine.cpp +++ b/src/cpp/proxy/engine.cpp @@ -121,6 +121,7 @@ class Engine::Private : public QObject map finishedConnection; map reqSessionDestroyedConnection; Connection connMaxConnection; + Connection rrConnection; Private(Engine *_q) : QObject(_q), @@ -282,7 +283,7 @@ class Engine::Private : public QObject } handler_retry_in_valve = new QZmq::Valve(handler_retry_in_sock, this); - connect(handler_retry_in_valve, &QZmq::Valve::readyRead, this, &Private::handler_retry_in_readyRead); + rrConnection = handler_retry_in_valve->readyRead.connect(boost::bind(&Private::handler_retry_in_readyRead, this, boost::placeholders::_1)); } if(handler_retry_in_valve) @@ -832,6 +833,7 @@ private slots: tryTakeNext(); } +private: void handler_retry_in_readyRead(const QList &message) { if(message.count() != 1) @@ -910,7 +912,6 @@ private slots: } } -private: void stats_connMax(const StatsPacket &packet) { if(accept->canWriteImmediately()) diff --git a/src/cpp/proxy/wscontrolmanager.cpp b/src/cpp/proxy/wscontrolmanager.cpp index 35eb5aa1..266416cd 100644 --- a/src/cpp/proxy/wscontrolmanager.cpp +++ b/src/cpp/proxy/wscontrolmanager.cpp @@ -71,6 +71,7 @@ class WsControlManager::Private : public QObject QMap, KeepAliveRegistration*> sessionsByLastRefresh; QSet sessionRefreshBuckets[SESSION_REFRESH_BUCKETS]; int currentSessionRefreshBucket; + Connection inValveConnection; Private(WsControlManager *_q) : QObject(_q), @@ -111,7 +112,7 @@ class WsControlManager::Private : public QObject } inValve = new QZmq::Valve(inSock, this); - connect(inValve, &QZmq::Valve::readyRead, this, &Private::in_readyRead); + inValveConnection = inValve->readyRead.connect(boost::bind(&Private::in_readyRead, this, boost::placeholders::_1)); inValve->open(); @@ -219,7 +220,7 @@ class WsControlManager::Private : public QObject refreshTimer->stop(); } -private slots: +private: void in_readyRead(const QList &message) { if(message.count() != 1) @@ -273,6 +274,7 @@ private slots: } } +private slots: void refresh_timeout() { qint64 now = QDateTime::currentMSecsSinceEpoch(); diff --git a/src/cpp/qzmq/src/qzmqvalve.cpp b/src/cpp/qzmq/src/qzmqvalve.cpp index 871aa172..e38497df 100644 --- a/src/cpp/qzmq/src/qzmqvalve.cpp +++ b/src/cpp/qzmq/src/qzmqvalve.cpp @@ -82,7 +82,7 @@ class Valve::Private : public QObject if(!msg.isEmpty()) { - emit q->readyRead(msg); + q->readyRead(msg); if(!self) return; } diff --git a/src/cpp/qzmq/src/qzmqvalve.h b/src/cpp/qzmq/src/qzmqvalve.h index 987db0fa..ca63a5e6 100644 --- a/src/cpp/qzmq/src/qzmqvalve.h +++ b/src/cpp/qzmq/src/qzmqvalve.h @@ -25,6 +25,10 @@ #define QZMQVALVE_H #include +#include + +using SignalList = boost::signals2::signal&)>; +using Connection = boost::signals2::scoped_connection; namespace QZmq { @@ -45,8 +49,7 @@ class Valve : public QObject void open(); void close(); -signals: - void readyRead(const QList &message); + SignalList readyRead; private: class Private; diff --git a/src/cpp/tests/handlerenginetest.cpp b/src/cpp/tests/handlerenginetest.cpp index f312072a..3dd6591a 100644 --- a/src/cpp/tests/handlerenginetest.cpp +++ b/src/cpp/tests/handlerenginetest.cpp @@ -61,6 +61,10 @@ class Wrapper : public QObject bool serverFailed; int serverOutSeq; QByteArray requestBody; + Connection zhttpClientInValveConnection; + Connection zhttpServerInValveConnection; + Connection zhttpServerInStreamValveConnection; + Connection proxyAcceptValveConnection; Wrapper(QObject *parent, QDir _workDir) : QObject(parent), @@ -77,16 +81,16 @@ class Wrapper : public QObject zhttpClientInSock = new QZmq::Socket(QZmq::Socket::Sub, this); zhttpClientInValve = new QZmq::Valve(zhttpClientInSock, this); - connect(zhttpClientInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpClientIn_readyRead); + zhttpClientInValveConnection = zhttpClientInValve->readyRead.connect(boost::bind(&Wrapper::zhttpClientIn_readyRead, this, boost::placeholders::_1)); zhttpServerInSock = new QZmq::Socket(QZmq::Socket::Pull, this); zhttpServerInValve = new QZmq::Valve(zhttpServerInSock, this); - connect(zhttpServerInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerIn_readyRead); + zhttpServerInValveConnection = zhttpServerInValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerIn_readyRead, this, boost::placeholders::_1)); zhttpServerInStreamSock = new QZmq::Socket(QZmq::Socket::Router, this); zhttpServerInStreamSock->setIdentity("test-server"); zhttpServerInStreamValve = new QZmq::Valve(zhttpServerInStreamSock, this); - connect(zhttpServerInStreamValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerInStream_readyRead); + zhttpServerInStreamValveConnection = zhttpServerInStreamValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerInStream_readyRead, this, boost::placeholders::_1)); zhttpServerOutSock = new QZmq::Socket(QZmq::Socket::Pub, this); @@ -94,7 +98,7 @@ class Wrapper : public QObject proxyAcceptSock = new QZmq::Socket(QZmq::Socket::Dealer, this); proxyAcceptValve = new QZmq::Valve(proxyAcceptSock, this); - connect(proxyAcceptValve, &QZmq::Valve::readyRead, this, &Wrapper::proxyAccept_readyRead); + proxyAcceptValveConnection = proxyAcceptValve->readyRead.connect(boost::bind(&Wrapper::proxyAccept_readyRead, this, boost::placeholders::_1)); // publish sockets diff --git a/src/cpp/tests/proxyenginetest.cpp b/src/cpp/tests/proxyenginetest.cpp index 42ef7c52..65eee9ea 100644 --- a/src/cpp/tests/proxyenginetest.cpp +++ b/src/cpp/tests/proxyenginetest.cpp @@ -85,6 +85,11 @@ class Wrapper : public QObject int clientReqsFinished; QByteArray requestBody; QHash responses; + Connection zhttpClientInValveConnection; + Connection zhttpServerInValveConnection; + Connection zhttpServerInStreamValveConnection; + Connection handlerAcceptValveConnection; + Connection handlerInspectValveConnection; Wrapper(QObject *parent, QDir _workDir) : QObject(parent), @@ -106,16 +111,16 @@ class Wrapper : public QObject zhttpClientInSock = new QZmq::Socket(QZmq::Socket::Sub, this); zhttpClientInValve = new QZmq::Valve(zhttpClientInSock, this); - connect(zhttpClientInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpClientIn_readyRead); + zhttpClientInValveConnection = zhttpClientInValve->readyRead.connect(boost::bind(&Wrapper::zhttpClientIn_readyRead, this, boost::placeholders::_1)); zhttpServerInSock = new QZmq::Socket(QZmq::Socket::Pull, this); zhttpServerInValve = new QZmq::Valve(zhttpServerInSock, this); - connect(zhttpServerInValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerIn_readyRead); + zhttpServerInValveConnection = zhttpServerInValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerIn_readyRead, this, boost::placeholders::_1)); zhttpServerInStreamSock = new QZmq::Socket(QZmq::Socket::Router, this); zhttpServerInStreamSock->setIdentity("test-server"); zhttpServerInStreamValve = new QZmq::Valve(zhttpServerInStreamSock, this); - connect(zhttpServerInStreamValve, &QZmq::Valve::readyRead, this, &Wrapper::zhttpServerInStream_readyRead); + zhttpServerInStreamValveConnection = zhttpServerInStreamValve->readyRead.connect(boost::bind(&Wrapper::zhttpServerInStream_readyRead, this, boost::placeholders::_1)); zhttpServerOutSock = new QZmq::Socket(QZmq::Socket::Pub, this); @@ -125,10 +130,10 @@ class Wrapper : public QObject handlerAcceptSock = new QZmq::Socket(QZmq::Socket::Router, this); handlerAcceptValve = new QZmq::Valve(handlerAcceptSock, this); - connect(handlerAcceptValve, &QZmq::Valve::readyRead, this, &Wrapper::handlerAccept_readyRead); + handlerAcceptValveConnection = handlerAcceptValve->readyRead.connect(boost::bind(&Wrapper::handlerAccept_readyRead, this, boost::placeholders::_1)); handlerInspectValve = new QZmq::Valve(handlerInspectSock, this); - connect(handlerInspectValve, &QZmq::Valve::readyRead, this, &Wrapper::handlerInspect_readyRead); + handlerInspectValveConnection = handlerInspectValve->readyRead.connect(boost::bind(&Wrapper::handlerInspect_readyRead, this, boost::placeholders::_1)); handlerRetryOutSock = new QZmq::Socket(QZmq::Socket::Push, this); } @@ -178,7 +183,7 @@ class Wrapper : public QObject responses.clear(); } -private slots: +private: void zhttpClientIn_readyRead(const QList &message) { log_debug("client in"); diff --git a/src/cpp/zhttpmanager.cpp b/src/cpp/zhttpmanager.cpp index b22fb0c7..400ded6d 100644 --- a/src/cpp/zhttpmanager.cpp +++ b/src/cpp/zhttpmanager.cpp @@ -109,6 +109,9 @@ class ZhttpManager::Private : public QObject Connection cossConnection; Connection sosConnection; Connection rrConnection; + Connection clientConnection; + Connection serverConnection; + Connection serverStreamConnection; Private(ZhttpManager *_q) : QObject(_q), @@ -221,7 +224,7 @@ class ZhttpManager::Private : public QObject } client_in_valve = new QZmq::Valve(client_in_sock, this); - connect(client_in_valve, &QZmq::Valve::readyRead, this, &Private::client_in_readyRead); + clientConnection = client_in_valve->readyRead.connect(boost::bind(&Private::client_in_readyRead, this, boost::placeholders::_1)); client_in_valve->open(); @@ -266,7 +269,7 @@ class ZhttpManager::Private : public QObject } server_in_valve = new QZmq::Valve(server_in_sock, this); - connect(server_in_valve, &QZmq::Valve::readyRead, this, &Private::server_in_readyRead); + serverConnection = server_in_valve->readyRead.connect(boost::bind(&Private::server_in_readyRead, this, boost::placeholders::_1)); server_in_valve->open(); @@ -290,7 +293,7 @@ class ZhttpManager::Private : public QObject } server_in_stream_valve = new QZmq::Valve(server_in_stream_sock, this); - connect(server_in_stream_valve, &QZmq::Valve::readyRead, this, &Private::server_in_stream_readyRead); + serverStreamConnection = server_in_stream_valve->readyRead.connect(boost::bind(&Private::server_in_stream_readyRead, this, boost::placeholders::_1)); server_in_stream_valve->open(); @@ -554,7 +557,6 @@ class ZhttpManager::Private : public QObject } } -public slots: void client_in_readyRead(const QList &msg) { if(msg.count() != 1) @@ -793,6 +795,7 @@ public slots: } } +public slots: void refresh_timeout() { QHash > clientSessionsBySender[2]; // index corresponds to type diff --git a/src/cpp/zrpcmanager.cpp b/src/cpp/zrpcmanager.cpp index 193d70f1..41447401 100644 --- a/src/cpp/zrpcmanager.cpp +++ b/src/cpp/zrpcmanager.cpp @@ -67,6 +67,8 @@ class ZrpcManager::Private : public QObject QZmq::Valve *serverValve; QHash clientReqsById; QList pending; + Connection clientValveConnection; + Connection serverValveConnection; Private(ZrpcManager *_q) : QObject(_q), @@ -104,7 +106,7 @@ class ZrpcManager::Private : public QObject } clientValve = new QZmq::Valve(clientSock, this); - connect(clientValve, &QZmq::Valve::readyRead, this, &Private::client_readyRead); + clientValveConnection = clientValve->readyRead.connect(boost::bind(&Private::client_readyRead, this, boost::placeholders::_1)); clientValve->open(); @@ -129,7 +131,7 @@ class ZrpcManager::Private : public QObject } serverValve = new QZmq::Valve(serverSock, this); - connect(serverValve, &QZmq::Valve::readyRead, this, &Private::server_readyRead); + serverValveConnection = serverValve->readyRead.connect(boost::bind(&Private::server_readyRead, this, boost::placeholders::_1)); serverValve->open(); @@ -166,7 +168,6 @@ class ZrpcManager::Private : public QObject serverSock->write(message); } -private slots: void client_readyRead(const QList &message) { if(message.count() != 2)