-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathaggregate.js
118 lines (110 loc) · 3.98 KB
/
aggregate.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import { Meteor } from 'meteor/meteor';
import { Mongo } from 'meteor/mongo';
/**
 * Build the effective option set for a reactive aggregation.
 *
 * Starts from the built-in defaults and lets any key present in `options`
 * override the corresponding default.
 *
 * @param {Object} args
 * @param {Object} args.collection - collection whose `_name` becomes the default `clientCollection`
 * @param {Object} [args.options] - caller-supplied overrides
 * @returns {Object} defaults merged with the caller's overrides
 */
const defaultOptions = ({ collection, options }) => {
  const baseline = {
    observeSelector: {},
    observeOptions: {},
    delay: 250,
    lookupCollections: {},
    clientCollection: collection._name,
  };
  // caller-supplied keys win over the baseline values
  return Object.assign(baseline, options);
};
/**
 * Publish the result of an aggregation pipeline through `subscription` and
 * re-run the pipeline whenever one of the observed collections reports a change.
 *
 * The primary `collection` is always observed. Any `$lookup` stage whose `from`
 * is a `Mongo.Collection` instance is observed as well, and the stage is
 * rewritten to use the collection's `_name`. `$lookup.from` values given as
 * strings are passed through untouched and are not observed.
 *
 * Side effects: sets `subscription._ids` and `subscription._iteration`, and
 * registers an `onStop` handler on the subscription.
 *
 * @param {Object} subscription - publication handle; `added`, `changed`, `removed`,
 *   `ready`, `error` and `onStop` are called on it
 * @param {Mongo.Collection} collection - collection to aggregate and observe
 * @param {Object[]} [pipeline=[]] - aggregation pipeline stages
 * @param {Object} [options={}] - overrides for `observeSelector`, `observeOptions`,
 *   `delay`, `lookupCollections` and `clientCollection`
 */
export const ReactiveAggregate = function (subscription, collection, pipeline = [], options = {}) {
  // fill out default options
  const {
    observeSelector, observeOptions, delay, lookupCollections, clientCollection
  } = defaultOptions({
    collection,
    options
  });

  // flag to prevent multiple ready messages from being sent
  let ready = false;
  // don't update the subscription until __after__ the initial hydrating of our collection
  let initializing = true;
  // set when the client unsubscribes, so late observer events and pending
  // throttled calls do not run the aggregation again
  let stopped = false;
  // top-level field names last sent for each published document, keyed like
  // subscription._ids; used to clear fields that vanish from the result
  const sentFields = new Map();

  // mutate the subscription to ensure it updates as we version it
  subscription._ids = {};
  subscription._iteration = 1;

  // add a new document, or update one that was already published
  const publishDocument = (doc) => {
    const key = String(doc._id);
    if (!subscription._ids[doc._id]) {
      subscription.added(clientCollection, doc._id, doc);
    } else {
      // a field is only cleared on the client when it is sent with the value
      // undefined, so fields that disappeared must be listed explicitly
      const fields = { ...doc };
      (sentFields.get(key) || []).forEach((name) => {
        if (!Object.prototype.hasOwnProperty.call(doc, name)) {
          fields[name] = undefined;
        }
      });
      subscription.changed(clientCollection, doc._id, fields);
    }
    subscription._ids[doc._id] = subscription._iteration;
    sentFields.set(key, Object.keys(doc));
  };

  // called once the cursor is exhausted: drop stale documents and signal ready
  const finishIteration = () => {
    // remove documents not in the result anymore
    Object.entries(subscription._ids).forEach(([key, iteration]) => {
      if (iteration !== subscription._iteration) {
        delete subscription._ids[key];
        sentFields.delete(key);
        subscription.removed(clientCollection, key);
      }
    });
    subscription._iteration++;
    // if this is the first run, mark the subscription ready
    if (!ready) {
      ready = true;
      subscription.ready();
    }
  };

  // run, or re-run, the aggregation pipeline
  const throttledUpdate = _.throttle(Meteor.bindEnvironment(() => {
    if (stopped) {
      return;
    }
    collection.aggregate(safePipeline).each((err, doc) => {
      if (err) {
        subscription.error(new Meteor.Error("aggregation-failed", err.message));
      } else if (!doc) {
        // when cursor.each is done, it sends null in place of a document
        finishIteration();
      } else {
        // cursor is not done iterating, add and update documents on the client
        publishDocument(doc);
      }
    });
  }), delay);

  const update = () => {
    if (!initializing && !stopped) {
      throttledUpdate();
    }
  };

  // create a list of collections to watch and make sure
  // we create a sanitized "strings-only" version of our pipeline
  const observerHandles = [createObserver(collection, { observeSelector, observeOptions })];

  // look for $lookup collections passed in as Mongo.Collection instances
  // and create observers for them
  // if any $lookup.from stages are passed in as strings they will be omitted
  // from this process. the aggregation will still work, but those collections
  // will not force an update to this query if changed.
  const safePipeline = pipeline.map((stage) => {
    if (stage.$lookup && stage.$lookup.from instanceof Mongo.Collection) {
      const lookupCollection = stage.$lookup.from;
      observerHandles.push(
        createObserver(lookupCollection, lookupCollections[lookupCollection._name])
      );
      return {
        ...stage,
        $lookup: {
          ...stage.$lookup,
          from: lookupCollection._name
        }
      };
    }
    return stage;
  });

  // observeChanges() will immediately fire an "added" event for each document in the query
  // these are skipped using the initializing flag
  initializing = false;

  // send an initial result set to the client
  update();

  // stop observing the cursors when the client unsubscribes
  subscription.onStop(() => {
    stopped = true;
    observerHandles.forEach((handle) => handle.stop());
  });

  /**
   * Create an observer that triggers `update` on any change to the query.
   * @param {Mongo.Collection|*} target collection to observe
   * @param {Object} [queryOptions] optional `observeSelector` / `observeOptions`;
   *   a missing or null value observes the whole collection with no options
   * @returns {any|*|Meteor.LiveQueryHandle} Handle
   */
  function createObserver(target, queryOptions = {}) {
    const { observeSelector: selector, observeOptions: findOptions } = queryOptions || {};
    const query = target.find(selector || {}, findOptions || {});
    return query.observeChanges({
      added: update,
      changed: update,
      removed: update,
      // NOTE(review): `error` is not among the observeChanges callbacks listed
      // in Meteor's documentation — confirm whether it is ever invoked
      error: (err) => {
        throw err;
      }
    });
  }
};