【问题标题】:Calculate Average of last X documents in MapReduce计算 MapReduce 中最后 X 个文档的平均值
【发布时间】:2015-06-27 18:02:06
【问题描述】:

我什至知道 mapreduce 是否是我需要的最佳选择。我有一个像这样的猫鼬文档:

currency { 
   Time: Date,
   Interval: Number
}

在我的 mapreduce 工作中,我想计算最后 X 个文档的平均价格(间隔)。 (包括当前)。

如果我将 20 传递给我的方法,我希望对于每个文档,计算最后一个 19 + 当前一个的值除以 20。 任何正确方向的建议或指示将不胜感激。这就是我想要完成的:

function calculateAverages(Schema, interval, avg, callback){
    v
    var o = {};
    o.scope = {interval: interval, avg: avg};

    o.map = function(){

            var value = {
                  Time: this.Date,
                  Interval: this.Interval

            };

            // How am I gonna group the correct number of docs togheter?
            var key= ??

            emit(key, value);


    };
    //  an array of avg ( ex 20) number of items should be passed here
    o.reduce = function(key, intervals){
            var reducedVal = { avg: 0};

            for(var i=0;i<intervals.length;i++){
                reducedVal.avg += intervals[i].Interval;
            }
        reducedVal.avg /= avg;
        return reducedVal;
    };

    o.out = {
      merge: "testing"
    };
    o.finalize = function(key, reducedVal){
        return reducedVal;
    };

    Schema.mapReduce(o, function (err, results) {
        if (err) throw err;

        //console.log(results);
        console.log("mapReduce complete");

        callback(results);
    });
};

【问题讨论】:

  • MapReduce 在这种情况下(根本)不是正确的选择。您正在传递随机数,在这种情况下为 20,然后在运行时您需要基于传递的参数的结果,Map reduce 将在这些情况下工作,例如在您的日志文件中,每毫秒打印一些文本,并且您希望每小时发生某个特定世界的总数(这不会像您的情况那样根据某些参数而改变),因此您可以预先计算并在需要时随身携带只需从该文档返回...对于您的用例,只需按照某人的建议选择聚合管道...

标签: javascript node.js mongodb mapreduce mongoose


【解决方案1】:

您可以使用一个简单的聚合管道来获取平均值,管道将遵循此步骤

  1. 按时间字段对文档进行排序
  2. 限制文档数量
  3. 对文档进行分组并获取平均值

你可以试试下面这段代码,它只是创建 Currency 模型并向 mongodb 发送聚合查询,结果显示在控制台上,你的集合应该在你的 mongodb 服务器上命名为 currency。

var mongoose = require('mongoose');
var db = mongoose.connection;
mongoose.connect('mongodb://localhost/test');

var CurrencySchema = mongoose.Schema({
    Time: Date,
    Interval: Number
}, {collection: 'currency'});

var Currency = mongoose.model('Currency', CurrencySchema);
// You can change the $limit to specify the number of document

db.once('open', function (callback) {
  var pipeline = [
    {$sort: {Time : -1 }},
    {$limit: 5},
    {$group: {_id: null, average : {$avg: "$Interval"}}}
  ];

  Currency.aggregate(pipeline).exec(function(err, data) {
     console.log(data);
  });
});

【讨论】:

  • 感谢您的回答,但我不确定您是否了解我的情况。对于每个文档,我需要计算最后 x 个文档的平均值。因此,如果我有 100k 个文档,并且我指定 avg = 5,对于每个文档,我需要计算(并保存)最后 5 个文档的平均值。(100k 计算)。 ` 示例 db: 1. 01.01.2010 22:35:00 1.32 2. 2 01.01.2010 22:40:00 1.40 3. 01.01.2010.22.45:00 1.33 ` 如果我指定 avg = 3,那么在第三个文档中,我需要保存:(1.32 + 1.40 + 1.33) / 3,到一个名为 avg 的新字段。
  • 管道中的 $limit 选项用于指定文档的数量,并且 $sort 选项用于仅获取最后的文档
【解决方案2】:

如果您想继续使用 MapReduce,我的第一种方法是使用“GROUP”+X 作为键,其中 X 是您在每个组之后递增的数值。在您可以访问的任何地方声明 n=1 并使用 n++ 递增它,当 n % avg = 0 时,X++/n=1 您的发射之后。

对不起,如果我用 Java 输入我的示例,这是我在 MapReduce 中使用的语言。但我想你会很容易理解我的方法:

private int X = 1, n=1;

public map(Text anyKey, YourStructure value){
    int avg = ....; //Recover your avg value here, wherever it is stored.
    emit("GROUP"+X, value);
    if (n % avg == 0){
        X++;
        n=1;
    }else{
        n++;
    }
}

然后你发出:

Doc 1 --> key: GROUP1, value: {.....}
Doc 2 --> key: GROUP1, value: {.....}
...
Doc 19 --> key: GROUP1, value: {.....}
Doc 20 --> key: GROUP1, value: {.....}

Doc 21 --> key: GROUP2, value: {.....}
Doc 22 --> key: GROUP2, value: {.....}
...
Doc 29 --> key: GROUP2, value: {.....}
Doc 30 --> key: GROUP2, value: {.....}

Doc 31 --> key: GROUP3, value: {.....}
...

通过这种方式,您将收到所需的文件数量,以计算您的平均价格。

希望对你有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-04-23
    • 2014-01-24
    • 2017-02-06
    相关资源
    最近更新 更多