Skip to content

Commit

Permalink
add message_wait option
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Dec 1, 2021
1 parent 91bb07a commit d55e90d
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 4 deletions.
3 changes: 3 additions & 0 deletions examples/config/pushpin.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ message_hwm=25000
# set to report blocks counts in stats (content size / block size)
#message_block_size=

# max time (milliseconds) for out-of-order messages to wait
message_wait=5000

# time (seconds) to cache message ids
id_cache_ttl=60

Expand Down
2 changes: 2 additions & 0 deletions src/handler/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class App::Private : public QObject
int messageRate = settings.value("handler/message_rate", -1).toInt();
int messageHwm = settings.value("handler/message_hwm", -1).toInt();
int messageBlockSize = settings.value("handler/message_block_size", -1).toInt();
int messageWait = settings.value("handler/message_wait", 5000).toInt();
int idCacheTtl = settings.value("handler/id_cache_ttl", 0).toInt();
int clientMaxconn = settings.value("runner/client_maxconn", 50000).toInt();
int connectionSubscriptionMax = settings.value("handler/connection_subscription_max", 20).toInt();
Expand Down Expand Up @@ -339,6 +340,7 @@ class App::Private : public QObject
config.messageRate = messageRate;
config.messageHwm = messageHwm;
config.messageBlockSize = messageBlockSize;
config.messageWait = messageWait;
config.idCacheTtl = idCacheTtl;
config.connectionsMax = clientMaxconn;
config.connectionSubscriptionMax = connectionSubscriptionMax;
Expand Down
1 change: 1 addition & 0 deletions src/handler/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ class Engine::Private : public QObject
updateLimiter->setRate(10);
updateLimiter->setBatchWaitEnabled(true);

sequencer->setWaitMax(config.messageWait);
sequencer->setIdCacheTtl(config.idCacheTtl);

zhttpIn = new ZhttpManager(this);
Expand Down
2 changes: 2 additions & 0 deletions src/handler/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Engine : public QObject
int messageRate;
int messageHwm;
int messageBlockSize;
int messageWait;
int idCacheTtl;
int connectionsMax;
int connectionSubscriptionMax;
Expand All @@ -89,6 +90,7 @@ class Engine : public QObject
messageRate(-1),
messageHwm(-1),
messageBlockSize(-1),
messageWait(-1),
idCacheTtl(-1),
connectionsMax(-1),
connectionSubscriptionMax(-1),
Expand Down
13 changes: 10 additions & 3 deletions src/handler/sequencer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2017 Fanout, Inc.
* Copyright (C) 2016-2021 Fanout, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -35,7 +35,7 @@
#include "publishlastids.h"

#define CHANNEL_PENDING_MAX 100
#define PENDING_EXPIRE 5000
#define DEFAULT_PENDING_EXPIRE 5000
#define EXPIRE_INTERVAL 1000

class Sequencer::Private : public QObject
Expand Down Expand Up @@ -74,6 +74,7 @@ class Sequencer::Private : public QObject
QHash<QString, ChannelPendingItems> pendingItemsByChannel;
QMap<QPair<qint64, PendingItem*>, PendingItem*> pendingItemsByTime;
QTimer *expireTimer;
int pendingExpireMSecs;
int idCacheTtl;
QHash<QPair<QString, QString>, CachedId*> idCacheById;
QMap<QPair<qint64, CachedId*>, CachedId*> idCacheByExpireTime;
Expand All @@ -82,6 +83,7 @@ class Sequencer::Private : public QObject
QObject(_q),
q(_q),
lastIds(_publishLastIds),
pendingExpireMSecs(DEFAULT_PENDING_EXPIRE),
idCacheTtl(-1)
{
expireTimer = new QTimer(this);
Expand Down Expand Up @@ -237,7 +239,7 @@ private slots:
void expireTimer_timeout()
{
qint64 now = QDateTime::currentMSecsSinceEpoch();
qint64 threshold = now - PENDING_EXPIRE;
qint64 threshold = now - pendingExpireMSecs;

while(!pendingItemsByTime.isEmpty())
{
Expand Down Expand Up @@ -279,6 +281,11 @@ Sequencer::~Sequencer()
delete d;
}

void Sequencer::setWaitMax(int msecs)
{
d->pendingExpireMSecs = msecs;
}

void Sequencer::setIdCacheTtl(int secs)
{
d->idCacheTtl = secs;
Expand Down
3 changes: 2 additions & 1 deletion src/handler/sequencer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2017 Fanout, Inc.
* Copyright (C) 2016-2021 Fanout, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -42,6 +42,7 @@ class Sequencer : public QObject
Sequencer(PublishLastIds *publishLastIds, QObject *parent = 0);
~Sequencer();

void setWaitMax(int msecs);
void setIdCacheTtl(int secs);

// seq = false means ID cache handling only
Expand Down

0 comments on commit d55e90d

Please sign in to comment.