Skip to content

Commit

Permalink
proxy/handler: replace QPointer with shared_ptr/weak_ptr
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Feb 14, 2025
1 parent 76a1f77 commit 74f65fe
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 128 deletions.
100 changes: 53 additions & 47 deletions src/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include <assert.h>
#include <algorithm>
#include <QPointer>
#include <QTimer>
#include <QUrlQuery>
#include <QJsonDocument>
Expand Down Expand Up @@ -394,8 +393,8 @@ class Subscription;
class CommonState
{
public:
QHash<ZhttpRequest::Rid, HttpSession*> httpSessions;
QHash<QString, WsSession*> wsSessions;
QHash<ZhttpRequest::Rid, std::shared_ptr<HttpSession>> httpSessions;
QHash<QString, std::shared_ptr<WsSession>> wsSessions;
QHash<QString, QSet<HttpSession*> > responseSessionsByChannel;
QHash<QString, QSet<HttpSession*> > streamSessionsByChannel;
QHash<QString, QSet<WsSession*> > wsSessionsByChannel;
Expand Down Expand Up @@ -437,7 +436,7 @@ class AcceptWorker : public Deferred
bool responseSent;
QString sid;
LastIds lastIds;
QList<HttpSession*> sessions;
QList<std::shared_ptr<HttpSession>> sessions;
int connectionSubscriptionMax;
QSet<QByteArray> needRemoveFromStats;
map<Deferred*, Connection> finishedConnection;
Expand Down Expand Up @@ -826,12 +825,12 @@ class AcceptWorker : public Deferred
afterSessionCalls();
}

QList<HttpSession*> takeSessions()
QList<std::shared_ptr<HttpSession>> takeSessions()
{
QList<HttpSession*> out = sessions;
QList<std::shared_ptr<HttpSession>> out = sessions;
sessions.clear();

foreach(HttpSession *hs, out)
foreach(const std::shared_ptr<HttpSession> &hs, out)
hs->setParent(0);

return out;
Expand Down Expand Up @@ -1129,7 +1128,7 @@ class AcceptWorker : public Deferred
QByteArray cid = rid.first + ':' + rid.second;
needRemoveFromStats.remove(cid);

sessions += new HttpSession(httpReq, adata, instruct, zhttpOut, stats, updateLimiter, filterLimiter, &cs->publishLastIds, httpSessionUpdateManager, connectionSubscriptionMax, this);
sessions += std::make_shared<HttpSession>(httpReq, adata, instruct, zhttpOut, stats, updateLimiter, filterLimiter, &cs->publishLastIds, httpSessionUpdateManager, connectionSubscriptionMax, this);
}

// engine should directly connect to this and register the holds
Expand Down Expand Up @@ -1213,12 +1212,12 @@ class HandlerEngine::Private : public QObject
class PublishAction : public RateLimiter::Action
{
public:
HandlerEngine::Private *ep;
QPointer<QObject> target;
std::weak_ptr<HandlerEngine::Private> ep;
std::weak_ptr<QObject> target;
PublishItem item;
QList<QByteArray> exposeHeaders;

PublishAction(HandlerEngine::Private *_ep, QObject *_target, const PublishItem &_item, const QList<QByteArray> &_exposeHeaders = QList<QByteArray>()) :
PublishAction(const std::weak_ptr<HandlerEngine::Private> _ep, const std::weak_ptr<QObject> _target, const PublishItem &_item, const QList<QByteArray> &_exposeHeaders = QList<QByteArray>()) :
ep(_ep),
target(_target),
item(_item),
Expand All @@ -1228,10 +1227,15 @@ class HandlerEngine::Private : public QObject

virtual bool execute()
{
if(!target)
auto epl = ep.lock();
if(!epl)
return false;

ep->publishSend(target, item, exposeHeaders);
auto targetl = target.lock();
if(!targetl)
return false;

epl->publishSend(targetl, item, exposeHeaders);
return true;
}
};
Expand Down Expand Up @@ -1326,8 +1330,8 @@ class HandlerEngine::Private : public QObject
qDeleteAll(inspectWorkers);
qDeleteAll(acceptWorkers);
qDeleteAll(deferreds);
qDeleteAll(cs.wsSessions);
qDeleteAll(cs.httpSessions);
cs.wsSessions.clear();
cs.httpSessions.clear();
qDeleteAll(cs.subs);
}

