Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handler: read config before starting event loop #48137

Merged
merged 2 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 70 additions & 67 deletions src/handler/handlerapp.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2015-2022 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
* Copyright (C) 2024-2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -29,8 +29,13 @@
#include <QStringList>
#include <QFile>
#include <QFileInfo>
#include "timer.h"
#include "defercall.h"
#include "processquit.h"
#include "log.h"
#include "httpsession.h"
#include "wssession.h"
#include "httpsessionupdatemanager.h"
#include "settings.h"
#include "handlerengine.h"
#include "config.h"
Expand Down Expand Up @@ -169,48 +174,30 @@ static CommandLineParseResult parseCommandLine(QCommandLineParser *parser, ArgsD
return CommandLineOk;
}

class HandlerApp::Private : public QObject
class HandlerApp::Private
{
Q_OBJECT

public:
HandlerApp *q;
ArgsData args;
HandlerEngine *engine;
Connection quitConnection;
Connection hupConnection;

Private(HandlerApp *_q) :
QObject(_q),
q(_q),
engine(0)
{
quitConnection = ProcessQuit::instance()->quit.connect(boost::bind(&Private::doQuit, this));
hupConnection = ProcessQuit::instance()->hup.connect(boost::bind(&Private::reload, this));
}

void start()
static int run()
{
QCoreApplication::setApplicationName("pushpin-handler");
QCoreApplication::setApplicationVersion(Config::get().version);

QCommandLineParser parser;
parser.setApplicationDescription("Pushpin handler component.");

ArgsData args;
QString errorMessage;
switch(parseCommandLine(&parser, &args, &errorMessage))
{
case CommandLineOk:
break;
case CommandLineError:
fprintf(stderr, "%s\n\n%s", qPrintable(errorMessage), qPrintable(parser.helpText()));
q->quit(1);
return;
return 1;
case CommandLineVersionRequested:
printf("%s %s\n", qPrintable(QCoreApplication::applicationName()),
qPrintable(QCoreApplication::applicationVersion()));
q->quit(0);
return;
return 0;
case CommandLineHelpRequested:
parser.showHelp();
Q_UNREACHABLE();
Expand All @@ -226,8 +213,7 @@ class HandlerApp::Private : public QObject
if(!log_setFile(args.logFile))
{
log_error("failed to open log file: %s", qPrintable(args.logFile));
q->quit(1);
return;
return 1;
}
}

Expand All @@ -243,8 +229,7 @@ class HandlerApp::Private : public QObject
if(!file.open(QIODevice::ReadOnly))
{
log_error("failed to open %s, and --config not passed", qPrintable(configFile));
q->quit(0);
return;
return 1;
}
}

Expand Down Expand Up @@ -340,15 +325,13 @@ class HandlerApp::Private : public QObject
if(m2a_in_stream_specs.isEmpty() || m2a_out_specs.isEmpty())
{
log_error("must set m2a_in_stream_specs and m2a_out_specs");
q->quit(0);
return;
return 1;
}

if(proxy_inspect_specs.isEmpty() || proxy_accept_specs.isEmpty() || proxy_retry_out_specs.isEmpty())
{
log_error("must set proxy_inspect_specs, proxy_accept_specs, and proxy_retry_out_specs");
q->quit(0);
return;
return 1;
}

HandlerEngine::Configuration config;
Expand Down Expand Up @@ -402,53 +385,73 @@ class HandlerApp::Private : public QObject
config.prometheusPort = prometheusPort;
config.prometheusPrefix = prometheusPrefix;

engine = new HandlerEngine(this);
if(!engine->start(config))
{
q->quit(0);
return;
}

log_info("started");
return runLoop(config);
}

private:
void reload()
static int runLoop(const HandlerEngine::Configuration &config)
{
log_info("reloading");
log_rotate();
engine->reload();
}
// includes worst-case subscriptions and update registrations
int timersPerSession = qMax(TIMERS_PER_HTTPSESSION, TIMERS_PER_WSSESSION) +
(config.connectionSubscriptionMax * TIMERS_PER_SUBSCRIPTION) +
TIMERS_PER_UNIQUE_UPDATE_REGISTRATION;

