From 0cd5135b27b589bf61c917236456d2164f65bce9 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 25 Oct 2024 16:11:48 +0100 Subject: [PATCH 01/11] Move localQueue options into their own object --- perfTest/graphile.config.js | 4 ++- src/index.ts | 50 +++++++++++++++++++------------------ src/localQueue.ts | 6 ++--- src/main.ts | 2 +- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/perfTest/graphile.config.js b/perfTest/graphile.config.js index 841ba7d6..62ee0b4c 100644 --- a/perfTest/graphile.config.js +++ b/perfTest/graphile.config.js @@ -15,7 +15,9 @@ const preset = { fileExtensions: [".js", ".cjs", ".mjs"], // fileExtensions: [".js", ".cjs", ".mjs", ".ts", ".cts", ".mts"], gracefulShutdownAbortTimeout: 2500, - localQueueSize: -1, + localQueue: { + size: -1, + }, completeJobBatchDelay: -1, failJobBatchDelay: -1, }, diff --git a/src/index.ts b/src/index.ts index 96ac478c..9b6a3626 100644 --- a/src/index.ts +++ b/src/index.ts @@ -155,31 +155,33 @@ declare global { events?: WorkerEvents; - /** - * To enable processing jobs in batches, set this to an integer larger - * than 1. This will result in jobs being fetched by the pool rather than - * the worker, the pool will fetch (and lock!) `localQueueSize` jobs up - * front, and each time a worker requests a job it will be served from - * this list until the list is exhausted, at which point a new set of - * jobs will be fetched (and locked). - * - * This setting can help reduce the load on your database from looking - * for jobs, but is only really effective when there are often many jobs - * queued and ready to go, and can increase the latency of job execution - * because a single worker may lock jobs into its queue leaving other - * workers idle. - * - * @default `-1` - */ - localQueueSize?: number; + localQueue?: { + /** + * To enable processing jobs in batches, set this to an integer larger + * than 1. This will result in jobs being fetched by the pool rather than + * the worker, the pool will fetch (and lock!) `localQueue.size` jobs up + * front, and each time a worker requests a job it will be served from + * this list until the list is exhausted, at which point a new set of + * jobs will be fetched (and locked). + * + * This setting can help reduce the load on your database from looking + * for jobs, but is only really effective when there are often many jobs + * queued and ready to go, and can increase the latency of job execution + * because a single worker may lock jobs into its queue leaving other + * workers idle. + * + * @default `-1` + */ + size: number; - /** - * How long should jobs sit in the local queue before they are returned - * to the database? Defaults to 5 minutes. - * - * @default `300000` - */ - localQueueTtl?: number; + /** + * How long should jobs sit in the local queue before they are returned + * to the database? Defaults to 5 minutes. + * + * @default `300000` + */ + ttl?: number; + }; /** * The time in milliseconds to wait after a `completeJob` call to see if diff --git a/src/localQueue.ts b/src/localQueue.ts index 8299b895..e82d0193 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -23,8 +23,8 @@ const RELEASED = "RELEASED"; * relieving the workers of this responsibility. * * The local queue trades latency for throughput: jobs may sit in the local - * queue for a longer time (maximum `localQueueSize` jobs waiting maximum - * `localQueueTTL` milliseconds), but fewer requests to the database are made + * queue for a longer time (maximum `localQueue.size` jobs waiting maximum + * `localQueue.ttl` milliseconds), but fewer requests to the database are made * for jobs since more jobs are fetched at once, enabling the worker to reach * higher levels of performance (and reducing read stress on the DB). * @@ -107,7 +107,7 @@ export class LocalQueue { private readonly continuous: boolean, ) { this.ttl = - compiledSharedOptions.resolvedPreset.worker.localQueueTtl ?? 5 * MINUTE; + compiledSharedOptions.resolvedPreset.worker.localQueue?.ttl ?? 5 * MINUTE; this.pollInterval = compiledSharedOptions.resolvedPreset.worker.pollInterval ?? 2 * SECOND; this.setModePolling(); diff --git a/src/main.ts b/src/main.ts index 5f86221d..0686c19d 100644 --- a/src/main.ts +++ b/src/main.ts @@ -536,7 +536,7 @@ export function _runTaskList( worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout, - localQueueSize = -1, + localQueue: { size: localQueueSize = -1 } = {}, completeJobBatchDelay = -1, failJobBatchDelay = -1, }, From 16c63bfcd3fd0a6ada2c7d0cdb8172016cee5930 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 25 Oct 2024 16:14:02 +0100 Subject: [PATCH 02/11] Introduce STARTING mode --- src/localQueue.ts | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index e82d0193..fd397616 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -11,6 +11,7 @@ import { GetJobFunction, Job, TaskList, WorkerPool } from "./interfaces"; import { getJob as baseGetJob } from "./sql/getJob"; import { returnJob } from "./sql/returnJob"; +const STARTING = "STARTING"; const POLLING = "POLLING"; const WAITING = "WAITING"; const TTL_EXPIRED = "TTL_EXPIRED"; @@ -30,18 +31,24 @@ const RELEASED = "RELEASED"; * * The local queue is always in one of these modes: * + * - STARTING mode * - POLLING mode * - WAITING mode * - TTL_EXPIRED mode * - RELEASED mode * + * ## STARTING mode + * + * STARTING mode is the initial state of the local queue. + * + * Immediately move to POLLING mode. + * * ## POLLING mode * - * POLLING mode is the initial state of the local queue. The queue will only be - * in POLLING mode when it contains no cached jobs. + * The queue will only be in POLLING mode when it contains no cached jobs. * - * When the queue enters POLLING mode (and when it starts) it will trigger a - * fetch of jobs from the database. + * When the queue enters POLLING mode it will trigger a fetch of jobs from the + * database. * * If no jobs were returned then it will wait `pollInterval` ms and then fetch * again. @@ -94,7 +101,12 @@ export class LocalQueue { // Set true to fetch immediately after a fetch completes; typically only used // when the queue is pulsed during a fetch. fetchAgain = false; - mode: typeof POLLING | typeof WAITING | typeof TTL_EXPIRED | typeof RELEASED; + mode: + | typeof STARTING + | typeof POLLING + | typeof WAITING + | typeof TTL_EXPIRED + | typeof RELEASED = STARTING; private promise = defer(); private backgroundCount = 0; From 31c2e186e147c74821803ceb79525f0564bd583c Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 25 Oct 2024 17:30:46 +0100 Subject: [PATCH 03/11] Add localQueue refetchDelay feature: be kind on DB when queue is near-empty --- src/index.ts | 42 +++++++++++++ src/localQueue.ts | 148 ++++++++++++++++++++++++++++++++++++++++------ src/main.ts | 2 +- 3 files changed, 174 insertions(+), 18 deletions(-) diff --git a/src/index.ts b/src/index.ts index 9b6a3626..cd33c952 100644 --- a/src/index.ts +++ b/src/index.ts @@ -181,6 +181,48 @@ declare global { * @default `300000` */ ttl?: number; + + /** + * When running at very high scale (multiple worker instances, each + * with some level of concurrency), Worker's polling can cause + * significant load on the database when there are too few jobs in the + * database to keep all worker pools busy - each time a new job comes + * in, each pool may request it, multiplying up the load. To reduce + * this impact, when a pool receives no (or few) results to its query + * for new jobs, we can instigate a "refetch delay" to cause the pool + * to wait before issuing its next poll for jobs, even when new job + * notifications come in. + */ + refetchDelay?: { + /** + * How long in milliseconds to wait, on average, before asking for + * more jobs when a previous fetch results in insufficient jobs to + * fill the local queue. (Causes the local queue to (mostly) ignore + * "new job" notifications.) + * + * When new jobs are coming in but the workers are mostly idle, you + * can expect on average `(1000/durationMs) * INSTANCE_COUNT` "get jobs" + * queries per second to be issued to your database. Increasing this + * decreases database load at the cost of increased latency when there + * are insufficient jobs in the database to keep the local queue full. + */ + durationMs: number; + /** + * How many jobs should a fetch return to trigger the refetchDelay? + * Must be less than the local queue size + * + * @default {0} + */ + threshold?: number; + /** + * How many new jobs, on average, can the pool that's in idle fetch + * delay be notified of before it aborts the refetch delay and fetches + * anyway + * + * @default {5 * localQueue.size} + */ + abortThreshold?: number; + }; }; /** diff --git a/src/localQueue.ts b/src/localQueue.ts index fd397616..e0dc196c 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -47,17 +47,25 @@ const RELEASED = "RELEASED"; * * The queue will only be in POLLING mode when it contains no cached jobs. * - * When the queue enters POLLING mode it will trigger a fetch of jobs from the - * database. + * When the queue enters POLLING mode: * - * If no jobs were returned then it will wait `pollInterval` ms and then fetch - * again. + * - if any refetch delay has expired it will trigger a fetch of jobs from the + * database, + * - otherwise it will trigger a refetch to happen once the refetch delay has + * completed. * - * If a "new job" notification is received during the polling interval then the - * timer will be cancelled, and a fetch will be fired immediately. + * When jobs are fetched: * - * If jobs are returned from a POLLING mode fetch then the queue immediately - * enters WAITING mode. + * - if no jobs were returned then it will wait `pollInterval` ms and then + * fetch again. + * - if fewer than `Math.ceil(Math.min(localQueueRefetchDelay.threshold, localQueueSize))` + * jobs were returned then a refetch delay will be set (if configured). + * - if jobs are returned from a POLLING mode fetch then the queue immediately + * enters WAITING mode. + * + * When a "new job" notification is received, once any required refetch delay + * has expired (or immediately if it has already expired) the timer will be + * cancelled, and a fetch will be fired immediately. * * ## WAITING mode * @@ -110,6 +118,12 @@ export class LocalQueue { private promise = defer(); private backgroundCount = 0; + /** If `localQueueRefetchDelay` is configured; set this true if the fetch resulted in a queue size lower than the threshold. */ + private refetchDelayActive = false; + private refetchDelayFetchOnComplete = false; + private refetchDelayTimer: NodeJS.Timeout | null = null; + private refetchDelayCounter: number = 0; + constructor( private readonly compiledSharedOptions: CompiledSharedOptions, private readonly tasks: TaskList, @@ -122,6 +136,17 @@ export class LocalQueue { compiledSharedOptions.resolvedPreset.worker.localQueue?.ttl ?? 5 * MINUTE; this.pollInterval = compiledSharedOptions.resolvedPreset.worker.pollInterval ?? 2 * SECOND; + const localQueueRefetchDelayDuration = + compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay + ?.durationMs; + if ( + localQueueRefetchDelayDuration != null && + localQueueRefetchDelayDuration > this.pollInterval + ) { + throw new Error( + `Invalid configuration; 'preset.worker.localQueue.refetchDelay.durationMs' (${localQueueRefetchDelayDuration}) must not be larger than 'preset.worker.pollInterval' (${this.pollInterval})`, + ); + } this.setModePolling(); } @@ -250,6 +275,14 @@ export class LocalQueue { } private fetch = (): void => { + if (this.fetchTimer) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } + if (this.refetchDelayActive) { + this.refetchDelayFetchOnComplete = true; + return; + } this.background( this._fetch().catch((e) => { // This should not happen @@ -262,6 +295,9 @@ export class LocalQueue { private async _fetch() { let fetchedMax = false; + let fetchedUnderRefetchDelayThreshold = false; + const refetchDelayOptions = + this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay; try { assert.equal(this.mode, POLLING, "Can only fetch when in polling mode"); assert.equal( @@ -269,12 +305,19 @@ export class LocalQueue { false, "Cannot fetch when a fetch is already in progress", ); - if (this.fetchTimer) { - clearTimeout(this.fetchTimer); - this.fetchTimer = null; - } + assert.equal( + this.refetchDelayActive, + false, + "Can not fetch when fetches are meant to be delayed", + ); + assert.equal( + this.jobQueue.length, + 0, + "Should not fetch when job queue isn't empty", + ); this.fetchAgain = false; this.fetchInProgress = true; + this.refetchDelayCounter = 0; // The ONLY await in this function. const jobs = await baseGetJob( @@ -289,10 +332,18 @@ export class LocalQueue { assert.equal( this.jobQueue.length, 0, - "Should not fetch when job queue isn't empty", + "Should not fetch when job queue isn't empty (recheck)", ); const jobCount = jobs.length; fetchedMax = jobCount >= this.getJobBatchSize; + fetchedUnderRefetchDelayThreshold = + !fetchedMax && + !!refetchDelayOptions && + jobCount < Math.floor(refetchDelayOptions.threshold ?? 0); + + // NOTE: we don't need to handle `this.mode === RELEASED` here because + // being in that mode guarantees the workerQueue is empty. + const workerCount = Math.min(jobCount, this.workerQueue.length); const workers = this.workerQueue.splice(0, workerCount); for (let i = 0; i < jobCount; i++) { @@ -316,11 +367,31 @@ export class LocalQueue { // Finally, now that there is no fetch in progress, choose what to do next if (this.mode === "RELEASED") { this.returnJobs(); - } else if (this.jobQueue.length > 0) { + return; + } + + if (fetchedUnderRefetchDelayThreshold) { + const ms = + (0.5 + Math.random()) * (refetchDelayOptions?.durationMs ?? 100); + + this.fetchAgain = false; + this.refetchDelayActive = true; + this.refetchDelayFetchOnComplete = false; + // NOTE: this.refetchDelayCounter is set at the beginning of fetch() to allow for pulse() during fetch() + this.refetchDelayTimer = setTimeout(this.refetchDelayCompleteOrAbort, ms); + } + + if (this.jobQueue.length > 0) { this.setModeWaiting(); } else { if (fetchedMax || this.fetchAgain) { - // Maximal fetch; trigger immediate refetch + // Maximal fetch and all jobs instantly consumed; trigger immediate refetch + // OR: new jobs came in during fetch(); trigger immediate refetch + assert.equal( + this.refetchDelayActive, + false, + "refetchDelayActive should imply didn't fetch max and fetchAgain is false", + ); this.fetch(); } else if (this.continuous) { // Set up the timer @@ -329,10 +400,49 @@ export class LocalQueue { this.setModeReleased(); } } + + // In case the counter was incremented sufficiently during fetch() + this.checkRefetchDelayAbortThreshold(); + } + + private refetchDelayCompleteOrAbort = (): void => { + if (this.refetchDelayTimer) { + clearTimeout(this.refetchDelayTimer); + this.refetchDelayTimer = null; + } + this.refetchDelayActive = false; + if (this.mode === POLLING && this.refetchDelayFetchOnComplete) { + // Cancel poll, do now + if (this.fetchTimer) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } + this.fetch(); + } + }; + + private checkRefetchDelayAbortThreshold() { + if (!this.refetchDelayActive || this.mode === "RELEASED") { + return; + } + const refetchDelayOptions = + this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay; + const threshold = Math.min( + refetchDelayOptions?.abortThreshold ?? Infinity, + 5 * this.getJobBatchSize, + ); + if (this.refetchDelayCounter >= threshold) { + this.refetchDelayFetchOnComplete = true; + this.refetchDelayCompleteOrAbort(); + } } /** Called when a new job becomes available in the DB */ - public pulse() { + public pulse(count: number) { + this.refetchDelayCounter += count; + + this.checkRefetchDelayAbortThreshold(); + // The only situation when this affects anything is if we're running in polling mode. if (this.mode === POLLING) { if (this.fetchInProgress) { @@ -399,6 +509,11 @@ export class LocalQueue { const oldMode = this.mode; this.mode = RELEASED; + if (this.refetchDelayTimer != null) { + clearTimeout(this.refetchDelayTimer); + this.refetchDelayTimer = null; + } + if (oldMode === POLLING) { // Release pending workers const workers = this.workerQueue.splice(0, this.workerQueue.length); @@ -422,7 +537,6 @@ export class LocalQueue { } else if (oldMode === TTL_EXPIRED) { // No action necessary } - if (this.backgroundCount === 0) { this.promise.resolve(); } diff --git a/src/main.ts b/src/main.ts index 0686c19d..c588b543 100644 --- a/src/main.ts +++ b/src/main.ts @@ -660,7 +660,7 @@ export function _runTaskList( }, nudge(this: WorkerPool, count: number) { if (localQueue) { - localQueue.pulse(); + localQueue.pulse(count); } else { let n = count; // Nudge up to `n` workers From 20a581fa453219f5fdde48c5de821d44455b83c8 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 25 Oct 2024 17:35:35 +0100 Subject: [PATCH 04/11] Randomize the abort threshold --- src/localQueue.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index e0dc196c..7aa1389f 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -123,6 +123,7 @@ export class LocalQueue { private refetchDelayFetchOnComplete = false; private refetchDelayTimer: NodeJS.Timeout | null = null; private refetchDelayCounter: number = 0; + private refetchDelayAbortThreshold: number = Infinity; constructor( private readonly compiledSharedOptions: CompiledSharedOptions, @@ -373,10 +374,17 @@ export class LocalQueue { if (fetchedUnderRefetchDelayThreshold) { const ms = (0.5 + Math.random()) * (refetchDelayOptions?.durationMs ?? 100); + const threshold = + (0.5 + Math.random()) * + Math.min( + refetchDelayOptions?.abortThreshold ?? Infinity, + 5 * this.getJobBatchSize, + ); this.fetchAgain = false; this.refetchDelayActive = true; this.refetchDelayFetchOnComplete = false; + this.refetchDelayAbortThreshold = threshold; // NOTE: this.refetchDelayCounter is set at the beginning of fetch() to allow for pulse() during fetch() this.refetchDelayTimer = setTimeout(this.refetchDelayCompleteOrAbort, ms); } @@ -425,13 +433,7 @@ export class LocalQueue { if (!this.refetchDelayActive || this.mode === "RELEASED") { return; } - const refetchDelayOptions = - this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay; - const threshold = Math.min( - refetchDelayOptions?.abortThreshold ?? Infinity, - 5 * this.getJobBatchSize, - ); - if (this.refetchDelayCounter >= threshold) { + if (this.refetchDelayCounter >= this.refetchDelayAbortThreshold) { this.refetchDelayFetchOnComplete = true; this.refetchDelayCompleteOrAbort(); } From d4df163392245b6c36b63a264e5ef92ab70b0c6c Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 25 Oct 2024 17:42:49 +0100 Subject: [PATCH 05/11] Cleaner branching without setting fetchAgain by accident --- src/localQueue.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index 7aa1389f..931a7145 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -410,7 +410,7 @@ export class LocalQueue { } // In case the counter was incremented sufficiently during fetch() - this.checkRefetchDelayAbortThreshold(); + this.handleCheckRefetchDelayAbortThreshold(); } private refetchDelayCompleteOrAbort = (): void => { @@ -429,24 +429,25 @@ export class LocalQueue { } }; - private checkRefetchDelayAbortThreshold() { + private handleCheckRefetchDelayAbortThreshold(): boolean { if (!this.refetchDelayActive || this.mode === "RELEASED") { - return; + return false; } if (this.refetchDelayCounter >= this.refetchDelayAbortThreshold) { this.refetchDelayFetchOnComplete = true; this.refetchDelayCompleteOrAbort(); + return true; } + return false; } /** Called when a new job becomes available in the DB */ public pulse(count: number) { this.refetchDelayCounter += count; - this.checkRefetchDelayAbortThreshold(); - - // The only situation when this affects anything is if we're running in polling mode. - if (this.mode === POLLING) { + if (this.handleCheckRefetchDelayAbortThreshold()) { + /* handled */ + } else if (this.mode === POLLING) { if (this.fetchInProgress) { this.fetchAgain = true; } else if (this.fetchTimer) { @@ -515,6 +516,7 @@ export class LocalQueue { clearTimeout(this.refetchDelayTimer); this.refetchDelayTimer = null; } + this.refetchDelayActive = false; if (oldMode === POLLING) { // Release pending workers From 2ff4688a9f2130461c9b66a938e1d36318d214bc Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 1 Nov 2024 12:25:31 +0000 Subject: [PATCH 06/11] Lint --- src/localQueue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index 931a7145..6db94465 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -270,7 +270,7 @@ export class LocalQueue { `Failed to return jobs from local queue to database queue`, { error: e }, ); - } + }, ), ); } From d18ff7bcc98399e07f1a51b92b754720bad04272 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 11 Nov 2024 15:42:54 +0000 Subject: [PATCH 07/11] Don't trigger refetch once setting mode to released --- src/localQueue.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/localQueue.ts b/src/localQueue.ts index 6db94465..a1abe0d5 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -406,6 +406,7 @@ export class LocalQueue { this.fetchTimer = setTimeout(this.fetch, this.pollInterval); } else { this.setModeReleased(); + return; } } From 77f09afd67472d01e22bab8d5078d784640173c3 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 11 Nov 2024 15:43:40 +0000 Subject: [PATCH 08/11] Fix bug and clarify variable name --- src/localQueue.ts | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index a1abe0d5..90edbcb7 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -295,8 +295,19 @@ export class LocalQueue { }; private async _fetch() { + /** + * Did we fetch the maximum number of records that we could? (If so, we + * should consider fetching again straight away so there's always jobs to + * be done.) + */ let fetchedMax = false; - let fetchedUnderRefetchDelayThreshold = false; + /** + * Did we fetch more jobs than the refetch delay threshold? (Greater than, + * not equal to.) If false, we should start a refetch delay. + * + * Initialized to `true` so on error we don't enable refetch delay. + */ + let refetchDelayThresholdSurpassed = true; const refetchDelayOptions = this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay; try { @@ -337,10 +348,13 @@ export class LocalQueue { ); const jobCount = jobs.length; fetchedMax = jobCount >= this.getJobBatchSize; - fetchedUnderRefetchDelayThreshold = - !fetchedMax && - !!refetchDelayOptions && - jobCount < Math.floor(refetchDelayOptions.threshold ?? 0); + refetchDelayThresholdSurpassed = + // If we've fetched the maximum, we've met the requirement + fetchedMax || + // If refetch delay is disabled, we've met the requirement + !refetchDelayOptions || + // If we fetched more than (**not** equal to) `threshold` jobs, we've met the requirement + jobCount > Math.floor(refetchDelayOptions.threshold ?? 0); // NOTE: we don't need to handle `this.mode === RELEASED` here because // being in that mode guarantees the workerQueue is empty. @@ -371,7 +385,7 @@ export class LocalQueue { return; } - if (fetchedUnderRefetchDelayThreshold) { + if (!refetchDelayThresholdSurpassed) { const ms = (0.5 + Math.random()) * (refetchDelayOptions?.durationMs ?? 100); const threshold = From 95c8cc2bd9a14175b188b94aa13dc53c3e8911ae Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 11 Nov 2024 15:43:56 +0000 Subject: [PATCH 09/11] Comments and variable renames for clarity --- src/localQueue.ts | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index 90edbcb7..8a453c07 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -329,6 +329,9 @@ export class LocalQueue { ); this.fetchAgain = false; this.fetchInProgress = true; + // NOTE: this.refetchDelayCounter is set here allow for pulse() during + // fetch(). If the refetch delay threshold is surpassed then this value + // is harmlessly ignored. this.refetchDelayCounter = 0; // The ONLY await in this function. @@ -386,9 +389,11 @@ export class LocalQueue { } if (!refetchDelayThresholdSurpassed) { - const ms = + /** How long to avoid any refetches for */ + const refetchDelayMs = (0.5 + Math.random()) * (refetchDelayOptions?.durationMs ?? 100); - const threshold = + /** How many notifications do we need to receive before we abort the "no refetches" behavior? */ + const abortThreshold = (0.5 + Math.random()) * Math.min( refetchDelayOptions?.abortThreshold ?? Infinity, @@ -398,9 +403,13 @@ export class LocalQueue { this.fetchAgain = false; this.refetchDelayActive = true; this.refetchDelayFetchOnComplete = false; - this.refetchDelayAbortThreshold = threshold; - // NOTE: this.refetchDelayCounter is set at the beginning of fetch() to allow for pulse() during fetch() - this.refetchDelayTimer = setTimeout(this.refetchDelayCompleteOrAbort, ms); + this.refetchDelayAbortThreshold = abortThreshold; + // NOTE: this.refetchDelayCounter is set at the beginning of fetch() + // (i.e. above) to allow for pulse() during fetch() + this.refetchDelayTimer = setTimeout( + this.refetchDelayCompleteOrAbort, + refetchDelayMs, + ); } if (this.jobQueue.length > 0) { From a90f9bcff324e28d9537e5de344041e79d9eeddf Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 13 Nov 2024 14:37:31 +0000 Subject: [PATCH 10/11] Clarify and fix behavior of refetch delay --- src/localQueue.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/localQueue.ts b/src/localQueue.ts index 8a453c07..ef3baeaa 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -438,7 +438,7 @@ export class LocalQueue { } private refetchDelayCompleteOrAbort = (): void => { - if (this.refetchDelayTimer) { + if (this.refetchDelayTimer != null) { clearTimeout(this.refetchDelayTimer); this.refetchDelayTimer = null; } @@ -453,6 +453,10 @@ export class LocalQueue { } }; + /** + * If no refetch delay is active, returns false; otherwise returns true and + * checks to see if we need to abort the delay and trigger a fetch. + */ private handleCheckRefetchDelayAbortThreshold(): boolean { if (!this.refetchDelayActive || this.mode === "RELEASED") { return false; @@ -460,9 +464,8 @@ export class LocalQueue { if (this.refetchDelayCounter >= this.refetchDelayAbortThreshold) { this.refetchDelayFetchOnComplete = true; this.refetchDelayCompleteOrAbort(); - return true; } - return false; + return true; } /** Called when a new job becomes available in the DB */ @@ -470,11 +473,12 @@ export class LocalQueue { this.refetchDelayCounter += count; if (this.handleCheckRefetchDelayAbortThreshold()) { - /* handled */ + // Refetch delay was enabled; we've incremented the counter and taken + // action if necessary. No further action necessary. } else if (this.mode === POLLING) { if (this.fetchInProgress) { this.fetchAgain = true; - } else if (this.fetchTimer) { + } else if (this.fetchTimer != null) { clearTimeout(this.fetchTimer); this.fetchTimer = null; this.fetch(); From d5f35df28fe380ea7d238792a703955a3646c39f Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 13 Nov 2024 14:39:08 +0000 Subject: [PATCH 11/11] Reduce diff --- src/localQueue.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/localQueue.ts b/src/localQueue.ts index ef3baeaa..a284a294 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -569,6 +569,7 @@ export class LocalQueue { } else if (oldMode === TTL_EXPIRED) { // No action necessary } + if (this.backgroundCount === 0) { this.promise.resolve(); }