Expand Down Expand Up @@ -1747,9 +1751,8 @@ class HandlerEngine::Private : public QObject

log_debug("removed ws session: %s", qPrintable(s->cid));

cs.wsSessions.remove(s->cid);
wsSessionConnectionMap.erase(s);
delete s;
cs.wsSessions.remove(s->cid);
}

void httpControlRespond(SimpleHttpRequest *req, int code, const QByteArray &reason, const QString &body, const QByteArray &contentType = QByteArray(), const HttpHeaders &headers = HttpHeaders(), int items = -1)
Expand All @@ -1770,19 +1773,19 @@ class HandlerEngine::Private : public QObject
log_debug("%s", qPrintable(msg));
}

void publishSend(QObject *target, const PublishItem &item, const QList<QByteArray> &exposeHeaders)
void publishSend(const std::shared_ptr<QObject> &target, const PublishItem &item, const QList<QByteArray> &exposeHeaders)
{
const PublishFormat &f = item.format;

if(f.type == PublishFormat::HttpResponse || f.type == PublishFormat::HttpStream)
{
HttpSession *hs = dynamic_cast<HttpSession*>(target);
HttpSession *hs = dynamic_cast<HttpSession*>(target.get());

hs->publish(item, exposeHeaders);
}
else if(f.type == PublishFormat::WebSocketMessage)
{
WsSession *s = dynamic_cast<WsSession*>(target);
WsSession *s = dynamic_cast<WsSession*>(target.get());

s->publish(item);
}
Expand Down Expand Up @@ -1810,7 +1813,7 @@ class HandlerEngine::Private : public QObject
}
else
{
foreach(HttpSession *hs, cs.httpSessions)
foreach(const std::shared_ptr<HttpSession> &hs, cs.httpSessions)
hs->update();
}
}
Expand Down Expand Up @@ -2169,11 +2172,13 @@ class HandlerEngine::Private : public QObject
else
blocks = blocksForData(f.body.size());

