From 7f938e3d50f304022a264f0b81568a1fc73af4fb Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Wed, 11 Dec 2024 15:57:06 -0800 Subject: [PATCH] wip --- src/handler/filter.cpp | 155 ++++++++++++++++++++++++++++++++-- src/handler/filter.h | 49 ++++++++++- src/handler/handlerengine.cpp | 84 ++---------------- src/handler/httpsession.cpp | 8 +- src/handler/wssession.cpp | 131 +++++++++++++++++++++++++++- src/handler/wssession.h | 13 ++- 6 files changed, 346 insertions(+), 94 deletions(-) diff --git a/src/handler/filter.cpp b/src/handler/filter.cpp index f2ea7614..538f7e76 100644 --- a/src/handler/filter.cpp +++ b/src/handler/filter.cpp @@ -1,5 +1,6 @@ /* * Copyright (C) 2016-2019 Fanout, Inc. + * Copyright (C) 2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -28,7 +29,7 @@ namespace { -class SkipSelfFilter : public Filter +class SkipSelfFilter : public Filter, public Filter::MessageFilter { public: SkipSelfFilter() : @@ -36,6 +37,16 @@ class SkipSelfFilter : public Filter { } + virtual void start(const Filter::Context &context, const QByteArray &content) + { + setContext(context); + + Result r; + r.sendAction = sendAction(); + r.content = content; + finished(r); + } + virtual SendAction sendAction() const { QString user = context().subscriptionMeta.value("user"); @@ -47,7 +58,7 @@ class SkipSelfFilter : public Filter } }; -class SkipUsersFilter : public Filter +class SkipUsersFilter : public Filter, public Filter::MessageFilter { public: SkipUsersFilter() : @@ -55,6 +66,16 @@ class SkipUsersFilter : public Filter { } + virtual void start(const Filter::Context &context, const QByteArray &content) + { + setContext(context); + + Result r; + r.sendAction = sendAction(); + r.content = content; + finished(r); + } + virtual SendAction sendAction() const { QString user = context().subscriptionMeta.value("user"); @@ -74,7 +95,7 @@ class SkipUsersFilter : public Filter } }; -class RequireSubFilter : public Filter +class RequireSubFilter : public Filter, public Filter::MessageFilter { public: RequireSubFilter() : @@ -82,6 +103,16 @@ class RequireSubFilter : public Filter { } + virtual void start(const Filter::Context &context, const QByteArray &content) + { + setContext(context); + + Result r; + r.sendAction = sendAction(); + r.content = content; + finished(r); + } + virtual SendAction sendAction() const { QString require_sub = context().publishMeta.value("require_sub"); @@ -92,7 +123,7 @@ class RequireSubFilter : public Filter } }; -class BuildIdFilter : public Filter +class BuildIdFilter : public Filter, public Filter::MessageFilter { public: IdFormat::ContentRenderer *idContentRenderer; @@ -162,6 +193,16 @@ class BuildIdFilter : public Filter return true; } + virtual void start(const Filter::Context &context, const QByteArray &content) + { + setContext(context); + + Result r; + r.sendAction = sendAction(); + r.content = process(content); + finished(r); + } + virtual QByteArray update(const QByteArray &data) { if(!ensureInit()) @@ -223,7 +264,7 @@ class VarSubstFormatHandler : public Format::Handler } }; -class VarSubstFilter : public Filter +class VarSubstFilter : public Filter, public Filter::MessageFilter { public: VarSubstFilter() : @@ -231,6 +272,16 @@ class VarSubstFilter : public Filter { } + virtual void start(const Filter::Context &context, const QByteArray &content) + { + setContext(context); + + Result r; + r.sendAction = sendAction(); + r.content = process(content); + finished(r); + } + virtual QByteArray update(const QByteArray &data) { VarSubstFormatHandler handler; @@ -255,6 +306,10 @@ class VarSubstFilter : public Filter } +Filter::MessageFilter::~MessageFilter() +{ +} + Filter::Filter(const QString &name) : name_(name) { @@ -308,6 +363,22 @@ Filter *Filter::create(const QString &name) return 0; } +Filter::MessageFilter *Filter::createMessageFilter(const QString &name) +{ + if(name == "skip-self") + return new SkipSelfFilter; + else if(name == "skip-users") + return new SkipUsersFilter; + else if(name == "require-sub") + return new RequireSubFilter; + else if(name == "build-id") + return new BuildIdFilter; + else if(name == "var-subst") + return new VarSubstFilter; + else + return 0; +} + QStringList Filter::names() { return (QStringList() @@ -327,9 +398,81 @@ Filter::Targets Filter::targets(const QString &name) else if(name == "require-sub") return Filter::MessageDelivery; else if(name == "build-id") - return Filter::Targets(Filter::MessageContent | Filter::ProxyContent); + return Filter::Targets(Filter::MessageContent | Filter::ResponseContent); else if(name == "var-subst") return Filter::MessageContent; else return Filter::Targets(0); } + +Filter::MessageFilterStack::MessageFilterStack(const QStringList &filterNames) +{ + foreach(const QString &name, filterNames) + { + MessageFilter *f = createMessageFilter(name); + if(f) + filters_ += f; + } +} + +Filter::MessageFilterStack::~MessageFilterStack() +{ + qDeleteAll(filters_); +} + +void Filter::MessageFilterStack::start(const Filter::Context &context, const QByteArray &content) +{ + context_ = context; + content_ = content; + lastSendAction_ = Send; + + nextFilter(); +} + +void Filter::MessageFilterStack::nextFilter() +{ + if(filters_.isEmpty()) + { + Result r; + r.sendAction = lastSendAction_; + r.content = content_; + finished(r); + return; + } + + finishedConnection_ = filters_[0]->finished.connect(boost::bind(&MessageFilterStack::filterFinished, this, boost::placeholders::_1)), + + // may call filterFinished immediately + filters_[0]->start(context_, content_); +} + +void Filter::MessageFilterStack::filterFinished(const Result &result) +{ + if(!result.errorMessage.isNull()) + { + qDeleteAll(filters_); + filters_.clear(); + + Result r; + r.errorMessage = result.errorMessage; + finished(r); + return; + } + + lastSendAction_ = result.sendAction; + content_ = result.content; + + switch(lastSendAction_) + { + case Send: + filters_.removeFirst(); + break; + case Drop: + qDeleteAll(filters_); + filters_.clear(); + break; + } + + // will emit finished if there are no remaining filters + nextFilter(); +} diff --git a/src/handler/filter.h b/src/handler/filter.h index 48b87285..479d74c4 100644 --- a/src/handler/filter.h +++ b/src/handler/filter.h @@ -1,5 +1,6 @@ /* * Copyright (C) 2016-2019 Fanout, Inc. + * Copyright (C) 2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -26,6 +27,8 @@ #include #include #include +#include +#include class Filter { @@ -40,7 +43,7 @@ class Filter { MessageDelivery = 0x01, MessageContent = 0x02, - ProxyContent = 0x04, + ResponseContent = 0x04, }; class Context @@ -51,10 +54,49 @@ class Filter QHash publishMeta; }; - Filter(const QString &name = QString()); + class MessageFilter + { + public: + class Result + { + public: + SendAction sendAction; + QByteArray content; + QString errorMessage; // non-null on error + }; + + virtual ~MessageFilter(); + + // may emit finished immediately + virtual void start(const Filter::Context &context, const QByteArray &content = QByteArray()) = 0; + + boost::signals2::signal finished; + }; + + class MessageFilterStack : public MessageFilter + { + public: + MessageFilterStack(const QStringList &filterNames); + ~MessageFilterStack(); + + // reimplemented + virtual void start(const Filter::Context &context, const QByteArray &content = QByteArray()); + + private: + QList filters_; + Filter::Context context_; + QByteArray content_; + SendAction lastSendAction_; + boost::signals2::scoped_connection finishedConnection_; + + void nextFilter(); + void filterFinished(const Result &result); + }; + virtual ~Filter(); const QString & name() const { return name_; } + const Context & context() const { return context_; } QString errorMessage() const { return errorMessage_; } @@ -69,10 +111,13 @@ class Filter QByteArray process(const QByteArray &data); static Filter *create(const QString &name); + static MessageFilter *createMessageFilter(const QString &name); static QStringList names(); static Targets targets(const QString &name); protected: + Filter(const QString &name = QString()); + void setError(const QString &s) { errorMessage_ = s; } private: diff --git a/src/handler/handlerengine.cpp b/src/handler/handlerengine.cpp index 650eb9ec..dd44292e 100644 --- a/src/handler/handlerengine.cpp +++ b/src/handler/handlerengine.cpp @@ -930,13 +930,13 @@ class AcceptWorker : public Deferred if(!responseSent) { - // apply ProxyContent filters of all channels + // apply ResponseContent filters of all channels QStringList allFilters; foreach(const Instruct::Channel &c, instruct.channels) { foreach(const QString &filter, c.filters) { - if((Filter::targets(filter) & Filter::ProxyContent) && !allFilters.contains(filter)) + if((Filter::targets(filter) & Filter::ResponseContent) && !allFilters.contains(filter)) allFilters += filter; } } @@ -1793,73 +1793,7 @@ class HandlerEngine::Private : public QObject { WsSession *s = qobject_cast(target); - if(f.haveContentFilters) - { - // ensure content filters match - QStringList contentFilters; - foreach(const QString &f, s->channelFilters[item.channel]) - { - if(Filter::targets(f) & Filter::MessageContent) - contentFilters += f; - } - if(contentFilters != f.contentFilters) - { - QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); - log_debug("%s", qPrintable(errorMessage)); - return; - } - } - - Filter::Context fc; - fc.subscriptionMeta = s->meta; - fc.publishMeta = item.meta; - - FilterStack filters(fc, s->channelFilters[item.channel]); - - if(filters.sendAction() == Filter::Drop) - return; - - // TODO: hint support for websockets? - if(f.action != PublishFormat::Send && f.action != PublishFormat::Close && f.action != PublishFormat::Refresh) - return; - - WsControlPacket::Item i; - i.cid = s->cid.toUtf8(); - - if(f.action == PublishFormat::Send) - { - QByteArray body = filters.process(f.body); - if(body.isNull()) - { - log_debug("filter error: %s", qPrintable(filters.errorMessage())); - return; - } - - i.type = WsControlPacket::Item::Send; - - switch(f.messageType) - { - case PublishFormat::Text: i.contentType = "text"; break; - case PublishFormat::Binary: i.contentType = "binary"; break; - case PublishFormat::Ping: i.contentType = "ping"; break; - case PublishFormat::Pong: i.contentType = "pong"; break; - default: return; // unrecognized type, skip - } - - i.message = body; - } - else if(f.action == PublishFormat::Close) - { - i.type = WsControlPacket::Item::Close; - i.code = f.code; - i.reason = f.reason; - } - else if(f.action == PublishFormat::Refresh) - { - i.type = WsControlPacket::Item::Refresh; - } - - writeWsControlItems(s->peer, QList() << i); + s->publish(item); } } @@ -2686,7 +2620,7 @@ class HandlerEngine::Private : public QObject { s = new WsSession(this); wsSessionConnectionMap[s] = { - s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, s)), + s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, s)), s->expired.connect(boost::bind(&Private::wssession_expired, this, s)), s->error.connect(boost::bind(&Private::wssession_error, this, s)) }; @@ -3224,16 +3158,8 @@ private slots: writeRetryPacket(addr, rp); } - void wssession_send(int reqId, const QByteArray &type, const QByteArray &message, WsSession *s) + void wssession_send(const WsControlPacket::Item &i, WsSession *s) { - WsControlPacket::Item i; - i.cid = s->cid.toUtf8(); - i.requestId = QByteArray::number(reqId); - i.type = WsControlPacket::Item::Send; - i.contentType = type; - i.message = message; - i.queue = true; - writeWsControlItems(s->peer, QList() << i); } diff --git a/src/handler/httpsession.cpp b/src/handler/httpsession.cpp index a1e12cd3..d97c3396 100644 --- a/src/handler/httpsession.cpp +++ b/src/handler/httpsession.cpp @@ -298,13 +298,13 @@ class HttpSession::Private : public QObject if(!instruct.response.body.isEmpty()) { - // apply ProxyContent filters of all channels + // apply ResponseContent filters of all channels QStringList allFilters; foreach(const Instruct::Channel &c, instruct.channels) { foreach(const QString &filter, c.filters) { - if((Filter::targets(filter) & Filter::ProxyContent) && !allFilters.contains(filter)) + if((Filter::targets(filter) & Filter::ResponseContent) && !allFilters.contains(filter)) allFilters += filter; } } @@ -1558,13 +1558,13 @@ private slots: // won't be used for anything else instruct = i; - // apply ProxyContent filters of all channels + // apply ResponseContent filters of all channels QStringList allFilters; foreach(const Instruct::Channel &c, instruct.channels) { foreach(const QString &filter, c.filters) { - if((Filter::targets(filter) & Filter::ProxyContent) && !allFilters.contains(filter)) + if((Filter::targets(filter) & Filter::ResponseContent) && !allFilters.contains(filter)) allFilters += filter; } } diff --git a/src/handler/wssession.cpp b/src/handler/wssession.cpp index fc646596..f0b15595 100644 --- a/src/handler/wssession.cpp +++ b/src/handler/wssession.cpp @@ -26,13 +26,18 @@ #include #include #include "log.h" +#include "filter.h" +#include "filterstack.h" +#include "publishitem.h" +#include "publishformat.h" #define WSCONTROL_REQUEST_TIMEOUT 8000 WsSession::WsSession(QObject *parent) : QObject(parent), nextReqId(0), - logLevel(LOG_LEVEL_DEBUG) + logLevel(LOG_LEVEL_DEBUG), + filters(0) { expireTimer = new QTimer(this); expireTimer->setSingleShot(true); @@ -49,6 +54,8 @@ WsSession::WsSession(QObject *parent) : WsSession::~WsSession() { + delete filters; + expireTimer->disconnect(this); expireTimer->setParent(0); expireTimer->deleteLater(); @@ -94,6 +101,54 @@ void WsSession::ack(int reqId) } } +void WsSession::publish(const PublishItem &item) +{ + pendingItems += item; + + if(!filters) + processNextItem(); +} + +void WsSession::processNextItem() +{ + if(pendingItems.isEmpty()) + return; + + PublishItem item = pendingItems[0]; + + const PublishFormat &f = item.format; + + if(f.haveContentFilters) + { + // ensure content filters match + QStringList contentFilters; + foreach(const QString &f, channelFilters[item.channel]) + { + if(Filter::targets(f) & Filter::MessageContent) + contentFilters += f; + } + if(contentFilters != f.contentFilters) + { + QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); + log_debug("%s", qPrintable(errorMessage)); + + pendingItems.removeFirst(); + processNextItem(); + return; + } + } + + filters = new Filter::MessageFilterStack(channelFilters[item.channel]); + filtersFinishedConnection = filters->finished.connect(boost::bind(&WsSession::filtersFinished, this, boost::placeholders::_1)); + + Filter::Context fc; + fc.subscriptionMeta = meta; + fc.publishMeta = item.meta; + + // may call filtersFinished immediately + filters->start(fc, f.body); +} + void WsSession::setupRequestTimer() { if(!pendingRequests.isEmpty()) @@ -120,6 +175,70 @@ void WsSession::setupRequestTimer() } } +void WsSession::filtersFinished(const Filter::MessageFilter::Result &result) +{ + PublishItem item = pendingItems.takeFirst(); + + filtersFinishedConnection.disconnect(); + + delete filters; + filters = 0; + + if(!result.errorMessage.isNull()) + { + log_debug("filter error: %s", qPrintable(result.errorMessage)); + processNextItem(); + return; + } + + afterFilters(item, result.sendAction, result.content); + + processNextItem(); +} + +void WsSession::afterFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content) +{ + if(sendAction == Filter::Drop) + return; + + const PublishFormat &f = item.format; + + // TODO: hint support for websockets? + if(f.action != PublishFormat::Send && f.action != PublishFormat::Close && f.action != PublishFormat::Refresh) + return; + + WsControlPacket::Item i; + i.cid = cid.toUtf8(); + + if(f.action == PublishFormat::Send) + { + i.type = WsControlPacket::Item::Send; + + switch(f.messageType) + { + case PublishFormat::Text: i.contentType = "text"; break; + case PublishFormat::Binary: i.contentType = "binary"; break; + case PublishFormat::Ping: i.contentType = "ping"; break; + case PublishFormat::Pong: i.contentType = "pong"; break; + default: return; // unrecognized type, skip + } + + i.message = content; + } + else if(f.action == PublishFormat::Close) + { + i.type = WsControlPacket::Item::Close; + i.code = f.code; + i.reason = f.reason; + } + else if(f.action == PublishFormat::Refresh) + { + i.type = WsControlPacket::Item::Refresh; + } + + send(i); +} + void WsSession::expireTimer_timeout() { log_debug("timing out ws session: %s", qPrintable(cid)); @@ -137,7 +256,15 @@ void WsSession::delayedTimer_timeout() pendingRequests[reqId] = QDateTime::currentMSecsSinceEpoch() + WSCONTROL_REQUEST_TIMEOUT; setupRequestTimer(); - send(reqId, delayedType, message); + WsControlPacket::Item i; + i.cid = cid.toUtf8(); + i.requestId = QByteArray::number(reqId); + i.type = WsControlPacket::Item::Send; + i.contentType = delayedType; + i.message = message; + i.queue = true; + + send(i); } void WsSession::requestTimer_timeout() diff --git a/src/handler/wssession.h b/src/handler/wssession.h index efd684fd..971ed05f 100644 --- a/src/handler/wssession.h +++ b/src/handler/wssession.h @@ -28,6 +28,8 @@ #include #include #include "packet/httprequestdata.h" +#include "packet/wscontrolpacket.h" +#include "filter.h" #include using Signal = boost::signals2::signal; @@ -35,6 +37,8 @@ using Connection = boost::signals2::scoped_connection; class QTimer; +class PublishItem; + class WsSession : public QObject { Q_OBJECT @@ -62,6 +66,9 @@ class WsSession : public QObject QTimer *expireTimer; QTimer *delayedTimer; QTimer *requestTimer; + QList pendingItems; + Filter::MessageFilter *filters; + Connection filtersFinishedConnection; WsSession(QObject *parent = 0); ~WsSession(); @@ -70,13 +77,17 @@ class WsSession : public QObject void flushDelayed(); void sendDelayed(const QByteArray &type, const QByteArray &message, int timeout); void ack(int reqId); + void publish(const PublishItem &item); - boost::signals2::signal send; + boost::signals2::signal send; Signal expired; Signal error; private: + void processNextItem(); void setupRequestTimer(); + void filtersFinished(const Filter::MessageFilter::Result &result); + void afterFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content); private slots: void expireTimer_timeout();