diff --git a/src/handler/filter.cpp b/src/handler/filter.cpp index 4af5e903..3449b5dc 100644 --- a/src/handler/filter.cpp +++ b/src/handler/filter.cpp @@ -23,9 +23,13 @@ #include "filter.h" +#include +#include #include "log.h" #include "format.h" #include "idformat.h" +#include "zhttpmanager.h" +#include "zhttprequest.h" namespace { @@ -304,6 +308,202 @@ class VarSubstFilter : public Filter, public Filter::MessageFilter } }; +class HttpFilter : public Filter::MessageFilter +{ +public: + enum Mode + { + Check, + Modify + }; + + Mode mode; + std::unique_ptr req; + boost::signals2::scoped_connection readyReadConnection; + boost::signals2::scoped_connection errorConnection; + QByteArray origContent; + bool haveResponseHeader; + QByteArray responseBody; + + HttpFilter(Mode _mode) : + mode(_mode), + haveResponseHeader(false) + { + } + + virtual void start(const Filter::Context &context, const QByteArray &content) + { + QUrl url = QUrl(context.subscriptionMeta.value("url"), QUrl::StrictMode); + if(!url.isValid()) + { + Result r; + r.errorMessage = "invalid or missing url value"; + finished(r); + return; + } + + QUrl currentUri = context.currentUri; + if(currentUri.scheme() == "wss") + currentUri.setScheme("https"); + else if(currentUri.scheme() == "ws") + currentUri.setScheme("http"); + + QUrl destUri = currentUri.resolved(url); + + origContent = content; + + req.reset(context.zhttpOut->createRequest()); + readyReadConnection = req->readyRead.connect(boost::bind(&HttpFilter::req_readyRead, this)); + errorConnection = req->error.connect(boost::bind(&HttpFilter::req_error, this)); + + int currentPort = currentUri.port(currentUri.scheme() == "https" ? 443 : 80); + int destPort = destUri.port(destUri.scheme() == "https" ? 443 : 80); + + QVariantHash passthroughData; + + passthroughData["route"] = context.route.toUtf8(); + + // if dest link points to the same service as the current request, + // then we can assume the network would send the request back to + // us, so we can handle it internally. if the link points to a + // different service, then we can't make this assumption and need + // to make the request over the network. note that such a request + // could still end up looping back to us + if(destUri.scheme() == currentUri.scheme() && destUri.host() == currentUri.host() && destPort == currentPort) + { + // tell the proxy that we prefer the request to be handled + // internally, using the same route + passthroughData["prefer-internal"] = true; + } + + // needed in case internal routing is not used + if(context.trusted) + passthroughData["trusted"] = true; + + req->setPassthroughData(passthroughData); + + HttpHeaders headers; + + { + QVariantMap vmap; + QHashIterator it(context.subscriptionMeta); + while(it.hasNext()) + { + it.next(); + vmap[it.key()] = it.value(); + } + + QJsonDocument doc = QJsonDocument(QJsonObject::fromVariantMap(vmap)); + headers += HttpHeader("Sub-Meta", doc.toJson(QJsonDocument::Compact)); + } + + { + QVariantMap vmap; + QHashIterator it(context.publishMeta); + while(it.hasNext()) + { + it.next(); + vmap[it.key()] = it.value(); + } + + QJsonDocument doc = QJsonDocument(QJsonObject::fromVariantMap(vmap)); + headers += HttpHeader("Pub-Meta", doc.toJson(QJsonDocument::Compact)); + } + + { + QHashIterator it(context.prevIds); + while(it.hasNext()) + { + it.next(); + const QString &name = it.key(); + const QString &prevId = it.value(); + + if(!prevId.isNull()) + headers += HttpHeader("Grip-Last", name.toUtf8() + "; last-id=" + prevId.toUtf8()); + } + } + + req->start("POST", destUri, headers); + + if(mode == Modify) + req->writeBody(content); + + req->endBody(); + } + + void req_readyRead() + { + if(!haveResponseHeader) + { + haveResponseHeader = true; + + int code = req->responseCode(); + switch(code) + { + case 200: + case 204: + break; + default: + Result r; + r.errorMessage = QString("unexpected network request status: code=%1").arg(code); + finished(r); + return; + } + } + + QByteArray body = req->readBody(); + + if(mode == Modify) + responseBody += body; + + if(!req->isFinished()) + return; + + Result r; + + if(req->responseHeaders().get("Action") == "drop") + { + // drop + r.sendAction = Filter::Drop; + } + else + { + // accept + r.sendAction = Filter::Send; + + switch(mode) + { + case Check: + // as-is + r.content = origContent; + break; + case Modify: + switch(req->responseCode()) + { + case 204: + // as-is + r.content = origContent; + break; + default: + // replace content + r.content = responseBody; + break; + } + break; + } + } + + finished(r); + } + + void req_error() + { + Result r; + r.errorMessage = "network request failed"; + finished(r); + } +}; + } Filter::MessageFilter::~MessageFilter() = default; @@ -371,6 +571,10 @@ Filter::MessageFilter *Filter::createMessageFilter(const QString &name) return new BuildIdFilter; else if(name == "var-subst") return new VarSubstFilter; + else if(name == "http-check") + return new HttpFilter(HttpFilter::Check); + else if(name == "http-modify") + return new HttpFilter(HttpFilter::Modify); else return 0; } @@ -382,7 +586,9 @@ QStringList Filter::names() << "skip-users" << "require-sub" << "build-id" - << "var-subst"); + << "var-subst" + << "http-check" + << "http-modify"); } Filter::Targets Filter::targets(const QString &name) @@ -397,6 +603,10 @@ Filter::Targets Filter::targets(const QString &name) return Filter::Targets(Filter::MessageContent | Filter::ResponseContent); else if(name == "var-subst") return Filter::MessageContent; + else if(name == "http-check") + return Filter::MessageDelivery; + else if(name == "http-modify") + return Filter::Targets(Filter::MessageDelivery | Filter::MessageContent); else return Filter::Targets(0); } diff --git a/src/handler/filtertest.cpp b/src/handler/filtertest.cpp index a82ac6d6..5dc8f11f 100644 --- a/src/handler/filtertest.cpp +++ b/src/handler/filtertest.cpp @@ -20,16 +20,147 @@ * $FANOUT_END_LICENSE$ */ +#include +#include #include +#include +#include +#include #include +#include "qzmqsocket.h" +#include "qzmqvalve.h" +#include "log.h" +#include "tnetstring.h" +#include "rtimer.h" +#include "zhttprequestpacket.h" +#include "zhttpresponsepacket.h" +#include "zhttpmanager.h" #include "filter.h" +class HttpFilterServer +{ +public: + std::unique_ptr zhttpIn; + std::unordered_map> reqs; + + HttpFilterServer(const QDir &workDir) + { + zhttpIn = std::make_unique(); + zhttpIn->setInstanceId("filter-test-server"); + zhttpIn->setBind(true); + zhttpIn->setServerInSpecs(QStringList() << QString("ipc://%1").arg(workDir.filePath("filter-test-in"))); + zhttpIn->setServerInStreamSpecs(QStringList() << QString("ipc://%1").arg(workDir.filePath("filter-test-in-stream"))); + zhttpIn->setServerOutSpecs(QStringList() << QString("ipc://%1").arg(workDir.filePath("filter-test-out"))); + zhttpIn->requestReady.connect(boost::bind(&HttpFilterServer::zhttpIn_requestReady, this)); + } + + void zhttpIn_requestReady() + { + ZhttpRequest *req = zhttpIn->takeNextRequest(); + if(!req) + return; + + req->readyRead.connect(boost::bind(&HttpFilterServer::req_readyRead, this, req)); + req->bytesWritten.connect(boost::bind(&HttpFilterServer::req_bytesWritten, this, req, boost::placeholders::_1)); + + reqs.emplace(std::make_pair(req, std::unique_ptr(req))); + + req_readyRead(req); + } + + void req_readyRead(ZhttpRequest *req) + { + if(!req->isInputFinished()) + return; + + handle(req); + } + + void req_bytesWritten(ZhttpRequest *req, int written) + { + if(!req->isFinished()) + return; + + reqs.erase(req); + } + + void respond(ZhttpRequest *req, int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) + { + req->beginResponse(code, reason, headers); + req->writeBody(body); + req->endBody(); + } + + void respondOk(ZhttpRequest *req, int code, bool accept, const QByteArray &body) + { + HttpHeaders headers; + if(!accept) + headers += HttpHeader("Action", "drop"); + + respond(req, code, "OK", headers, body); + } + + void respondError(ZhttpRequest *req, int code, const QByteArray &reason, const QByteArray &body) + { + respond(req, code, reason, HttpHeaders(), body); + } + + void handle(ZhttpRequest *req) + { + if(req->requestMethod() != "POST") + { + respondError(req, 400, "Bad Request", "Method must be POST\n"); + return; + } + + QUrl uri = req->requestUri(); + QByteArray body = req->readBody(); + + if(uri.path() == "/filter/accept") + { + respondOk(req, 200, true, ""); + } + else if(uri.path() == "/filter/drop") + { + respondOk(req, 200, false, ""); + } + else if(uri.path() == "/filter/modify") + { + if(req->requestHeaders().get("Grip-Last") != "test; last-id=a") + { + respondError(req, 400, "Bad Request", "Unexpected Grip-Last"); + return; + } + + QJsonDocument subMetaDoc = QJsonDocument::fromJson(req->requestHeaders().get("Sub-Meta")); + QJsonObject subMeta = subMetaDoc.object(); + QJsonDocument pubMetaDoc = QJsonDocument::fromJson(req->requestHeaders().get("Pub-Meta")); + QJsonObject pubMeta = pubMetaDoc.object(); + + QString prepend = subMeta["prepend"].toString(); + QString append = pubMeta["append"].toString(); + + if(!prepend.isEmpty() || !append.isEmpty()) + respondOk(req, 200, true, prepend.toUtf8() + body + append.toUtf8()); + else + respondOk(req, 204, true, ""); + } + else + { + respondError(req, 400, "Bad Request", "Bad Request\n"); + } + } +}; + class FilterTest : public QObject { Q_OBJECT private: - std::tuple runMessageFilters(const QStringList &filterNames, const Filter::Context &context, const QByteArray &content) + std::unique_ptr filterServer; + std::unique_ptr zhttpOut; + + Filter::MessageFilter::Result runMessageFilters(const QStringList &filterNames, const Filter::Context &context, const QByteArray &content) { Filter::MessageFilterStack fs(filterNames); @@ -43,10 +174,41 @@ class FilterTest : public QObject fs.start(context, content); - return {finished, r}; + while(!finished) + QTest::qWait(10); + + return r; } private slots: + void initTestCase() + { + log_setOutputLevel(LOG_LEVEL_WARNING); + + QDir outDir(qgetenv("OUT_DIR")); + QDir workDir(QDir::current().relativeFilePath(outDir.filePath("test-work"))); + + RTimer::init(100); + + filterServer = std::make_unique(workDir); + + zhttpOut = std::make_unique(); + zhttpOut->setInstanceId("filter-test-client"); + zhttpOut->setClientOutSpecs(QStringList() << QString("ipc://%1").arg(workDir.filePath("filter-test-in"))); + zhttpOut->setClientOutStreamSpecs(QStringList() << QString("ipc://%1").arg(workDir.filePath("filter-test-in-stream"))); + zhttpOut->setClientInSpecs(QStringList() << QString("ipc://%1").arg(workDir.filePath("filter-test-out"))); + + QTest::qWait(500); + } + + void cleanupTestCase() + { + zhttpOut.reset(); + filterServer.reset(); + + RTimer::deinit(); + } + void messageFilters() { QStringList filterNames = QStringList() << "skip-self" << "var-subst"; @@ -57,8 +219,7 @@ private slots: QByteArray content = "hello %(user)s"; { - auto [finished, r] = runMessageFilters(filterNames, context, content); - QVERIFY(finished); + auto r = runMessageFilters(filterNames, context, content); QVERIFY(r.errorMessage.isNull()); QCOMPARE(r.sendAction, Filter::Send); QCOMPARE(r.content, "hello alice"); @@ -66,11 +227,74 @@ private slots: { context.publishMeta["sender"] = "alice"; - auto [finished, r] = runMessageFilters(filterNames, context, content); - QVERIFY(finished); + auto r = runMessageFilters(filterNames, context, content); + QVERIFY(r.errorMessage.isNull()); + QCOMPARE(r.sendAction, Filter::Drop); + } + } + + void httpCheck() + { + QStringList filterNames = QStringList() << "http-check"; + + Filter::Context context; + context.subscriptionMeta["url"] = "/filter/accept"; + context.zhttpOut = zhttpOut.get(); + context.currentUri = "http://localhost/"; + + QByteArray content = "hello world"; + + { + auto r = runMessageFilters(filterNames, context, content); + QVERIFY(r.errorMessage.isNull()); + QCOMPARE(r.sendAction, Filter::Send); + QCOMPARE(r.content, "hello world"); + } + + context.subscriptionMeta["url"] = "/filter/drop"; + + { + auto r = runMessageFilters(filterNames, context, content); QVERIFY(r.errorMessage.isNull()); QCOMPARE(r.sendAction, Filter::Drop); } + + context.subscriptionMeta["url"] = "/filter/error"; + + { + auto r = runMessageFilters(filterNames, context, content); + QCOMPARE(r.errorMessage, "unexpected network request status: code=400"); + } + } + + void httpModify() + { + QStringList filterNames = QStringList() << "http-modify"; + + Filter::Context context; + context.prevIds["test"] = "a"; + context.subscriptionMeta["url"] = "/filter/modify"; + context.zhttpOut = zhttpOut.get(); + context.currentUri = "http://localhost/"; + + QByteArray content = "hello world"; + + { + auto r = runMessageFilters(filterNames, context, content); + QVERIFY(r.errorMessage.isNull()); + QCOMPARE(r.sendAction, Filter::Send); + QCOMPARE(r.content, "hello world"); + } + + context.subscriptionMeta["prepend"] = "<<<"; + context.publishMeta["append"] = ">>>"; + + { + auto r = runMessageFilters(filterNames, context, content); + QVERIFY(r.errorMessage.isNull()); + QCOMPARE(r.sendAction, Filter::Send); + QCOMPARE(r.content, "<<>>"); + } } }; diff --git a/src/proxy/proxyutil.cpp b/src/proxy/proxyutil.cpp index ec39ee2a..41f9d8dc 100644 --- a/src/proxy/proxyutil.cpp +++ b/src/proxy/proxyutil.cpp @@ -177,7 +177,7 @@ void manipulateRequestHeaders(const char *logprefix, void *object, HttpRequestDa requestData->headers.removeAll("Grip-Feature"); requestData->headers += HttpHeader("Grip-Feature", - "status, session, link:next, link:gone, filter:skip-self, filter:skip-users, filter:require-sub, filter:build-id, filter:var-subst"); + "status, session, link:next, link:gone, filter:skip-self, filter:skip-users, filter:require-sub, filter:build-id, filter:var-subst, filter:http-check, filter:http-modify"); if(!idata.sid.isEmpty()) {