From 093873b9f5216557370ec310ab6e16246fbdcd51 Mon Sep 17 00:00:00 2001 From: Timur Sevimli Date: Tue, 22 Aug 2023 14:13:41 +0300 Subject: [PATCH 1/5] Refactor queue from prototype to class syntax --- lib/queue.js | 427 ++++++++++++++++++++------------------------------- 1 file changed, 165 insertions(+), 262 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 2a2f6e2..87fbaa6 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,300 +1,203 @@ 'use strict'; -// Queue constructor -// concurrency - , asynchronous concurrency -function Queue(concurrency) { - this.paused = false; - this.concurrency = concurrency; - this.waitTimeout = 0; - this.processTimeout = 0; - this.throttleCount = 0; - this.throttleInterval = 1000; - this.count = 0; - this.tasks = []; - this.waiting = []; - this.factors = {}; - this.fifoMode = true; - this.roundRobinMode = false; - this.priorityMode = false; - this.onProcess = null; - this.onDone = null; - this.onSuccess = null; - this.onTimeout = null; - this.onFailure = null; - this.onDrain = null; -} - const QUEUE_TIMEOUT = 'Metasync: Queue timed out'; -// Set wait before processing timeout -// msec - , wait timeout for single item -// -// Returns: -Queue.prototype.wait = function (msec) { - this.waitTimeout = msec; - return this; -}; - -// Throttle to limit throughput -// Signature: count[, interval] -// count - , item count -// interval - , per interval, optional -// default: 1000 msec -// -// Returns: -Queue.prototype.throttle = function (count, interval = 1000) { - this.throttleCount = count; - this.throttleInterval = interval; - return this; -}; +class Queue { + constructor(concurrency) { + this.paused = false; + this.concurrency = concurrency; + this.waitTimeout = 0; + this.processTimeout = 0; + this.throttleCount = 0; + this.throttleInterval = 1000; + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + this.fifoMode = true; + this.roundRobinMode = false; + this.priorityMode = false; + this.onProcess = null; + this.onDone = null; + this.onSuccess = null; + this.onTimeout = null; + this.onFailure = null; + this.onDrain = null; + } -// Add item to queue -// Signature: item[, factor[, priority]] -// item - , to be added -// factor - | , type, source, -// destination or path, optional -// priority - , optional -// -// Returns: -Queue.prototype.add = function (item, factor = 0, priority = 0) { - if (this.priorityMode && !this.roundRobinMode) { - priority = factor; - factor = 0; + wait(msec) { + this.waitTimeout = msec; + return this; } - const task = [item, factor, priority]; - const slot = this.count < this.concurrency; - if (!this.paused && slot && this.onProcess) { - this.next(task); + + throttle(count, interval = 1000) { + this.throttleCount = count; + this.throttleInterval = interval; return this; } - let tasks; - if (this.roundRobinMode) { - tasks = this.factors[factor]; - if (!tasks) { - tasks = []; - this.factors[factor] = tasks; - this.waiting.push(tasks); + + add(item, factor = 0, priority = 0) { + if (this.priorityMode && !this.roundRobinMode) { + priority = factor; + factor = 0; + } + const task = [item, factor, priority]; + const slot = this.count < this.concurrency; + if (!this.paused && slot && this.onProcess) { + this.next(task); + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.factors[factor]; + if (!tasks) { + tasks = []; + this.factors[factor] = tasks; + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; } - } else { - tasks = this.tasks; - } - if (this.fifoMode) tasks.push(task); - else tasks.unshift(task); + if (this.fifoMode) tasks.push(task); + else tasks.unshift(task); - if (this.priorityMode) { - if (this.fifoMode) { - tasks.sort((a, b) => b[2] - a[2]); - } else { - tasks.sort((a, b) => a[2] - b[2]); + if (this.priorityMode) { + if (this.fifoMode) { + tasks.sort((a, b) => b[2] - a[2]); + } else { + tasks.sort((a, b) => a[2] - b[2]); + } } + return this; } - return this; -}; -// Process next item -// task - , next task [item, factor, priority] -// -// Returns: -Queue.prototype.next = function (task) { - const item = task[0]; - let timer; - this.count++; - if (this.processTimeout) { - timer = setTimeout(() => { - const err = new Error(QUEUE_TIMEOUT); - if (this.onTimeout) this.onTimeout(err); - }, this.processTimeout); - } - this.onProcess(item, (err, result) => { - if (this.onDone) this.onDone(err, result); - if (err) { - if (this.onFailure) this.onFailure(err); - } else if (this.onSuccess) { - this.onSuccess(result); - } - if (timer) { - clearTimeout(timer); - timer = null; + next(task) { + const item = task[0]; + let timer; + this.count++; + if (this.processTimeout) { + timer = setTimeout(() => { + const err = new Error(QUEUE_TIMEOUT); + if (this.onTimeout) this.onTimeout(err); + }, this.processTimeout); } - this.count--; - if (this.tasks.length > 0 || this.waiting.length > 0) { - this.takeNext(); - } else if (this.count === 0 && this.onDrain) { - this.onDrain(); - } - }); - return this; -}; - -// Prepare next item for processing -// -// Returns: -Queue.prototype.takeNext = function () { - if (this.paused || !this.onProcess) { + this.onProcess(item, (err, result) => { + if (this.onDone) this.onDone(err, result); + if (err) { + if (this.onFailure) this.onFailure(err); + } else if (this.onSuccess) { + this.onSuccess(result); + } + if (timer) { + clearTimeout(timer); + timer = null; + } + this.count--; + if (this.tasks.length > 0 || this.waiting.length > 0) { + this.takeNext(); + } else if (this.count === 0 && this.onDrain) { + this.onDrain(); + } + }); return this; } - let tasks; - if (this.roundRobinMode) { - tasks = this.waiting.shift(); - if (tasks.length > 1) { - this.waiting.push(tasks); + + takeNext() { + if (this.paused || !this.onProcess) { + return this; } - } else { - tasks = this.tasks; + let tasks; + if (this.roundRobinMode) { + tasks = this.waiting.shift(); + if (tasks.length > 1) { + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + const task = tasks.shift(); + if (task) this.next(task); + return this; } - const task = tasks.shift(); - if (task) this.next(task); - return this; -}; -// Pause queue -// This function is not completely implemented yet -// -// Returns: -Queue.prototype.pause = function () { - this.paused = true; - return this; -}; + pause() { + this.paused = true; + return this; + } -// Resume queue -// This function is not completely implemented yet -// -// Returns: -Queue.prototype.resume = function () { - this.paused = false; - return this; -}; + resume() { + this.paused = false; + return this; + } -// Clear queue -// -// Returns: -Queue.prototype.clear = function () { - this.count = 0; - this.tasks = []; - this.waiting = []; - this.factors = {}; - return this; -}; + clear() { + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + return this; + } -// Set timeout interval and listener -// msec - , process timeout for single item -// onTimeout - -// -// Returns: -Queue.prototype.timeout = function (msec, onTimeout = null) { - this.processTimeout = msec; - if (onTimeout) this.onTimeout = onTimeout; - return this; -}; + timeout(msec, onTimeout = null) { + this.processTimeout = msec; + if (onTimeout) this.onTimeout = onTimeout; + return this; + } -// Set processing function -// fn - -// item - -// callback - -// err - | -// result - -// -// Returns: -Queue.prototype.process = function (fn) { - this.onProcess = fn; - return this; -}; + process(fn) { + this.onProcess = fn; + return this; + } -// Set listener on processing done -// fn - , done listener -// err - | -// result - -// -// Returns: -Queue.prototype.done = function (fn) { - this.onDone = fn; - return this; -}; + done(fn) { + this.onDone = fn; + return this; + } -// Set listener on processing success -// listener - , on success -// item - -// -// Returns: -Queue.prototype.success = function (listener) { - this.onSuccess = listener; - return this; -}; + success(listener) { + this.onSuccess = listener; + return this; + } -// Set listener on processing error -// listener - , on failure -// err - | -// -// Returns: -Queue.prototype.failure = function (listener) { - this.onFailure = listener; - return this; -}; + failure(listener) { + this.onFailure = listener; + return this; + } -// Set listener on drain Queue -// listener - , on drain -// -// Returns: -Queue.prototype.drain = function (listener) { - this.onDrain = listener; - return this; -}; + drain(listener) { + this.onDrain = listener; + return this; + } -// Switch to FIFO mode (default for Queue) -// -// Returns: -Queue.prototype.fifo = function () { - this.fifoMode = true; - return this; -}; + fifo() { + this.fifoMode = true; + return this; + } -// Switch to LIFO mode -// -// Returns: -Queue.prototype.lifo = function () { - this.fifoMode = false; - return this; -}; + lifo() { + this.fifoMode = false; + return this; + } -// Activate or deactivate priority mode -// flag - , default: true, false will -// disable priority mode -// -// Returns: -Queue.prototype.priority = function (flag = true) { - this.priorityMode = flag; - return this; -}; + priority(flag = true) { + this.priorityMode = flag; + return this; + } -// Activate or deactivate round robin mode -// flag - , default: true, false will -// disable roundRobin mode -// -// Returns: -Queue.prototype.roundRobin = function (flag = true) { - this.roundRobinMode = flag; - return this; -}; + roundRobin(flag = true) { + this.roundRobinMode = flag; + return this; + } -// Pipe processed items to different queue -// dest - , destination queue -// -// Returns: -Queue.prototype.pipe = function (dest) { - if (dest instanceof Queue) { - this.success((item) => { - dest.add(item); - }); + pipe(dest) { + if (dest instanceof Queue) { + this.success((item) => void dest.add(item)); + } + return this; } - return this; -}; +} -// Create Queue instance -// concurrency - , simultaneous and -// asynchronously executing tasks -// -// Returns: const queue = (concurrency) => new Queue(concurrency); module.exports = { queue, Queue }; From 334e278722d22c5cc1ec037faebc4e952394b13d Mon Sep 17 00:00:00 2001 From: Timur Sevimli Date: Sat, 26 Aug 2023 16:11:39 +0300 Subject: [PATCH 2/5] Optimise default types --- lib/throttle.js | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/throttle.js b/lib/throttle.js index b634039..c7dbda2 100644 --- a/lib/throttle.js +++ b/lib/throttle.js @@ -8,7 +8,7 @@ // // Returns: const throttle = (timeout, fn, ...args) => { - let timer; + let timer = null; let wait = false; const execute = args @@ -16,7 +16,7 @@ const throttle = (timeout, fn, ...args) => { : (...pars) => (pars ? fn(...pars) : fn()); const delayed = (...pars) => { - timer = undefined; + timer = null; if (wait) execute(...pars); }; @@ -38,12 +38,15 @@ const throttle = (timeout, fn, ...args) => { // fn - , to be debounced // args - , arguments for fn, optional const debounce = (timeout, fn, ...args) => { - let timer; + let timer = null; const debounced = () => (args ? fn(...args) : fn()); const wrapped = () => { - if (timer) clearTimeout(timer); + if (timer) { + clearTimeout(timer); + timer = null; + } timer = setTimeout(debounced, timeout); }; From f880372fa9ed038277d28e743adcd7de159bb5b5 Mon Sep 17 00:00:00 2001 From: Timur Sevimli Date: Sun, 4 Feb 2024 14:04:57 +0300 Subject: [PATCH 3/5] Re-added queue on prototypes --- lib/queue.js | 427 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 262 insertions(+), 165 deletions(-) diff --git a/lib/queue.js b/lib/queue.js index 87fbaa6..2a2f6e2 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,203 +1,300 @@ 'use strict'; +// Queue constructor +// concurrency - , asynchronous concurrency +function Queue(concurrency) { + this.paused = false; + this.concurrency = concurrency; + this.waitTimeout = 0; + this.processTimeout = 0; + this.throttleCount = 0; + this.throttleInterval = 1000; + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + this.fifoMode = true; + this.roundRobinMode = false; + this.priorityMode = false; + this.onProcess = null; + this.onDone = null; + this.onSuccess = null; + this.onTimeout = null; + this.onFailure = null; + this.onDrain = null; +} + const QUEUE_TIMEOUT = 'Metasync: Queue timed out'; -class Queue { - constructor(concurrency) { - this.paused = false; - this.concurrency = concurrency; - this.waitTimeout = 0; - this.processTimeout = 0; - this.throttleCount = 0; - this.throttleInterval = 1000; - this.count = 0; - this.tasks = []; - this.waiting = []; - this.factors = {}; - this.fifoMode = true; - this.roundRobinMode = false; - this.priorityMode = false; - this.onProcess = null; - this.onDone = null; - this.onSuccess = null; - this.onTimeout = null; - this.onFailure = null; - this.onDrain = null; - } +// Set wait before processing timeout +// msec - , wait timeout for single item +// +// Returns: +Queue.prototype.wait = function (msec) { + this.waitTimeout = msec; + return this; +}; - wait(msec) { - this.waitTimeout = msec; - return this; - } +// Throttle to limit throughput +// Signature: count[, interval] +// count - , item count +// interval - , per interval, optional +// default: 1000 msec +// +// Returns: +Queue.prototype.throttle = function (count, interval = 1000) { + this.throttleCount = count; + this.throttleInterval = interval; + return this; +}; - throttle(count, interval = 1000) { - this.throttleCount = count; - this.throttleInterval = interval; +// Add item to queue +// Signature: item[, factor[, priority]] +// item - , to be added +// factor - | , type, source, +// destination or path, optional +// priority - , optional +// +// Returns: +Queue.prototype.add = function (item, factor = 0, priority = 0) { + if (this.priorityMode && !this.roundRobinMode) { + priority = factor; + factor = 0; + } + const task = [item, factor, priority]; + const slot = this.count < this.concurrency; + if (!this.paused && slot && this.onProcess) { + this.next(task); return this; } - - add(item, factor = 0, priority = 0) { - if (this.priorityMode && !this.roundRobinMode) { - priority = factor; - factor = 0; - } - const task = [item, factor, priority]; - const slot = this.count < this.concurrency; - if (!this.paused && slot && this.onProcess) { - this.next(task); - return this; - } - let tasks; - if (this.roundRobinMode) { - tasks = this.factors[factor]; - if (!tasks) { - tasks = []; - this.factors[factor] = tasks; - this.waiting.push(tasks); - } - } else { - tasks = this.tasks; + let tasks; + if (this.roundRobinMode) { + tasks = this.factors[factor]; + if (!tasks) { + tasks = []; + this.factors[factor] = tasks; + this.waiting.push(tasks); } + } else { + tasks = this.tasks; + } - if (this.fifoMode) tasks.push(task); - else tasks.unshift(task); + if (this.fifoMode) tasks.push(task); + else tasks.unshift(task); - if (this.priorityMode) { - if (this.fifoMode) { - tasks.sort((a, b) => b[2] - a[2]); - } else { - tasks.sort((a, b) => a[2] - b[2]); - } + if (this.priorityMode) { + if (this.fifoMode) { + tasks.sort((a, b) => b[2] - a[2]); + } else { + tasks.sort((a, b) => a[2] - b[2]); } - return this; } + return this; +}; - next(task) { - const item = task[0]; - let timer; - this.count++; - if (this.processTimeout) { - timer = setTimeout(() => { - const err = new Error(QUEUE_TIMEOUT); - if (this.onTimeout) this.onTimeout(err); - }, this.processTimeout); - } - this.onProcess(item, (err, result) => { - if (this.onDone) this.onDone(err, result); - if (err) { - if (this.onFailure) this.onFailure(err); - } else if (this.onSuccess) { - this.onSuccess(result); - } - if (timer) { - clearTimeout(timer); - timer = null; - } - this.count--; - if (this.tasks.length > 0 || this.waiting.length > 0) { - this.takeNext(); - } else if (this.count === 0 && this.onDrain) { - this.onDrain(); - } - }); - return this; +// Process next item +// task - , next task [item, factor, priority] +// +// Returns: +Queue.prototype.next = function (task) { + const item = task[0]; + let timer; + this.count++; + if (this.processTimeout) { + timer = setTimeout(() => { + const err = new Error(QUEUE_TIMEOUT); + if (this.onTimeout) this.onTimeout(err); + }, this.processTimeout); } - - takeNext() { - if (this.paused || !this.onProcess) { - return this; + this.onProcess(item, (err, result) => { + if (this.onDone) this.onDone(err, result); + if (err) { + if (this.onFailure) this.onFailure(err); + } else if (this.onSuccess) { + this.onSuccess(result); } - let tasks; - if (this.roundRobinMode) { - tasks = this.waiting.shift(); - if (tasks.length > 1) { - this.waiting.push(tasks); - } - } else { - tasks = this.tasks; + if (timer) { + clearTimeout(timer); + timer = null; } - const task = tasks.shift(); - if (task) this.next(task); - return this; - } + this.count--; + if (this.tasks.length > 0 || this.waiting.length > 0) { + this.takeNext(); + } else if (this.count === 0 && this.onDrain) { + this.onDrain(); + } + }); + return this; +}; - pause() { - this.paused = true; +// Prepare next item for processing +// +// Returns: +Queue.prototype.takeNext = function () { + if (this.paused || !this.onProcess) { return this; } - - resume() { - this.paused = false; - return this; + let tasks; + if (this.roundRobinMode) { + tasks = this.waiting.shift(); + if (tasks.length > 1) { + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; } + const task = tasks.shift(); + if (task) this.next(task); + return this; +}; - clear() { - this.count = 0; - this.tasks = []; - this.waiting = []; - this.factors = {}; - return this; - } +// Pause queue +// This function is not completely implemented yet +// +// Returns: +Queue.prototype.pause = function () { + this.paused = true; + return this; +}; - timeout(msec, onTimeout = null) { - this.processTimeout = msec; - if (onTimeout) this.onTimeout = onTimeout; - return this; - } +// Resume queue +// This function is not completely implemented yet +// +// Returns: +Queue.prototype.resume = function () { + this.paused = false; + return this; +}; - process(fn) { - this.onProcess = fn; - return this; - } +// Clear queue +// +// Returns: +Queue.prototype.clear = function () { + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + return this; +}; - done(fn) { - this.onDone = fn; - return this; - } +// Set timeout interval and listener +// msec - , process timeout for single item +// onTimeout - +// +// Returns: +Queue.prototype.timeout = function (msec, onTimeout = null) { + this.processTimeout = msec; + if (onTimeout) this.onTimeout = onTimeout; + return this; +}; - success(listener) { - this.onSuccess = listener; - return this; - } +// Set processing function +// fn - +// item - +// callback - +// err - | +// result - +// +// Returns: +Queue.prototype.process = function (fn) { + this.onProcess = fn; + return this; +}; - failure(listener) { - this.onFailure = listener; - return this; - } +// Set listener on processing done +// fn - , done listener +// err - | +// result - +// +// Returns: +Queue.prototype.done = function (fn) { + this.onDone = fn; + return this; +}; - drain(listener) { - this.onDrain = listener; - return this; - } +// Set listener on processing success +// listener - , on success +// item - +// +// Returns: +Queue.prototype.success = function (listener) { + this.onSuccess = listener; + return this; +}; - fifo() { - this.fifoMode = true; - return this; - } +// Set listener on processing error +// listener - , on failure +// err - | +// +// Returns: +Queue.prototype.failure = function (listener) { + this.onFailure = listener; + return this; +}; - lifo() { - this.fifoMode = false; - return this; - } +// Set listener on drain Queue +// listener - , on drain +// +// Returns: +Queue.prototype.drain = function (listener) { + this.onDrain = listener; + return this; +}; - priority(flag = true) { - this.priorityMode = flag; - return this; - } +// Switch to FIFO mode (default for Queue) +// +// Returns: +Queue.prototype.fifo = function () { + this.fifoMode = true; + return this; +}; - roundRobin(flag = true) { - this.roundRobinMode = flag; - return this; - } +// Switch to LIFO mode +// +// Returns: +Queue.prototype.lifo = function () { + this.fifoMode = false; + return this; +}; - pipe(dest) { - if (dest instanceof Queue) { - this.success((item) => void dest.add(item)); - } - return this; +// Activate or deactivate priority mode +// flag - , default: true, false will +// disable priority mode +// +// Returns: +Queue.prototype.priority = function (flag = true) { + this.priorityMode = flag; + return this; +}; + +// Activate or deactivate round robin mode +// flag - , default: true, false will +// disable roundRobin mode +// +// Returns: +Queue.prototype.roundRobin = function (flag = true) { + this.roundRobinMode = flag; + return this; +}; + +// Pipe processed items to different queue +// dest - , destination queue +// +// Returns: +Queue.prototype.pipe = function (dest) { + if (dest instanceof Queue) { + this.success((item) => { + dest.add(item); + }); } -} + return this; +}; +// Create Queue instance +// concurrency - , simultaneous and +// asynchronously executing tasks +// +// Returns: const queue = (concurrency) => new Queue(concurrency); module.exports = { queue, Queue }; From f372dfbb8b985022a1f743817aeb70e876441d5f Mon Sep 17 00:00:00 2001 From: Timur Sevimli Date: Sun, 4 Feb 2024 14:05:37 +0300 Subject: [PATCH 4/5] Add queue on classes --- lib/queue.class.js | 299 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 lib/queue.class.js diff --git a/lib/queue.class.js b/lib/queue.class.js new file mode 100644 index 0000000..145d5b0 --- /dev/null +++ b/lib/queue.class.js @@ -0,0 +1,299 @@ +'use strict'; + +const QUEUE_TIMEOUT = 'Metasync: Queue timed out'; + +class Queue { + // Queue constructor + // concurrency - , asynchronous concurrency + constructor(concurrency) { + this.paused = false; + this.concurrency = concurrency; + this.waitTimeout = 0; + this.processTimeout = 0; + this.throttleCount = 0; + this.throttleInterval = 1000; + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + this.fifoMode = true; + this.roundRobinMode = false; + this.priorityMode = false; + this.onProcess = null; + this.onDone = null; + this.onSuccess = null; + this.onTimeout = null; + this.onFailure = null; + this.onDrain = null; + } + + // Set wait before processing timeout + // msec - , wait timeout for single item + // + // Returns: + wait(msec) { + this.waitTimeout = msec; + return this; + } + + // Throttle to limit throughput + // Signature: count[, interval] + // count - , item count + // interval - , per interval, optional + // default: 1000 msec + // + // Returns: + throttle(count, interval = 1000) { + this.throttleCount = count; + this.throttleInterval = interval; + return this; + } + + // Add item to queue + // Signature: item[, factor[, priority]] + // item - , to be added + // factor - | , type, source, + // destination or path, optional + // priority - , optional + // + // Returns: + add(item, factor = 0, priority = 0) { + if (this.priorityMode && !this.roundRobinMode) { + priority = factor; + factor = 0; + } + const task = [item, factor, priority]; + const slot = this.count < this.concurrency; + if (!this.paused && slot && this.onProcess) { + this.next(task); + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.factors[factor]; + if (!tasks) { + tasks = []; + this.factors[factor] = tasks; + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + + if (this.fifoMode) tasks.push(task); + else tasks.unshift(task); + + if (this.priorityMode) { + if (this.fifoMode) { + tasks.sort((a, b) => b[2] - a[2]); + } else { + tasks.sort((a, b) => a[2] - b[2]); + } + } + return this; + } + + // Process next item + // task - , next task [item, factor, priority] + // + // Returns: + next(task) { + const item = task[0]; + let timer; + this.count++; + if (this.processTimeout) { + timer = setTimeout(() => { + const err = new Error(QUEUE_TIMEOUT); + if (this.onTimeout) this.onTimeout(err); + }, this.processTimeout); + } + this.onProcess(item, (err, result) => { + if (this.onDone) this.onDone(err, result); + if (err) { + if (this.onFailure) this.onFailure(err); + } else if (this.onSuccess) { + this.onSuccess(result); + } + if (timer) { + clearTimeout(timer); + timer = null; + } + this.count--; + if (this.tasks.length > 0 || this.waiting.length > 0) { + this.takeNext(); + } else if (this.count === 0 && this.onDrain) { + this.onDrain(); + } + }); + return this; + } + + // Prepare next item for processing + // + // Returns: + takeNext() { + if (this.paused || !this.onProcess) { + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.waiting.shift(); + if (tasks.length > 1) { + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + const task = tasks.shift(); + if (task) this.next(task); + return this; + } + + // This function is not completely implemented yet + // + // Returns: + pause() { + this.paused = true; + return this; + } + + // Resume queue + // This function is not completely implemented yet + // + // Returns: + resume() { + this.paused = false; + return this; + } + + // Clear queue + // + // Returns: + clear() { + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + return this; + } + + // Set timeout interval and listener + // msec - , process timeout for single item + // onTimeout - + // + // Returns: + timeout(msec, onTimeout = null) { + this.processTimeout = msec; + if (onTimeout) this.onTimeout = onTimeout; + return this; + } + + // Set processing function + // fn - + // item - + // callback - + // err - | + // result - + // + // Returns: + process(fn) { + this.onProcess = fn; + return this; + } + + // Set listener on processing done + // fn - , done listener + // err - | + // result - + // + // Returns: + done(fn) { + this.onDone = fn; + return this; + } + + // Set listener on processing success + // listener - , on success + // item - + // + // Returns: + success(listener) { + this.onSuccess = listener; + return this; + } + + // Set listener on processing error + // listener - , on failure + // err - | + // + // Returns: + failure(listener) { + this.onFailure = listener; + return this; + } + + // Set listener on drain Queue + // listener - , on drain + // + // Returns: + drain(listener) { + this.onDrain = listener; + return this; + } + + // Switch to FIFO mode (default for Queue) + // + // Returns: + fifo() { + this.fifoMode = true; + return this; + } + + // Switch to LIFO mode + // + // Returns: + lifo() { + this.fifoMode = false; + return this; + } + + // Activate or deactivate priority mode + // flag - , default: true, false will + // disable priority mode + // + // Returns: + priority(flag = true) { + this.priorityMode = flag; + return this; + } + + // Activate or deactivate round robin mode + // flag - , default: true, false will + // disable roundRobin mode + // + // Returns: + roundRobin(flag = true) { + this.roundRobinMode = flag; + return this; + } + + // Pipe processed items to different queue + // dest - , destination queue + // + // Returns: + pipe(dest) { + if (dest instanceof Queue) { + this.success((item) => void dest.add(item)); + } + return this; + } +} + +// Create Queue instance +// concurrency - , simultaneous and +// asynchronously executing tasks +// +// Returns: +const queue = (concurrency) => new Queue(concurrency); + +module.exports = { queue, Queue }; From 307325466f47b741d9dcd8409639c2c1a57ceac9 Mon Sep 17 00:00:00 2001 From: Timur Sevimli Date: Sun, 4 Feb 2024 14:06:31 +0300 Subject: [PATCH 5/5] Fix typo --- metasync.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metasync.js b/metasync.js index 571ebab..7d32bec 100644 --- a/metasync.js +++ b/metasync.js @@ -1,7 +1,7 @@ 'use strict'; const common = require('@metarhia/common'); -const nodeVerion = common.between(process.version, 'v', '.'); +const nodeVersion = common.between(process.version, 'v', '.'); const submodules = [ 'composition', // Unified abstraction @@ -17,7 +17,7 @@ const submodules = [ 'throttle', // Throttling utilities ].map((path) => require('./lib/' + path)); -if (nodeVerion >= 10) { +if (nodeVersion >= 10) { submodules.push(require('./lib/async-iterator')); }