From 3ab04d6b6ae2cac2ad1c1ad5201564c436bf8666 Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Fri, 26 Jan 2024 11:36:11 -0800 Subject: [PATCH] when retrying, send to the same proxy that sent the accept --- src/cpp/handler/handlerapp.cpp | 11 +++++--- src/cpp/handler/handlerengine.cpp | 42 +++++++++++++++++++------------ src/cpp/handler/handlerengine.h | 3 ++- src/cpp/handler/httpsession.cpp | 7 ++++++ src/cpp/handler/httpsession.h | 3 +++ src/cpp/proxy/engine.cpp | 10 +++++--- src/cpp/qzmq/src/qzmqsocket.cpp | 14 +++++++++++ src/cpp/qzmq/src/qzmqsocket.h | 2 ++ src/cpp/tests/proxyenginetest.cpp | 10 ++++++-- src/ffi.rs | 18 ++++++++++++- src/internal.conf | 6 ++--- src/rust/wzmq.h | 3 ++- 12 files changed, 99 insertions(+), 30 deletions(-) diff --git a/src/cpp/handler/handlerapp.cpp b/src/cpp/handler/handlerapp.cpp index b555bf0f..ea12cae5 100644 --- a/src/cpp/handler/handlerapp.cpp +++ b/src/cpp/handler/handlerapp.cpp @@ -1,5 +1,6 @@ /* * Copyright (C) 2015-2022 Fanout, Inc. + * Copyright (C) 2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -257,7 +258,11 @@ class HandlerApp::Private : public QObject QString proxy_accept_spec = settings.value("handler/proxy_accept_spec").toString(); if(!proxy_accept_spec.isEmpty()) proxy_accept_specs += proxy_accept_spec; + QStringList proxy_retry_out_specs = settings.value("handler/proxy_retry_out_specs").toStringList(); + trimlist(&proxy_retry_out_specs); QString proxy_retry_out_spec = settings.value("handler/proxy_retry_out_spec").toString(); + if(!proxy_retry_out_spec.isEmpty()) + proxy_retry_out_specs += proxy_retry_out_spec; QString ws_control_in_spec = settings.value("handler/proxy_ws_control_in_spec").toString(); QString ws_control_out_spec = settings.value("handler/proxy_ws_control_out_spec").toString(); QString stats_spec = settings.value("handler/stats_spec").toString(); @@ -306,9 +311,9 @@ class HandlerApp::Private : public QObject return; } - if(proxy_inspect_specs.isEmpty() || proxy_accept_specs.isEmpty() || proxy_retry_out_spec.isEmpty()) + if(proxy_inspect_specs.isEmpty() || proxy_accept_specs.isEmpty() || proxy_retry_out_specs.isEmpty()) { - log_error("must set proxy_inspect_specs, proxy_accept_specs, and proxy_retry_out_spec"); + log_error("must set proxy_inspect_specs, proxy_accept_specs, and proxy_retry_out_specs"); q->quit(0); return; } @@ -331,7 +336,7 @@ class HandlerApp::Private : public QObject config.clientInSpecs = intreq_in_specs; config.inspectSpecs = proxy_inspect_specs; config.acceptSpecs = proxy_accept_specs; - config.retryOutSpec = proxy_retry_out_spec; + config.retryOutSpecs = proxy_retry_out_specs; config.wsControlInSpec = ws_control_in_spec; config.wsControlOutSpec = ws_control_out_spec; config.statsSpec = stats_spec; diff --git a/src/cpp/handler/handlerengine.cpp b/src/cpp/handler/handlerengine.cpp index 9eee0a11..528b634f 100644 --- a/src/cpp/handler/handlerengine.cpp +++ b/src/cpp/handler/handlerengine.cpp @@ -825,7 +825,7 @@ class AcceptWorker : public Deferred } Signal sessionsReady; - boost::signals2::signal retryPacketReady; + boost::signals2::signal retryPacketReady; private: static HttpRequestData parseRequestData(const QVariantHash &args, const QString &field) @@ -1054,7 +1054,7 @@ class AcceptWorker : public Deferred rp.route = route.toUtf8(); rp.retrySeq = stats->lastRetrySeq(); - emit retryPacketReady(rp); + emit retryPacketReady(reqFrom, rp); setFinished(true); return; @@ -1093,6 +1093,7 @@ class AcceptWorker : public Deferred implicitChannelsSet += channel; HttpSession::AcceptData adata; + adata.from = reqFrom; adata.requestData = origRequestData; adata.logicalPeerAddress = rs.logicalPeerAddress; adata.debug = rs.debug; @@ -1455,20 +1456,24 @@ class HandlerEngine::Private : public QObject log_info("in sub: %s", qPrintable(config.pushInSubSpecs.join(", "))); } - if(!config.retryOutSpec.isEmpty()) + if(!config.retryOutSpecs.isEmpty()) { - retrySock = new QZmq::Socket(QZmq::Socket::Push, this); + retrySock = new QZmq::Socket(QZmq::Socket::Router, this); retrySock->setHwm(DEFAULT_HWM); retrySock->setShutdownWaitTime(RETRY_WAIT_TIME); + retrySock->setRouterMandatoryEnabled(true); - QString errorMessage; - if(!ZUtil::setupSocket(retrySock, config.retryOutSpec, false, config.ipcFileMode, &errorMessage)) + foreach(const QString &spec, config.retryOutSpecs) { - log_error("%s", qPrintable(errorMessage)); - return false; + QString errorMessage; + if(!ZUtil::setupSocket(retrySock, spec, false, config.ipcFileMode, &errorMessage)) + { + log_error("%s", qPrintable(errorMessage)); + return false; + } } - log_info("retry: %s", qPrintable(config.retryOutSpec)); + log_info("retry: %s", qPrintable(config.retryOutSpecs.join(", "))); } if(!config.wsControlInSpec.isEmpty() && !config.wsControlOutSpec.isEmpty()) @@ -1622,7 +1627,7 @@ class HandlerEngine::Private : public QObject sequencer->addItem(item, seq); } - void writeRetryPacket(const RetryRequestPacket &packet) + void writeRetryPacket(const QByteArray &instanceAddress, const RetryRequestPacket &packet) { if(!retrySock) { @@ -1633,9 +1638,13 @@ class HandlerEngine::Private : public QObject QVariant vout = packet.toVariant(); if(log_outputLevel() >= LOG_LEVEL_DEBUG) - log_debug("OUT retry: %s", qPrintable(TnetString::variantToString(vout, -1))); + log_debug("OUT retry: to=%s %s", instanceAddress.data(), qPrintable(TnetString::variantToString(vout, -1))); - retrySock->write(QList() << TnetString::fromVariant(vout)); + QList msg; + msg += instanceAddress; + msg += QByteArray(); + msg += TnetString::fromVariant(vout); + retrySock->write(msg); } void writeWsControlItems(const QList &items) @@ -1969,7 +1978,7 @@ class HandlerEngine::Private : public QObject AcceptWorker *w = new AcceptWorker(req, stateClient, &cs, zhttpIn, zhttpOut, stats, updateLimiter, httpSessionUpdateManager, config.connectionSubscriptionMax, this); finishedConnection[w] = w->finished.connect(boost::bind(&Private::acceptWorker_finished, this, boost::placeholders::_1, w)); sessionsReadyConnection[w] = w->sessionsReady.connect(boost::bind(&Private::acceptWorker_sessionsReady, this, w)); - retryPacketReadyConnection[w] = w->retryPacketReady.connect(boost::bind(&Private::acceptWorker_retryPacketReady, this, boost::placeholders::_1)); + retryPacketReadyConnection[w] = w->retryPacketReady.connect(boost::bind(&Private::acceptWorker_retryPacketReady, this, boost::placeholders::_1, boost::placeholders::_2)); acceptWorkers += w; w->start(); @@ -2380,9 +2389,9 @@ class HandlerEngine::Private : public QObject } } - void acceptWorker_retryPacketReady(const RetryRequestPacket &packet) + void acceptWorker_retryPacketReady(const QByteArray &instanceAddress, const RetryRequestPacket &packet) { - writeRetryPacket(packet); + writeRetryPacket(instanceAddress, packet); } void stats_connectionsRefreshed(const QList &ids) @@ -3131,6 +3140,7 @@ private slots: void hs_finished(HttpSession *hs) { + QByteArray addr = hs->retryToAddress(); RetryRequestPacket rp = hs->retryPacket(); cs.httpSessions.remove(hs->rid()); @@ -3141,7 +3151,7 @@ private slots: hs->deleteLater(); if(!rp.requests.isEmpty()) - writeRetryPacket(rp); + writeRetryPacket(addr, rp); } void wssession_send(int reqId, const QByteArray &type, const QByteArray &message, WsSession *s) diff --git a/src/cpp/handler/handlerengine.h b/src/cpp/handler/handlerengine.h index 342fe29a..9be2d8be 100644 --- a/src/cpp/handler/handlerengine.h +++ b/src/cpp/handler/handlerengine.h @@ -1,5 +1,6 @@ /* * Copyright (C) 2015-2023 Fanout, Inc. + * Copyright (C) 2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -50,7 +51,7 @@ class HandlerEngine : public QObject QStringList clientInSpecs; QStringList inspectSpecs; QStringList acceptSpecs; - QString retryOutSpec; + QStringList retryOutSpecs; QString wsControlInSpec; QString wsControlOutSpec; QString statsSpec; diff --git a/src/cpp/handler/httpsession.cpp b/src/cpp/handler/httpsession.cpp index 40dbf08b..ec770dcd 100644 --- a/src/cpp/handler/httpsession.cpp +++ b/src/cpp/handler/httpsession.cpp @@ -167,6 +167,7 @@ class HttpSession::Private : public QObject Priority needUpdatePriority; UpdateAction *pendingAction; QList publishQueue; + QByteArray retryToAddress; RetryRequestPacket retryPacket; LogUtil::Config logConfig; FilterStack *responseFilters; @@ -1138,6 +1139,7 @@ class HttpSession::Private : public QObject rp.route = adata.route.toUtf8(); rp.retrySeq = stats->lastRetrySeq(); + retryToAddress = adata.from; retryPacket = rp; } else @@ -1599,6 +1601,11 @@ QHash HttpSession::meta() const return d->instruct.meta; } +QByteArray HttpSession::retryToAddress() const +{ + return d->retryToAddress; +} + RetryRequestPacket HttpSession::retryPacket() const { return d->retryPacket; diff --git a/src/cpp/handler/httpsession.h b/src/cpp/handler/httpsession.h index e67c8705..46a72d8d 100644 --- a/src/cpp/handler/httpsession.h +++ b/src/cpp/handler/httpsession.h @@ -1,5 +1,6 @@ /* * Copyright (C) 2016-2023 Fanout, Inc. + * Copyright (C) 2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -53,6 +54,7 @@ class HttpSession : public QObject class AcceptData { public: + QByteArray from; QHostAddress logicalPeerAddress; bool debug; bool isRetry; @@ -95,6 +97,7 @@ class HttpSession : public QObject QString sid() const; QHash channels() const; QHash meta() const; + QByteArray retryToAddress() const; RetryRequestPacket retryPacket() const; void start(); diff --git a/src/cpp/proxy/engine.cpp b/src/cpp/proxy/engine.cpp index 98f01a78..8303a953 100644 --- a/src/cpp/proxy/engine.cpp +++ b/src/cpp/proxy/engine.cpp @@ -26,6 +26,7 @@ #include #include "qzmqsocket.h" #include "qzmqvalve.h" +#include "qzmqreqmessage.h" #include "tnetstring.h" #include "packet/httpresponsedata.h" #include "packet/retryrequestpacket.h" @@ -272,8 +273,9 @@ class Engine::Private : public QObject if(!config.retryInSpec.isEmpty()) { - handler_retry_in_sock = new QZmq::Socket(QZmq::Socket::Pull, this); + handler_retry_in_sock = new QZmq::Socket(QZmq::Socket::Router, this); + handler_retry_in_sock->setIdentity(config.clientId); handler_retry_in_sock->setHwm(DEFAULT_HWM); QString errorMessage; @@ -837,14 +839,16 @@ private slots: private: void handler_retry_in_readyRead(const QList &message) { - if(message.count() != 1) + QZmq::ReqMessage req(message); + + if(req.content().count() != 1) { log_warning("retry: received message with parts != 1, skipping"); return; } bool ok; - QVariant data = TnetString::toVariant(message[0], 0, &ok); + QVariant data = TnetString::toVariant(req.content()[0], 0, &ok); if(!ok) { log_warning("retry: received message with invalid format (tnetstring parse failed), skipping"); diff --git a/src/cpp/qzmq/src/qzmqsocket.cpp b/src/cpp/qzmq/src/qzmqsocket.cpp index edd38b88..eedc65f5 100644 --- a/src/cpp/qzmq/src/qzmqsocket.cpp +++ b/src/cpp/qzmq/src/qzmqsocket.cpp @@ -1,5 +1,6 @@ /* * Copyright (C) 2012-2020 Justin Karneges + * Copyright (C) 2024 Fastly, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the @@ -92,6 +93,14 @@ static void set_immediate(void *sock, bool on) assert(ret == 0); } +static void set_router_mandatory(void *sock, bool on) +{ + int v = on ? 1 : 0; + size_t opt_len = sizeof(v); + int ret = wzmq_setsockopt(sock, WZMQ_ROUTER_MANDATORY, &v, opt_len); + assert(ret == 0); +} + #else static void set_immediate(void *sock, bool on) @@ -708,6 +717,11 @@ void Socket::setImmediateEnabled(bool on) set_immediate(d->sock, on); } +void Socket::setRouterMandatoryEnabled(bool on) +{ + set_router_mandatory(d->sock, on); +} + void Socket::setTcpKeepAliveEnabled(bool on) { set_tcp_keepalive(d->sock, on ? 1 : 0); diff --git a/src/cpp/qzmq/src/qzmqsocket.h b/src/cpp/qzmq/src/qzmqsocket.h index b890d061..d4796efe 100644 --- a/src/cpp/qzmq/src/qzmqsocket.h +++ b/src/cpp/qzmq/src/qzmqsocket.h @@ -1,5 +1,6 @@ /* * Copyright (C) 2012-2015 Justin Karneges + * Copyright (C) 2024 Fastly, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the @@ -85,6 +86,7 @@ class Socket : public QObject void setReceiveHwm(int hwm); void setImmediateEnabled(bool on); + void setRouterMandatoryEnabled(bool on); void setTcpKeepAliveEnabled(bool on); void setTcpKeepAliveParameters(int idle = -1, int count = -1, int interval = -1); diff --git a/src/cpp/tests/proxyenginetest.cpp b/src/cpp/tests/proxyenginetest.cpp index 65eee9ea..30af53f2 100644 --- a/src/cpp/tests/proxyenginetest.cpp +++ b/src/cpp/tests/proxyenginetest.cpp @@ -1,5 +1,6 @@ /* * Copyright (C) 2013-2022 Fanout, Inc. + * Copyright (C) 2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -135,7 +136,7 @@ class Wrapper : public QObject handlerInspectValve = new QZmq::Valve(handlerInspectSock, this); handlerInspectValveConnection = handlerInspectValve->readyRead.connect(boost::bind(&Wrapper::handlerInspect_readyRead, this, boost::placeholders::_1)); - handlerRetryOutSock = new QZmq::Socket(QZmq::Socket::Push, this); + handlerRetryOutSock = new QZmq::Socket(QZmq::Socket::Router, this); } void startHttp() @@ -540,7 +541,12 @@ class Wrapper : public QObject vretry["request-data"] = vaccept["request-data"]; QByteArray buf = TnetString::fromVariant(vretry); log_debug("retrying: %s", qPrintable(TnetString::variantToString(vretry, -1))); - handlerRetryOutSock->write(QList() << buf); + + QList msg; + msg.append("proxy"); + msg.append(QByteArray()); + msg.append(buf); + handlerRetryOutSock->write(msg); return; } } diff --git a/src/ffi.rs b/src/ffi.rs index 47ba7603..5c71c239 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -1,6 +1,6 @@ /* * Copyright (C) 2021-2022 Fanout, Inc. - * Copyright (C) 2023 Fastly, Inc. + * Copyright (C) 2023-2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -383,6 +383,7 @@ const WZMQ_TCP_KEEPALIVE: libc::c_int = 10; const WZMQ_TCP_KEEPALIVE_IDLE: libc::c_int = 11; const WZMQ_TCP_KEEPALIVE_CNT: libc::c_int = 12; const WZMQ_TCP_KEEPALIVE_INTVL: libc::c_int = 13; +const WZMQ_ROUTER_MANDATORY: libc::c_int = 14; // NOTE: must match values in wzmq.h const WZMQ_DONTWAIT: libc::c_int = 0x01; @@ -725,6 +726,21 @@ pub unsafe extern "C" fn wzmq_setsockopt( return -1; } } + WZMQ_ROUTER_MANDATORY => { + if option_len as u32 != libc::c_int::BITS / 8 { + return -1; + } + + let x = match (option_value as *mut libc::c_int).as_ref() { + Some(x) => x, + None => return -1, + }; + + if let Err(e) = sock.set_router_mandatory(*x != 0) { + set_errno(e.to_raw()); + return -1; + } + } WZMQ_SNDHWM => { if option_len as u32 != libc::c_int::BITS / 8 { return -1; diff --git a/src/internal.conf b/src/internal.conf index 715c2fc4..28599015 100644 --- a/src/internal.conf +++ b/src/internal.conf @@ -41,7 +41,7 @@ handler_inspect_spec=ipc://{rundir}/{ipc_prefix}inspect # bind DEALER for passing off requests (internal, used with handler) handler_accept_spec=ipc://{rundir}/{ipc_prefix}accept -# bind PULL for receiving retry requests (internal, used with handler) +# bind ROUTER for receiving retry requests (internal, used with handler) handler_retry_in_spec=ipc://{rundir}/{ipc_prefix}retry # bind PULL for reading handler WS control messages @@ -73,8 +73,8 @@ proxy_inspect_specs=ipc://{rundir}/{ipc_prefix}inspect # list of connect REP for receiving HTTP requests (internal, used with proxy) proxy_accept_specs=ipc://{rundir}/{ipc_prefix}accept -# connect PUSH for sending HTTP requests (internal, used with proxy) -proxy_retry_out_spec=ipc://{rundir}/{ipc_prefix}retry +# list of connect ROUTER for sending HTTP requests (internal, used with proxy) +proxy_retry_out_specs=ipc://{rundir}/{ipc_prefix}retry # bind PULL for reading proxy WS control messages proxy_ws_control_in_spec=ipc://{rundir}/{ipc_prefix}ws-control-out diff --git a/src/rust/wzmq.h b/src/rust/wzmq.h index 6d95e716..d8d6e4fe 100644 --- a/src/rust/wzmq.h +++ b/src/rust/wzmq.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Fastly, Inc. + * Copyright (C) 2023-2024 Fastly, Inc. * * This file is part of Pushpin. * @@ -58,6 +58,7 @@ #define WZMQ_TCP_KEEPALIVE_IDLE 11 #define WZMQ_TCP_KEEPALIVE_CNT 12 #define WZMQ_TCP_KEEPALIVE_INTVL 13 +#define WZMQ_ROUTER_MANDATORY 14 // NOTE: must match values in ffi.rs #define WZMQ_DONTWAIT 0x01