-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathindex.js
174 lines (151 loc) · 5.73 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
var assert = require('assert'),
assign = require('object-assign'),
onerr = require('on-error'),
eventuate = require('eventuate'),
once = require('once'),
Promise = require('promise-polyfill'),
after = require('afterward'),
setImmediate = require('timers').setImmediate,
MaxSizeExceededError = require('./errors').MaxSizeExceededError
module.exports = function () {
var pending = [],
processing = [],
maxSize = Infinity,
softMaxSize = Infinity,
concurrency = Infinity,
drained = true,
processor
function cq (item, cb) {
var done = new Promise(function (resolve, reject) {
if (pending.length >= maxSize) {
var err = new MaxSizeExceededError('unable to queue item')
reject(err)
return cq.rejected.produce({ item: item, err: err })
}
if (pending.length >= softMaxSize) cq.softLimitReached.produce({ size: pending.length })
drained = false
setImmediate(drain)
pending.push({
item : item,
resolve: onResolve,
reject : onReject
})
cq.enqueued.produce({ item: item })
function onResolve (value) {
resolve(value)
cq.processingEnded.produce({ item: item, result: value })
}
function onReject (err) {
reject(err)
cq.processingEnded.produce({ item: item, err: err })
}
})
return after(done, cb)
}
Object.defineProperties(cq, {
size : { get: getSize, enumerable: true },
isDrained : { get: getIsDrained, enumerable: true },
pending : { get: getPending, enumerable: true },
processing : { get: getProcessing, enumerable: true },
concurrency : { get: getConcurrency, set: setConcurrency, enumerable: true },
maxSize : { get: getMaxSize, set: setMaxSize, enumerable: true },
softMaxSize : { get: getSoftMaxSize, set: setSoftMaxSize, enumerable: true },
processor : { get: getProcessor },
limit : { value: limit },
process : { value: process },
enqueued : { value: eventuate() },
rejected : { value: eventuate() },
softLimitReached : { value: eventuate() },
processingStarted: { value: eventuate() },
processingEnded : { value: eventuate() },
drained : { value: eventuate() }
})
return cq
function drain () {
if (!drained && pending.length === 0 && processing.length === 0) {
drained = true
cq.drained.produce()
}
while (processor && pending.length > 0 && processing.length < concurrency) drainItem()
function drainItem () {
var task = pending.shift()
processing.push(task)
cq.processingStarted.produce({ item: task.item })
var reject = once(function reject (err) {
processing.splice(processing.indexOf(task), 1)
task.reject(err)
setImmediate(drain)
})
var resolve = once(function resolve () {
processing.splice(processing.indexOf(task), 1)
task.resolve.apply(undefined, arguments)
setImmediate(drain)
})
var p
try {
p = processor(task.item, onerr(reject).otherwise(resolve))
}
catch (err) {
return reject(err)
}
if (p && typeof p.then === 'function') p.then(resolve, reject)
}
}
function process (func) {
if (typeof func !== 'function') throw new TypeError('process requires a processor function')
assert(!processor, 'queue processor already defined')
processor = func
setImmediate(drain)
return cq
}
function limit (limits) {
limits = assign({ concurrency: Infinity, maxSize: Infinity, softMaxSize: Infinity }, limits)
if (typeof limits.maxSize !== 'number') throw new TypeError('maxSize must be a number')
if (typeof limits.softMaxSize !== 'number') throw new TypeError('softMaxSize must be a number')
if (typeof limits.concurrency !== 'number') throw new TypeError('concurrency must be a number')
maxSize = limits.maxSize
softMaxSize = limits.softMaxSize
concurrency = limits.concurrency
return cq
}
function getSize () {
return pending.length
}
function getIsDrained () {
return drained
}
function getPending () {
return pending.map(function (task) {
return task.item
})
}
function getProcessing () {
return processing.map(function (task) {
return task.item
})
}
function getConcurrency () {
return concurrency
}
function setConcurrency (value) {
if (typeof value !== 'number') throw new TypeError('concurrency must be a number')
concurrency = value
}
function getMaxSize () {
return maxSize
}
function setMaxSize (value) {
if (typeof value !== 'number') throw new TypeError('maxSize must be a number')
maxSize = value
}
function getSoftMaxSize () {
return softMaxSize
}
function setSoftMaxSize (value) {
if (typeof value !== 'number') throw new TypeError('softMaxSize must be a number')
softMaxSize = value
}
function getProcessor () {
return processor
}
}