Skip to content

Commit

Permalink
when retrying, send to the same proxy that sent the accept
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Jan 26, 2024
1 parent 54b0c4e commit 3ab04d6
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 30 deletions.
11 changes: 8 additions & 3 deletions src/cpp/handler/handlerapp.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2015-2022 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -257,7 +258,11 @@ class HandlerApp::Private : public QObject
QString proxy_accept_spec = settings.value("handler/proxy_accept_spec").toString();
if(!proxy_accept_spec.isEmpty())
proxy_accept_specs += proxy_accept_spec;
QStringList proxy_retry_out_specs = settings.value("handler/proxy_retry_out_specs").toStringList();
trimlist(&proxy_retry_out_specs);
QString proxy_retry_out_spec = settings.value("handler/proxy_retry_out_spec").toString();
if(!proxy_retry_out_spec.isEmpty())
proxy_retry_out_specs += proxy_retry_out_spec;
QString ws_control_in_spec = settings.value("handler/proxy_ws_control_in_spec").toString();
QString ws_control_out_spec = settings.value("handler/proxy_ws_control_out_spec").toString();
QString stats_spec = settings.value("handler/stats_spec").toString();
Expand Down Expand Up @@ -306,9 +311,9 @@ class HandlerApp::Private : public QObject
return;
}

if(proxy_inspect_specs.isEmpty() || proxy_accept_specs.isEmpty() || proxy_retry_out_spec.isEmpty())
if(proxy_inspect_specs.isEmpty() || proxy_accept_specs.isEmpty() || proxy_retry_out_specs.isEmpty())
{
log_error("must set proxy_inspect_specs, proxy_accept_specs, and proxy_retry_out_spec");
log_error("must set proxy_inspect_specs, proxy_accept_specs, and proxy_retry_out_specs");
q->quit(0);
return;
}
Expand All @@ -331,7 +336,7 @@ class HandlerApp::Private : public QObject
config.clientInSpecs = intreq_in_specs;
config.inspectSpecs = proxy_inspect_specs;
config.acceptSpecs = proxy_accept_specs;
config.retryOutSpec = proxy_retry_out_spec;
config.retryOutSpecs = proxy_retry_out_specs;
config.wsControlInSpec = ws_control_in_spec;
config.wsControlOutSpec = ws_control_out_spec;
config.statsSpec = stats_spec;
Expand Down
42 changes: 26 additions & 16 deletions src/cpp/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ class AcceptWorker : public Deferred
}

Signal sessionsReady;
boost::signals2::signal<void(const RetryRequestPacket&)> retryPacketReady;
boost::signals2::signal<void(const QByteArray &,const RetryRequestPacket&)> retryPacketReady;

private:
static HttpRequestData parseRequestData(const QVariantHash &args, const QString &field)
Expand Down Expand Up @@ -1054,7 +1054,7 @@ class AcceptWorker : public Deferred
rp.route = route.toUtf8();
rp.retrySeq = stats->lastRetrySeq();

emit retryPacketReady(rp);
emit retryPacketReady(reqFrom, rp);

setFinished(true);
return;
Expand Down Expand Up @@ -1093,6 +1093,7 @@ class AcceptWorker : public Deferred
implicitChannelsSet += channel;

HttpSession::AcceptData adata;
adata.from = reqFrom;
adata.requestData = origRequestData;
adata.logicalPeerAddress = rs.logicalPeerAddress;
adata.debug = rs.debug;
Expand Down Expand Up @@ -1455,20 +1456,24 @@ class HandlerEngine::Private : public QObject
log_info("in sub: %s", qPrintable(config.pushInSubSpecs.join(", ")));
}

