diff --git a/examples/config/pushpin.conf b/examples/config/pushpin.conf index 738b9d23..0b0ee6da 100644 --- a/examples/config/pushpin.conf +++ b/examples/config/pushpin.conf @@ -137,6 +137,9 @@ message_hwm=25000 # set to report blocks counts in stats (content size / block size) #message_block_size= +# max time (milliseconds) for out-of-order messages to wait +message_wait=5000 + # time (seconds) to cache message ids id_cache_ttl=60 diff --git a/src/handler/app.cpp b/src/handler/app.cpp index c8afbc68..f6fec267 100644 --- a/src/handler/app.cpp +++ b/src/handler/app.cpp @@ -278,6 +278,7 @@ class App::Private : public QObject int messageRate = settings.value("handler/message_rate", -1).toInt(); int messageHwm = settings.value("handler/message_hwm", -1).toInt(); int messageBlockSize = settings.value("handler/message_block_size", -1).toInt(); + int messageWait = settings.value("handler/message_wait", 5000).toInt(); int idCacheTtl = settings.value("handler/id_cache_ttl", 0).toInt(); int clientMaxconn = settings.value("runner/client_maxconn", 50000).toInt(); int connectionSubscriptionMax = settings.value("handler/connection_subscription_max", 20).toInt(); @@ -339,6 +340,7 @@ class App::Private : public QObject config.messageRate = messageRate; config.messageHwm = messageHwm; config.messageBlockSize = messageBlockSize; + config.messageWait = messageWait; config.idCacheTtl = idCacheTtl; config.connectionsMax = clientMaxconn; config.connectionSubscriptionMax = connectionSubscriptionMax; diff --git a/src/handler/engine.cpp b/src/handler/engine.cpp index d03b5352..591483bf 100644 --- a/src/handler/engine.cpp +++ b/src/handler/engine.cpp @@ -1257,6 +1257,7 @@ class Engine::Private : public QObject updateLimiter->setRate(10); updateLimiter->setBatchWaitEnabled(true); + sequencer->setWaitMax(config.messageWait); sequencer->setIdCacheTtl(config.idCacheTtl); zhttpIn = new ZhttpManager(this); diff --git a/src/handler/engine.h b/src/handler/engine.h index dfa09396..f5a8af7c 100644 --- a/src/handler/engine.h +++ b/src/handler/engine.h @@ -70,6 +70,7 @@ class Engine : public QObject int messageRate; int messageHwm; int messageBlockSize; + int messageWait; int idCacheTtl; int connectionsMax; int connectionSubscriptionMax; @@ -89,6 +90,7 @@ class Engine : public QObject messageRate(-1), messageHwm(-1), messageBlockSize(-1), + messageWait(-1), idCacheTtl(-1), connectionsMax(-1), connectionSubscriptionMax(-1), diff --git a/src/handler/sequencer.cpp b/src/handler/sequencer.cpp index 8d11a648..3953ac92 100644 --- a/src/handler/sequencer.cpp +++ b/src/handler/sequencer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016-2017 Fanout, Inc. + * Copyright (C) 2016-2021 Fanout, Inc. * * This file is part of Pushpin. * @@ -35,7 +35,7 @@ #include "publishlastids.h" #define CHANNEL_PENDING_MAX 100 -#define PENDING_EXPIRE 5000 +#define DEFAULT_PENDING_EXPIRE 5000 #define EXPIRE_INTERVAL 1000 class Sequencer::Private : public QObject @@ -74,6 +74,7 @@ class Sequencer::Private : public QObject QHash pendingItemsByChannel; QMap, PendingItem*> pendingItemsByTime; QTimer *expireTimer; + int pendingExpireMSecs; int idCacheTtl; QHash, CachedId*> idCacheById; QMap, CachedId*> idCacheByExpireTime; @@ -82,6 +83,7 @@ class Sequencer::Private : public QObject QObject(_q), q(_q), lastIds(_publishLastIds), + pendingExpireMSecs(DEFAULT_PENDING_EXPIRE), idCacheTtl(-1) { expireTimer = new QTimer(this); @@ -237,7 +239,7 @@ private slots: void expireTimer_timeout() { qint64 now = QDateTime::currentMSecsSinceEpoch(); - qint64 threshold = now - PENDING_EXPIRE; + qint64 threshold = now - pendingExpireMSecs; while(!pendingItemsByTime.isEmpty()) { @@ -279,6 +281,11 @@ Sequencer::~Sequencer() delete d; } +void Sequencer::setWaitMax(int msecs) +{ + d->pendingExpireMSecs = msecs; +} + void Sequencer::setIdCacheTtl(int secs) { d->idCacheTtl = secs; diff --git a/src/handler/sequencer.h b/src/handler/sequencer.h index 2f8fafc9..f7b6e99e 100644 --- a/src/handler/sequencer.h +++ b/src/handler/sequencer.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016-2017 Fanout, Inc. + * Copyright (C) 2016-2021 Fanout, Inc. * * This file is part of Pushpin. * @@ -42,6 +42,7 @@ class Sequencer : public QObject Sequencer(PublishLastIds *publishLastIds, QObject *parent = 0); ~Sequencer(); + void setWaitMax(int msecs); void setIdCacheTtl(int secs); // seq = false means ID cache handling only