foreach(HttpSession *hs, responseSessions)
foreach(HttpSession *hsp, responseSessions)
{
std::shared_ptr<HttpSession> &hs = cs.httpSessions[hsp->rid()];

QString statsRoute = hs->statsRoute();

if(!publishLimiter->addAction(statsRoute, new PublishAction(this, hs, i, exposeHeaders), blocks != -1 ? blocks : 1))
if(!publishLimiter->addAction(statsRoute, new PublishAction(q->d, hs, i, exposeHeaders), blocks != -1 ? blocks : 1))
{
if(!statsRoute.isEmpty())
log_warning("exceeded publish hwm (%d) for route %s, dropping message", config.messageHwm, qPrintable(statsRoute));
Expand Down Expand Up @@ -2203,11 +2208,13 @@ class HandlerEngine::Private : public QObject
else
blocks = blocksForData(f.body.size());

foreach(HttpSession *hs, streamSessions)
foreach(HttpSession *hsp, streamSessions)
{
std::shared_ptr<HttpSession> &hs = cs.httpSessions[hsp->rid()];

QString statsRoute = hs->statsRoute();

if(!publishLimiter->addAction(statsRoute, new PublishAction(this, hs, i), blocks != -1 ? blocks : 1))
if(!publishLimiter->addAction(statsRoute, new PublishAction(q->d, hs, i), blocks != -1 ? blocks : 1))
{
if(!statsRoute.isEmpty())
log_warning("exceeded publish hwm (%d) for route %s, dropping message", config.messageHwm, qPrintable(statsRoute));
Expand Down Expand Up @@ -2237,11 +2244,13 @@ class HandlerEngine::Private : public QObject
else
blocks = blocksForData(f.body.size());

foreach(WsSession *s, wsSessions)
foreach(WsSession *sp, wsSessions)
{
std::shared_ptr<WsSession> &s = cs.wsSessions[sp->cid];

QString statsRoute = s->statsRoute;

if(!publishLimiter->addAction(statsRoute, new PublishAction(this, s, i), blocks != -1 ? blocks : 1))
if(!publishLimiter->addAction(statsRoute, new PublishAction(q->d, s, i), blocks != -1 ? blocks : 1))
{
if(!statsRoute.isEmpty())
log_warning("exceeded publish hwm (%d) for route %s, dropping message", config.messageHwm, qPrintable(statsRoute));
Expand Down Expand Up @@ -2346,8 +2355,8 @@ class HandlerEngine::Private : public QObject

void acceptWorker_sessionsReady(AcceptWorker *w)
{
QList<HttpSession*> sessions = w->takeSessions();
foreach(HttpSession *hs, sessions)
QList<std::shared_ptr<HttpSession>> sessions = w->takeSessions();
foreach(const std::shared_ptr<HttpSession> &hs, sessions)
{
// NOTE: for performance reasons we do not call hs->setParent and
// instead leave the object unparented
Expand Down Expand Up @@ -2379,7 +2388,7 @@ class HandlerEngine::Private : public QObject
assert(at != -1);
ZhttpRequest::Rid rid(id.mid(0, at), id.mid(at + 1));

HttpSession *hs = cs.httpSessions.value(rid);
HttpSession *hs = cs.httpSessions.value(rid).get();
if(hs && !hs->sid().isEmpty())
sidLastIds[hs->sid()] = LastIds();
}
Expand Down Expand Up @@ -2606,14 +2615,14 @@ class HandlerEngine::Private : public QObject

if(item.type == WsControlPacket::Item::Here)
{
WsSession *s = cs.wsSessions.value(item.cid);
std::shared_ptr<WsSession> s = cs.wsSessions.value(item.cid);
if(!s)
{
s = new WsSession(this);
wsSessionConnectionMap[s] = {
s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, s)),
s->expired.connect(boost::bind(&Private::wssession_expired, this, s)),
s->error.connect(boost::bind(&Private::wssession_error, this, s))
s = std::make_shared<WsSession>(this);
wsSessionConnectionMap[s.get()] = {
s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, s.get())),
s->expired.connect(boost::bind(&Private::wssession_expired, this, s.get())),
s->error.connect(boost::bind(&Private::wssession_error, this, s.get()))
};
s->peer = packet.from;
s->cid = QString::fromUtf8(item.cid);
Expand Down Expand Up @@ -2641,7 +2650,7 @@ class HandlerEngine::Private : public QObject
}

// any other type must be for a known cid
WsSession *s = cs.wsSessions.value(QString::fromUtf8(item.cid));
WsSession *s = cs.wsSessions.value(QString::fromUtf8(item.cid)).get();
if(!s)
{
// send cancel, causing the proxy to close the connection. client
Expand Down Expand Up @@ -3143,17 +3152,17 @@ private slots:
removeSessionChannel(hs, channel);
}

void hs_finished(HttpSession *hs)
void hs_finished(HttpSession *hsp)
{
QByteArray addr = hs->retryToAddress();
RetryRequestPacket rp = hs->retryPacket();
QByteArray addr = hsp->retryToAddress();
RetryRequestPacket rp = hsp->retryPacket();

cs.httpSessions.remove(hs->rid());
std::shared_ptr<HttpSession> hs = cs.httpSessions.take(hsp->rid());

hs->subscribeCallback().remove(this);
hs->unsubscribeCallback().remove(this);
hs->finishedCallback().remove(this);
DeferCall::deleteLater(hs);
DeferCall::deleteLater(new std::shared_ptr<HttpSession>(hs));

if(!rp.requests.isEmpty())
writeRetryPacket(addr, rp);
Expand Down Expand Up @@ -3186,13 +3195,10 @@ private slots:
HandlerEngine::HandlerEngine(QObject *parent) :
QObject(parent)
{
d = new Private(this);
d = std::make_shared<Private>(this);
}

HandlerEngine::~HandlerEngine()
{
delete d;
}
HandlerEngine::~HandlerEngine() = default;

bool HandlerEngine::start(const Configuration &config)
{
Expand Down
2 changes: 1 addition & 1 deletion src/handler/handlerengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class HandlerEngine : public QObject

private:
class Private;
Private *d;
std::shared_ptr<Private> d;
};

#endif
22 changes: 9 additions & 13 deletions src/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "httpsession.h"

#include <assert.h>
#include <QPointer>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonArray>
Expand Down Expand Up @@ -280,7 +279,7 @@ class HttpSession::Private : public QObject
assert(state == NotStarted);

// set up implicit channels
QPointer<QObject> self = this;
std::weak_ptr<Private> self = q->d;
foreach(const QString &name, adata.implicitChannels)
{
if(!channels.contains(name))
Expand All @@ -292,7 +291,7 @@ class HttpSession::Private : public QObject

subscribeCallback.call({q, name});

assert(self); // deleting here would leak subscriptions/connections
assert(!self.expired()); // deleting here would leak subscriptions/connections
}
}

Expand Down Expand Up @@ -655,20 +654,20 @@ class HttpSession::Private : public QObject
}
}

QPointer<QObject> self = this;
std::weak_ptr<Private> self = q->d;

foreach(const QString &channel, channelsRemoved)
{
unsubscribeCallback.call({q, channel});

assert(self); // deleting here would leak subscriptions/connections
assert(!self.expired()); // deleting here would leak subscriptions/connections
}

foreach(const QString &channel, channelsAdded)
{
subscribeCallback.call({q, channel});

assert(self); // deleting here would leak subscriptions/connections
assert(!self.expired()); // deleting here would leak subscriptions/connections
}

if(instruct.holdMode == Instruct::ResponseHold)
Expand Down Expand Up @@ -1036,7 +1035,7 @@ class HttpSession::Private : public QObject
ZhttpRequest::Rid rid = req->rid();
QByteArray cid = rid.first + ':' + rid.second;

QPointer<QObject> self = this;
std::weak_ptr<Private> self = q->d;

QHashIterator<QString, Instruct::Channel> it(channels);
while(it.hasNext())
Expand All @@ -1046,7 +1045,7 @@ class HttpSession::Private : public QObject

unsubscribeCallback.call({q, channel});

assert(self); // deleting here would leak subscriptions/connections
assert(!self.expired()); // deleting here would leak subscriptions/connections
}

if(retry)
Expand Down Expand Up @@ -1691,13 +1690,10 @@ class HttpSession::Private : public QObject
HttpSession::HttpSession(ZhttpRequest *req, const HttpSession::AcceptData &adata, const Instruct &instruct, ZhttpManager *zhttpOut, StatsManager *stats, RateLimiter *updateLimiter, const std::shared_ptr<RateLimiter> &filterLimiter, PublishLastIds *publishLastIds, HttpSessionUpdateManager *updateManager, int connectionSubscriptionMax, QObject *parent) :
QObject(parent)
{
d = new Private(this, req, adata, instruct, zhttpOut, stats, updateLimiter, filterLimiter, publishLastIds, updateManager, connectionSubscriptionMax);
d = std::make_shared<Private>(this, req, adata, instruct, zhttpOut, stats, updateLimiter, filterLimiter, publishLastIds, updateManager, connectionSubscriptionMax);
}

HttpSession::~HttpSession()
{
delete d;
}
HttpSession::~HttpSession() = default;

Instruct::HoldMode HttpSession::holdMode() const
{
Expand Down
4 changes: 2 additions & 2 deletions src/handler/httpsession.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2016-2023 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
* Copyright (C) 2024-2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -123,7 +123,7 @@ class HttpSession : public QObject
private:
class Private;
friend class Private;
Private *d;
std::shared_ptr<Private> d;
};

#endif
Loading

0 comments on commit 74f65fe

Please sign in to comment.