if(!config.retryOutSpec.isEmpty())
if(!config.retryOutSpecs.isEmpty())
{
retrySock = new QZmq::Socket(QZmq::Socket::Push, this);
retrySock = new QZmq::Socket(QZmq::Socket::Router, this);
retrySock->setHwm(DEFAULT_HWM);
retrySock->setShutdownWaitTime(RETRY_WAIT_TIME);
retrySock->setRouterMandatoryEnabled(true);

QString errorMessage;
if(!ZUtil::setupSocket(retrySock, config.retryOutSpec, false, config.ipcFileMode, &errorMessage))
foreach(const QString &spec, config.retryOutSpecs)
{
log_error("%s", qPrintable(errorMessage));
return false;
QString errorMessage;
if(!ZUtil::setupSocket(retrySock, spec, false, config.ipcFileMode, &errorMessage))
{
log_error("%s", qPrintable(errorMessage));
return false;
}
}

log_info("retry: %s", qPrintable(config.retryOutSpec));
log_info("retry: %s", qPrintable(config.retryOutSpecs.join(", ")));
}

if(!config.wsControlInSpec.isEmpty() && !config.wsControlOutSpec.isEmpty())
Expand Down Expand Up @@ -1622,7 +1627,7 @@ class HandlerEngine::Private : public QObject
sequencer->addItem(item, seq);
}

void writeRetryPacket(const RetryRequestPacket &packet)
void writeRetryPacket(const QByteArray &instanceAddress, const RetryRequestPacket &packet)
{
if(!retrySock)
{
Expand All @@ -1633,9 +1638,13 @@ class HandlerEngine::Private : public QObject
QVariant vout = packet.toVariant();

if(log_outputLevel() >= LOG_LEVEL_DEBUG)
log_debug("OUT retry: %s", qPrintable(TnetString::variantToString(vout, -1)));
log_debug("OUT retry: to=%s %s", instanceAddress.data(), qPrintable(TnetString::variantToString(vout, -1)));

retrySock->write(QList<QByteArray>() << TnetString::fromVariant(vout));
QList<QByteArray> msg;
msg += instanceAddress;
msg += QByteArray();
msg += TnetString::fromVariant(vout);
retrySock->write(msg);
}

void writeWsControlItems(const QList<WsControlPacket::Item> &items)
Expand Down Expand Up @@ -1969,7 +1978,7 @@ class HandlerEngine::Private : public QObject
AcceptWorker *w = new AcceptWorker(req, stateClient, &cs, zhttpIn, zhttpOut, stats, updateLimiter, httpSessionUpdateManager, config.connectionSubscriptionMax, this);
finishedConnection[w] = w->finished.connect(boost::bind(&Private::acceptWorker_finished, this, boost::placeholders::_1, w));
sessionsReadyConnection[w] = w->sessionsReady.connect(boost::bind(&Private::acceptWorker_sessionsReady, this, w));
retryPacketReadyConnection[w] = w->retryPacketReady.connect(boost::bind(&Private::acceptWorker_retryPacketReady, this, boost::placeholders::_1));
retryPacketReadyConnection[w] = w->retryPacketReady.connect(boost::bind(&Private::acceptWorker_retryPacketReady, this, boost::placeholders::_1, boost::placeholders::_2));
acceptWorkers += w;

w->start();
Expand Down Expand Up @@ -2380,9 +2389,9 @@ class HandlerEngine::Private : public QObject
}
}

void acceptWorker_retryPacketReady(const RetryRequestPacket &packet)
void acceptWorker_retryPacketReady(const QByteArray &instanceAddress, const RetryRequestPacket &packet)
{
writeRetryPacket(packet);
writeRetryPacket(instanceAddress, packet);
}

void stats_connectionsRefreshed(const QList<QByteArray> &ids)
Expand Down Expand Up @@ -3131,6 +3140,7 @@ private slots:

void hs_finished(HttpSession *hs)
{
QByteArray addr = hs->retryToAddress();
RetryRequestPacket rp = hs->retryPacket();

cs.httpSessions.remove(hs->rid());
Expand All @@ -3141,7 +3151,7 @@ private slots:
hs->deleteLater();

if(!rp.requests.isEmpty())
writeRetryPacket(rp);
writeRetryPacket(addr, rp);
}

