diff --git a/lib/queue.class.js b/lib/queue.class.js new file mode 100644 index 0000000..145d5b0 --- /dev/null +++ b/lib/queue.class.js @@ -0,0 +1,299 @@ +'use strict'; + +const QUEUE_TIMEOUT = 'Metasync: Queue timed out'; + +class Queue { + // Queue constructor + // concurrency - , asynchronous concurrency + constructor(concurrency) { + this.paused = false; + this.concurrency = concurrency; + this.waitTimeout = 0; + this.processTimeout = 0; + this.throttleCount = 0; + this.throttleInterval = 1000; + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + this.fifoMode = true; + this.roundRobinMode = false; + this.priorityMode = false; + this.onProcess = null; + this.onDone = null; + this.onSuccess = null; + this.onTimeout = null; + this.onFailure = null; + this.onDrain = null; + } + + // Set wait before processing timeout + // msec - , wait timeout for single item + // + // Returns: + wait(msec) { + this.waitTimeout = msec; + return this; + } + + // Throttle to limit throughput + // Signature: count[, interval] + // count - , item count + // interval - , per interval, optional + // default: 1000 msec + // + // Returns: + throttle(count, interval = 1000) { + this.throttleCount = count; + this.throttleInterval = interval; + return this; + } + + // Add item to queue + // Signature: item[, factor[, priority]] + // item - , to be added + // factor - | , type, source, + // destination or path, optional + // priority - , optional + // + // Returns: + add(item, factor = 0, priority = 0) { + if (this.priorityMode && !this.roundRobinMode) { + priority = factor; + factor = 0; + } + const task = [item, factor, priority]; + const slot = this.count < this.concurrency; + if (!this.paused && slot && this.onProcess) { + this.next(task); + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.factors[factor]; + if (!tasks) { + tasks = []; + this.factors[factor] = tasks; + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + + if (this.fifoMode) tasks.push(task); + else tasks.unshift(task); + + if (this.priorityMode) { + if (this.fifoMode) { + tasks.sort((a, b) => b[2] - a[2]); + } else { + tasks.sort((a, b) => a[2] - b[2]); + } + } + return this; + } + + // Process next item + // task - , next task [item, factor, priority] + // + // Returns: + next(task) { + const item = task[0]; + let timer; + this.count++; + if (this.processTimeout) { + timer = setTimeout(() => { + const err = new Error(QUEUE_TIMEOUT); + if (this.onTimeout) this.onTimeout(err); + }, this.processTimeout); + } + this.onProcess(item, (err, result) => { + if (this.onDone) this.onDone(err, result); + if (err) { + if (this.onFailure) this.onFailure(err); + } else if (this.onSuccess) { + this.onSuccess(result); + } + if (timer) { + clearTimeout(timer); + timer = null; + } + this.count--; + if (this.tasks.length > 0 || this.waiting.length > 0) { + this.takeNext(); + } else if (this.count === 0 && this.onDrain) { + this.onDrain(); + } + }); + return this; + } + + // Prepare next item for processing + // + // Returns: + takeNext() { + if (this.paused || !this.onProcess) { + return this; + } + let tasks; + if (this.roundRobinMode) { + tasks = this.waiting.shift(); + if (tasks.length > 1) { + this.waiting.push(tasks); + } + } else { + tasks = this.tasks; + } + const task = tasks.shift(); + if (task) this.next(task); + return this; + } + + // This function is not completely implemented yet + // + // Returns: + pause() { + this.paused = true; + return this; + } + + // Resume queue + // This function is not completely implemented yet + // + // Returns: + resume() { + this.paused = false; + return this; + } + + // Clear queue + // + // Returns: + clear() { + this.count = 0; + this.tasks = []; + this.waiting = []; + this.factors = {}; + return this; + } + + // Set timeout interval and listener + // msec - , process timeout for single item + // onTimeout - + // + // Returns: + timeout(msec, onTimeout = null) { + this.processTimeout = msec; + if (onTimeout) this.onTimeout = onTimeout; + return this; + } + + // Set processing function + // fn - + // item - + // callback - + // err - | + // result - + // + // Returns: + process(fn) { + this.onProcess = fn; + return this; + } + + // Set listener on processing done + // fn - , done listener + // err - | + // result - + // + // Returns: + done(fn) { + this.onDone = fn; + return this; + } + + // Set listener on processing success + // listener - , on success + // item - + // + // Returns: + success(listener) { + this.onSuccess = listener; + return this; + } + + // Set listener on processing error + // listener - , on failure + // err - | + // + // Returns: + failure(listener) { + this.onFailure = listener; + return this; + } + + // Set listener on drain Queue + // listener - , on drain + // + // Returns: + drain(listener) { + this.onDrain = listener; + return this; + } + + // Switch to FIFO mode (default for Queue) + // + // Returns: + fifo() { + this.fifoMode = true; + return this; + } + + // Switch to LIFO mode + // + // Returns: + lifo() { + this.fifoMode = false; + return this; + } + + // Activate or deactivate priority mode + // flag - , default: true, false will + // disable priority mode + // + // Returns: + priority(flag = true) { + this.priorityMode = flag; + return this; + } + + // Activate or deactivate round robin mode + // flag - , default: true, false will + // disable roundRobin mode + // + // Returns: + roundRobin(flag = true) { + this.roundRobinMode = flag; + return this; + } + + // Pipe processed items to different queue + // dest - , destination queue + // + // Returns: + pipe(dest) { + if (dest instanceof Queue) { + this.success((item) => void dest.add(item)); + } + return this; + } +} + +// Create Queue instance +// concurrency - , simultaneous and +// asynchronously executing tasks +// +// Returns: +const queue = (concurrency) => new Queue(concurrency); + +module.exports = { queue, Queue }; diff --git a/lib/throttle.js b/lib/throttle.js index b634039..c7dbda2 100644 --- a/lib/throttle.js +++ b/lib/throttle.js @@ -8,7 +8,7 @@ // // Returns: const throttle = (timeout, fn, ...args) => { - let timer; + let timer = null; let wait = false; const execute = args @@ -16,7 +16,7 @@ const throttle = (timeout, fn, ...args) => { : (...pars) => (pars ? fn(...pars) : fn()); const delayed = (...pars) => { - timer = undefined; + timer = null; if (wait) execute(...pars); }; @@ -38,12 +38,15 @@ const throttle = (timeout, fn, ...args) => { // fn - , to be debounced // args - , arguments for fn, optional const debounce = (timeout, fn, ...args) => { - let timer; + let timer = null; const debounced = () => (args ? fn(...args) : fn()); const wrapped = () => { - if (timer) clearTimeout(timer); + if (timer) { + clearTimeout(timer); + timer = null; + } timer = setTimeout(debounced, timeout); }; diff --git a/metasync.js b/metasync.js index 571ebab..7d32bec 100644 --- a/metasync.js +++ b/metasync.js @@ -1,7 +1,7 @@ 'use strict'; const common = require('@metarhia/common'); -const nodeVerion = common.between(process.version, 'v', '.'); +const nodeVersion = common.between(process.version, 'v', '.'); const submodules = [ 'composition', // Unified abstraction @@ -17,7 +17,7 @@ const submodules = [ 'throttle', // Throttling utilities ].map((path) => require('./lib/' + path)); -if (nodeVerion >= 10) { +if (nodeVersion >= 10) { submodules.push(require('./lib/async-iterator')); }