-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeque-readable-stream.js
53 lines (40 loc) · 1.05 KB
/
deque-readable-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
'use strict';
const stream = require('stream');
const Deque = require('double-ended-queue');
class DequeReadableStream extends stream.Readable {
constructor(options) {
super(options);
this.readingData = false;
this.buffer = new Deque(options.bufferSize);
}
write(data) {
this.buffer.push(data);
this.startReadingData();
}
end() {
this.buffer.push(null);
}
_read() {
this.startReadingData()
}
startReadingData() {
if (!this.readingData) {
this.readingData = true;
setImmediate(() => this.readData());
}
}
readData() {
const buffer = this.buffer;
while (this.readingData) {
const item = buffer.peekFront();
if (item === undefined) {
this.readingData = false;
} else if (this.push(item)) {
buffer.shift();
} else {
this.readingData = false;
}
}
}
}
module.exports = DequeReadableStream;