void wssession_send(int reqId, const QByteArray &type, const QByteArray &message, WsSession *s)
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/handler/handlerengine.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2015-2023 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -50,7 +51,7 @@ class HandlerEngine : public QObject
QStringList clientInSpecs;
QStringList inspectSpecs;
QStringList acceptSpecs;
QString retryOutSpec;
QStringList retryOutSpecs;
QString wsControlInSpec;
QString wsControlOutSpec;
QString statsSpec;
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class HttpSession::Private : public QObject
Priority needUpdatePriority;
UpdateAction *pendingAction;
QList<PublishItem> publishQueue;
QByteArray retryToAddress;
RetryRequestPacket retryPacket;
LogUtil::Config logConfig;
FilterStack *responseFilters;
Expand Down Expand Up @@ -1138,6 +1139,7 @@ class HttpSession::Private : public QObject
rp.route = adata.route.toUtf8();
rp.retrySeq = stats->lastRetrySeq();

retryToAddress = adata.from;
retryPacket = rp;
}
else
Expand Down Expand Up @@ -1599,6 +1601,11 @@ QHash<QString, QString> HttpSession::meta() const
return d->instruct.meta;
}

QByteArray HttpSession::retryToAddress() const
{
return d->retryToAddress;
}

RetryRequestPacket HttpSession::retryPacket() const
{
return d->retryPacket;
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/handler/httpsession.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016-2023 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -53,6 +54,7 @@ class HttpSession : public QObject
class AcceptData
{
public:
QByteArray from;
QHostAddress logicalPeerAddress;
bool debug;
bool isRetry;
Expand Down Expand Up @@ -95,6 +97,7 @@ class HttpSession : public QObject
QString sid() const;
QHash<QString, Instruct::Channel> channels() const;
QHash<QString, QString> meta() const;
QByteArray retryToAddress() const;
RetryRequestPacket retryPacket() const;

void start();
Expand Down
10 changes: 7 additions & 3 deletions src/cpp/proxy/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <assert.h>
#include "qzmqsocket.h"
#include "qzmqvalve.h"
#include "qzmqreqmessage.h"
#include "tnetstring.h"
#include "packet/httpresponsedata.h"
#include "packet/retryrequestpacket.h"
Expand Down Expand Up @@ -272,8 +273,9 @@ class Engine::Private : public QObject

if(!config.retryInSpec.isEmpty())
{
handler_retry_in_sock = new QZmq::Socket(QZmq::Socket::Pull, this);
handler_retry_in_sock = new QZmq::Socket(QZmq::Socket::Router, this);

handler_retry_in_sock->setIdentity(config.clientId);
handler_retry_in_sock->setHwm(DEFAULT_HWM);

QString errorMessage;
Expand Down Expand Up @@ -837,14 +839,16 @@ private slots:
private:
void handler_retry_in_readyRead(const QList<QByteArray> &message)
{
if(message.count() != 1)
QZmq::ReqMessage req(message);

if(req.content().count() != 1)
{
log_warning("retry: received message with parts != 1, skipping");
return;
}

bool ok;
QVariant data = TnetString::toVariant(message[0], 0, &ok);
QVariant data = TnetString::toVariant(req.content()[0], 0, &ok);
if(!ok)
{
log_warning("retry: received message with invalid format (tnetstring parse failed), skipping");
Expand Down
14 changes: 14 additions & 0 deletions src/cpp/qzmq/src/qzmqsocket.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2012-2020 Justin Karneges
* Copyright (C) 2024 Fastly, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
Expand Down Expand Up @@ -92,6 +93,14 @@ static void set_immediate(void *sock, bool on)
assert(ret == 0);
}

static void set_router_mandatory(void *sock, bool on)
{
int v = on ? 1 : 0;
size_t opt_len = sizeof(v);
int ret = wzmq_setsockopt(sock, WZMQ_ROUTER_MANDATORY, &v, opt_len);
assert(ret == 0);
}

#else

static void set_immediate(void *sock, bool on)
Expand Down Expand Up @@ -708,6 +717,11 @@ void Socket::setImmediateEnabled(bool on)
set_immediate(d->sock, on);
}

void Socket::setRouterMandatoryEnabled(bool on)
{
set_router_mandatory(d->sock, on);
}

void Socket::setTcpKeepAliveEnabled(bool on)
{
set_tcp_keepalive(d->sock, on ? 1 : 0);
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/qzmq/src/qzmqsocket.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2012-2015 Justin Karneges
* Copyright (C) 2024 Fastly, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
Expand Down Expand Up @@ -85,6 +86,7 @@ class Socket : public QObject
void setReceiveHwm(int hwm);

void setImmediateEnabled(bool on);
void setRouterMandatoryEnabled(bool on);

void setTcpKeepAliveEnabled(bool on);
void setTcpKeepAliveParameters(int idle = -1, int count = -1, int interval = -1);
Expand Down
10 changes: 8 additions & 2 deletions src/cpp/tests/proxyenginetest.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2013-2022 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -135,7 +136,7 @@ class Wrapper : public QObject
handlerInspectValve = new QZmq::Valve(handlerInspectSock, this);
handlerInspectValveConnection = handlerInspectValve->readyRead.connect(boost::bind(&Wrapper::handlerInspect_readyRead, this, boost::placeholders::_1));

handlerRetryOutSock = new QZmq::Socket(QZmq::Socket::Push, this);
handlerRetryOutSock = new QZmq::Socket(QZmq::Socket::Router, this);
}

void startHttp()
Expand Down Expand Up @@ -540,7 +541,12 @@ class Wrapper : public QObject
vretry["request-data"] = vaccept["request-data"];
QByteArray buf = TnetString::fromVariant(vretry);
log_debug("retrying: %s", qPrintable(TnetString::variantToString(vretry, -1)));
handlerRetryOutSock->write(QList<QByteArray>() << buf);

QList<QByteArray> msg;
msg.append("proxy");
msg.append(QByteArray());
msg.append(buf);
handlerRetryOutSock->write(msg);
return;
}
}
Expand Down
18 changes: 17 additions & 1 deletion src/ffi.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2021-2022 Fanout, Inc.
* Copyright (C) 2023 Fastly, Inc.
* Copyright (C) 2023-2024 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -383,6 +383,7 @@ const WZMQ_TCP_KEEPALIVE: libc::c_int = 10;
const WZMQ_TCP_KEEPALIVE_IDLE: libc::c_int = 11;
const WZMQ_TCP_KEEPALIVE_CNT: libc::c_int = 12;
const WZMQ_TCP_KEEPALIVE_INTVL: libc::c_int = 13;
const WZMQ_ROUTER_MANDATORY: libc::c_int = 14;

// NOTE: must match values in wzmq.h
const WZMQ_DONTWAIT: libc::c_int = 0x01;
Expand Down Expand Up @@ -725,6 +726,21 @@ pub unsafe extern "C" fn wzmq_setsockopt(
return -1;
}
}
WZMQ_ROUTER_MANDATORY => {
if option_len as u32 != libc::c_int::BITS / 8 {
return -1;
}

let x = match (option_value as *mut libc::c_int).as_ref() {
Some(x) => x,
None => return -1,
};

if let Err(e) = sock.set_router_mandatory(*x != 0) {
set_errno(e.to_raw());
return -1;
}
}
WZMQ_SNDHWM => {
if option_len as u32 != libc::c_int::BITS / 8 {
return -1;
Expand Down
Loading

0 comments on commit 3ab04d6

Please sign in to comment.