void doQuit()
{
log_info("stopping...");
// enough timers for sessions, plus an extra 100 for misc
Timer::init((config.connectionsMax * timersPerSession) + 100);

std::unique_ptr<HandlerEngine> engine;

DeferCall deferCall;
deferCall.defer([&] {
engine = std::make_unique<HandlerEngine>();

ProcessQuit::instance()->quit.connect([&] {
log_info("stopping...");

// remove the handler, so if we get another signal then we crash out
ProcessQuit::cleanup();
// remove the handler, so if we get another signal then we crash out
ProcessQuit::cleanup();

delete engine;
engine = 0;
engine.reset();

log_debug("stopped");
q->quit(0);
log_debug("stopped");

QCoreApplication::exit(0);
});

ProcessQuit::instance()->hup.connect([&] {
log_info("reloading");
log_rotate();
engine->reload();
});

if(!engine->start(config))
{
engine.reset();
QCoreApplication::exit(1);
return;
}

log_info("started");
});

int ret = QCoreApplication::exec();

// ensure deferred deletes are processed
QCoreApplication::instance()->sendPostedEvents();

// deinit here, after all event loop activity has completed
Timer::deinit();
DeferCall::cleanup();

return ret;
}
};

HandlerApp::HandlerApp(QObject *parent) :
QObject(parent)
{
d = new Private(this);
}
HandlerApp::HandlerApp() = default;

HandlerApp::~HandlerApp()
{
delete d;
}
HandlerApp::~HandlerApp() = default;

void HandlerApp::start()
int HandlerApp::run()
{
d->start();
return Private::run();
}

#include "handlerapp.moc"
19 changes: 5 additions & 14 deletions src/handler/handlerapp.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016 Fanout, Inc.
* Copyright (C) 2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand All @@ -23,28 +24,18 @@
#ifndef HANDLERAPP_H
#define HANDLERAPP_H

#include <QObject>
#include <boost/signals2.hpp>
#include <memory>

using SignalInt = boost::signals2::signal<void(int)>;
using Connection = boost::signals2::scoped_connection;

class HandlerApp : public QObject
class HandlerApp
{
Q_OBJECT

public:
HandlerApp(QObject *parent = 0);
HandlerApp();
~HandlerApp();

void start();

SignalInt quit;
int run();

private:
class Private;
friend class Private;
Private *d;
};

#endif
14 changes: 0 additions & 14 deletions src/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1156,8 +1156,6 @@ class AcceptWorker : public Deferred
}
};

#define TIMERS_PER_SUBSCRIPTION 1

class Subscription : public QObject
{
Q_OBJECT
Expand Down Expand Up @@ -1340,18 +1338,6 @@ class HandlerEngine::Private : public QObject
{
config = _config;

// destroy known timers and deinit, so we can reinit below
DeferCall::cleanup();
Timer::deinit();

// includes worst-case subscriptions and update registrations
int timersPerSession = qMax(TIMERS_PER_HTTPSESSION, TIMERS_PER_WSSESSION) +
(config.connectionSubscriptionMax * TIMERS_PER_SUBSCRIPTION) +
TIMERS_PER_UNIQUE_UPDATE_REGISTRATION;

// enough timers for sessions, plus an extra 100 for misc
Timer::init((config.connectionsMax * timersPerSession) + 100);

publishLimiter->setRate(config.messageRate);
publishLimiter->setHwm(config.messageHwm);

Expand Down
2 changes: 2 additions & 0 deletions src/handler/handlerengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <boost/signals2.hpp>
#include <map>

#define TIMERS_PER_SUBSCRIPTION 1

using std::map;
using Signal = boost::signals2::signal<void()>;
using Connection = boost::signals2::scoped_connection;
Expand Down
2 changes: 2 additions & 0 deletions src/handler/handlerenginetest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ private slots:
QDir outDir(qgetenv("OUT_DIR"));
QDir workDir(QDir::current().relativeFilePath(outDir.filePath("test-work")));

Timer::init(100);

wrapper = new Wrapper(this, workDir);
wrapper->startHttp();
wrapper->startProxy();
Expand Down
39 changes: 2 additions & 37 deletions src/handler/handlermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,16 @@
*/

#include <QCoreApplication>
#include "timer.h"
#include "defercall.h"
#include "handlerapp.h"

class HandlerAppMain
{
public:
HandlerApp *app;

void start()
{
app = new HandlerApp;
app->quit.connect(boost::bind(&HandlerAppMain::app_quit, this, boost::placeholders::_1));
app->start();
}

void app_quit(int returnCode)
{
delete app;
QCoreApplication::exit(returnCode);
}
};

extern "C" {

int handler_main(int argc, char **argv)
{
QCoreApplication qapp(argc, argv);

// plenty to kick things off. will reinit after loading config
Timer::init(100);

HandlerAppMain appMain;
DeferCall deferCall;
deferCall.defer([&] { appMain.start(); });
int ret = qapp.exec();

// ensure deferred deletes are processed
QCoreApplication::instance()->sendPostedEvents();

// deinit here, after all event loop activity has completed
DeferCall::cleanup();
Timer::deinit();

return ret;
HandlerApp app;
return app.run();
}

}
Loading