为此使用 aggregation framework,但由于 Meteor 尚不支持聚合,因此您需要安装聚合框架包 - 它没有做任何花哨的事情,只是包装为你准备了一些 Mongo 方法。
只需添加流星 meteorhacks:aggregate,您就可以开始营业了。这将为 Meteor 添加适当的聚合支持。
现在您需要此管道来实现所需的结果。在 mongo shell 中,运行以下命令:
var pipeline = [
{
"$group": {
"_id": "$name",
"quantity": { "$sum": "$quantity" }
}
},
{
"$project": {
"name": "$_id", "_id": 0, "quantity": 1
}
}
];
db.sales.aggregate(pipeline);
样本输出
{
"result" : [
{
"quantity" : 7,
"name" : "Pizza"
},
{
"quantity" : 3,
"name" : "Hot Coffee"
}
],
"ok" : 1
}
此聚合操作的作用是使用 $group 管道步骤按 name 字段对所有文档进行分组,并且对于每个不同的组,您可以使用累加器运算符 $sum 对每组的数值字段数量。下一个管道 $project 会将先前管道流文档中的字段重塑为所需的结构。
在 Meteor 中,您可以使用以下模式将这些结果发布到客户端的 Sales 集合,前提是您已将聚合包添加到您的流星应用程序:
Meteor.publish('getDistinctSalesWithTotalQuantity', function (opts) {
var initializing = 1;
function run(action) {
// Define the aggregation pipeline ( aggregate(pipeline) )
var pipeline = [
{
"$group": {
"_id": "$name",
"quantity": { "$sum": "$quantity" }
}
},
{
"$project": {
"name": "$_id", "_id": 0, "quantity": 1
}
}
];
Sales.aggregate(pipeline).forEach(function(e){
this[action]('distinct-sales', e.name, e)
this.ready()
});
};
// Run the aggregation initially to add some data to your aggregation collection
run('added');
// Track any changes on the collection we are going to use for aggregation
var handle = Sales.find({}).observeChanges({
added(id) {
// observeChanges only returns after the initial `added` callbacks
// have run. Until then, we don't want to send a lot of
// `self.changed()` messages - hence tracking the
// `initializing` state.
if (initializing && initializing--) run('changed');
},
removed(id) {
run('changed');
},
changed(id) {
run('changed');
},
error(err) {
throw new Meteor.Error('Aaaaaaaaah! Grats! You broke it!', err.message)
}
});
// Stop observing the cursor when client unsubs.
// Stopping a subscription automatically takes
// care of sending the client any removed messages.
this.onStop(function () {
handle.stop();
});
});
以上观察变化,如有必要,重新运行聚合。