-
Notifications
You must be signed in to change notification settings - Fork 5
/
remember.ts
70 lines (63 loc) · 1.8 KB
/
remember.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import { Operator, StrictSink } from './types';
export function remember<T>(
numRemembered: number = 1,
shouldUnsubscribe: boolean = false
): Operator<T, T> {
return source => {
const marker = {};
let sinks: Array<StrictSink<T> | undefined> = [];
let talkback: any;
let lasts: T[] = Array(numRemembered).fill(marker);
let curr = -1;
let started = false;
const mkTalkback = (sink: StrictSink<T>) => () => {
sinks[sinks.indexOf(sink)] = void 0;
if (shouldUnsubscribe) {
// Allow others to subscribe in the same iteration of the JS event loop
queueMicrotask(() => {
if (sinks.every(x => x === undefined)) {
sinks = [];
talkback(2);
}
});
}
};
return (_, sink) => {
sinks.push(sink);
if (!started) {
source(0, (t, d) => {
if (t === 0) {
started = true;
talkback = d;
for (let i = 0; i < sinks.length; i++) {
const sink = sinks[i]!;
sink(0, mkTalkback(sink));
}
} else {
if (t === 1) {
curr = (curr + 1) % numRemembered;
lasts[curr] = d;
}
let hasDeleted = false;
for (let i = 0; i < sinks.length; i++) {
const sink = sinks[i];
if (sink) sink(t, d);
else hasDeleted = true;
}
if (hasDeleted) {
sinks = sinks.filter(x => x !== undefined);
}
}
});
} else {
sink(0, mkTalkback(sink));
for (let i = 0; i < lasts.length; i++) {
let idx = (curr + 1 + i) % numRemembered;
if (lasts[idx] !== marker) {
sink(1, lasts[idx]);
}
}
}
};
};
}