【问题标题】:Calculate a score from an existing fields with conditions根据条件计算现有字段的分数
【发布时间】:2016-10-13 23:47:51
【问题描述】:

我正在开发 MongoDB 2.6.9 和 NodeJs 0.10.37,我有一个集合 vols,这意味着航班。

> db.vols.findOne()
{
    "_id" : ObjectId("5717a5d4578f3f2556f300f2"),
    "Orig" : "AGP",
    "Dest" : "OTP",
    "Flight" : 126,
    "Routing" : "AGP-OTP",
    "Stops" : 0,
    "Seats" : 169,
    "Ops_Week" : 3,
    "Eff_Date" : "2016-04-14",
    "Mkt_Al" : "0B",
    "Dep_Time" : 1110,
    "Thru_Point" : "",
    "Arr_Time" : 1600,
    "Block_Mins" : 230

}

每个文档指的是航空公司完成的一次航班,并提供了详细信息,例如,上一个文档指的是直接完成的航班(Stops : 0)。但是下一个,航班停了。

db.vols.findOne({Stops:1})
{
    "_id" : ObjectId("5717a5d4578f3f2556f301c5"),
    "Orig" : "CEK",
    "Dest" : "IKT",
    "Flight" : 7756,
    "Routing" : "KZN-CEK-OVB-IKT",
    "Stops" : 1,
    "Seats" : 70,
    "Ops_Week" : 2,
    "Eff_Date" : "2016-04-11",
    "Mkt_Al" : "2G",
    "Dep_Time" : 1655,
    "Thru_Point" : "OVB",
    "Arr_Time" : 140,
    "Block_Mins" : 345
}

重要:

每个Airline 在每条路线中都有一个score (Origin - Destination)

如何计算分数?

所以,我需要进行这些计算并将一个新字段“QSI”插入我的集合vols

重要:

c4 中的平均经过时间意味着:

例如,我们有一个停止的航班,比如说:从 AC 的航班由 B,整个航班例如60 分钟,但从 AB 为 20 分钟,从 BC 为 20 分钟,这个平均值应该返回 40 分钟。

我尝试了这个解决方案,但是对于 c4,事情看起来不像是什么工作:

var mongoose = require('mongoose'),
    express  = require('express'),
    Schema   = mongoose.Schema;

mongoose.connect('mongodb://localhost/ramtest');

var volsSchema = new Schema({}, { strict : false, collection : 'vols' });
var MyModel    = mongoose.model("MyModel", volsSchema);

mongoose.set('debug', true);

mongoose.connection.on("open", function(err) {
  if (err) throw err;

  var bulkUpdateOps = MyModel.collection.initializeUnorderedBulkOp(),
      counter       = 0;

  MyModel.find({}).lean().exec(function(err, docs) {
    if (err) throw err;

    docs.forEach(function(doc) {
      // computations
      var c1, c2, c3, c4, qsi, first_leg, second_leg, total_flight;

      c1 = 0.3728 + (0.00454 * doc.Seats);
      c2 = (doc.Stops == 1) ? 0.03 : 1;
      c3 = doc.Ops_Week;

      if (doc.Stops == 1) {
        var Mkt_Air        = doc.Mkt_Al,
            Origin         = doc.Orig,
            Destination    = doc.Dest,
            Thru_Point     = doc.Thru_Point,
            Effective_Date = doc.Eff_Date,
            Block_Mins     = doc.Block_Mins;

        MyModel.find({ Mkt_Al : Mkt_Air }, { Orig : Origin }, { Dest : Thru_Point }, { Eff_Date : Effective_Date }).lean().exec(function(err, docs) {
          docs.forEach(function(doc) {
            var first_leg = doc.Block_Mins;
            MyModel.find({ Mkt_Al : Mkt_Air }, { Orig : Thru_Point }, { Dest : Destination }, { Eff_Date : Effective_Date }).lean().exec(function(err, docs) {
              docs.forEach(function(doc) {
                var second_leg = doc.Block_Mins, total_flight = second_leg + first_leg;
                c4 = Math.pow((Block_Mins / total_flight), -0.675);
                qsi = c1 * c2 * c3 * c4;
              }); // the end of docs.forEach(function (doc){
            }); // the end of MyModel.find..
          }); // the end of docs.forEach(function (doc){
        }); // the end of MyModel.find..
      } // end if
      else {
        c4 = 1;
      }

      qsi = c1 * c2 * c3 * c4;

      counter++;

      bulkUpdateOps.find({ "_id" : doc._id }).updateOne({
        "$set" : { "Qsi" : qsi }
      });

      if (counter % 500 == 0) {
        bulkUpdateOps.execute(function(err, result) {
          if (err) throw err;
          bulkUpdateOps = MyModel.collection.initializeUnorderedBulkOp();
          console.log(result);
          console.log(doc);
        });
      }

    });

    if (counter % 500 != 0) {
      bulkUpdateOps.execute(function(err, result) {
        if (err) throw err;
        console.log(result);
      });
    }
  });

  var app = express();
  app.listen(3000, function() {
    console.log('Ready to calculate and insert the QSI');
  });
});

问题:

我认为问题出在MyModel.find 上,比如如果我在这条指令中丢失数据...,当Stops = 0 时,我的score 计算干净,但如果Stops = 1,我的分数取值为@987654338 @,在像 callback(null, docs) 这样的一些迭代之后我有一个错误,请谁能帮忙??

我怎样才能达到上述目的?

【问题讨论】:

  • 能否请您改进格式。很难遵循缩进。
  • 我现在试着说得太清楚了......
  • @JeanDupont 我冒昧地为您重新格式化。
  • @robertklep 非常感谢

标签: node.js mongodb mongoose mongodb-query


【解决方案1】:

您的实施存在几个问题。首先,您错误地使用了 find() 方法,因为您为查询指定了太多参数:

MyModel.find(
    { Mkt_Al : Mkt_Air }, 
    { Orig : Origin }, 
    { Dest : Thru_Point }, 
    { Eff_Date : Effective_Date }
).lean().exec(function(err, docs) { .. }

应该是

MyModel.find({ 
    Mkt_Al: Mkt_Air, 
    Orig: Origin, 
    Dest: Thru_Point, 
    Eff_Date: Effective_Date 
}).lean().exec(function(err, docs) { ... }

同样,您不应该在这种情况下使用 find() 方法,因为您只需要一个与查询匹配的文档即可在计算中使用。从您之前的封闭问题中获取复杂算法:

现在我想计算一个分数 c4 并将其插入我的收藏:

为此,我应该像这样计算一个值 c4

1) 首先我验证每个文档是否 ( Field2 == 1 ) 如果它是真的我 继续,否则很简单 c4 取值为 1。

2)然后我应该做一个循环“for”并查看哪个文档验证这些 条件:doc.Field1 == this.Field1 && doc.Field6 == this.Field6 && doc.Field7 == this.Field8

3) 然后我将 doc.Field4 添加到另一个文档的 Field4

4) 我继续,我做另一个循环并寻找另一个文件 验证这些条件:

它应该具有相同的 Field1 就像之前的文档及其 Field6 等于上一个文档 Field7 和它的 Field8 相同 作为第一个文档中的 Field8

5) 然后我把 doc.Field4 添加到前面的 doc.Field4

使用MyModel.findOne() 应该足以完成上述任务 3、4 和 5。但是,由于调用的异步特性,您需要嵌套查询,但幸运的是嵌套调用的深度不大于 3,否则您会发现自己有一张通往回调地狱的单程票。为了避免这些常见的陷阱,最好使用 Promises (因为默认情况下本机 mongoose 查询可以返回 Promise)或使用 node-async 包,其中包含许多用于处理此类情况的函数。

如果使用 async 库,它可以有效地让您运行多个相互依赖的异步任务(如 MyModel.findOne() 调用),并在它们都完成后执行其他操作。在上面,您可以使用 async.series() 方法。


以下示例演示了上述概念,您可以从测试数据库中的以下示例文档计算 Qsi

填充测试数据库的 vol 集合:

db.vols.insert([
    {    
        "Mkt_Al" : "2G",
        "Stops" : 0,
        "Seats" : 169,
        "Block_Mins" : 230,                
        "Ops_Week" : 3,        
        "Orig" : "AGP",
        "Dest" : "OTP",
        "Thru_Point" : "",
    },
    {    
        "Mkt_Al" : "2G",
        "Stops" : 1,
        "Seats" : 260,              
        "Block_Mins" : 260,
        "Ops_Week" : 2,  
        "Orig" : "CEK",
        "Dest" : "IKT",
        "Thru_Point" : "OVB",
    },
    {    
        "Mkt_Al" : "2G",
        "Stops" : 0,
        "Seats" : 140,
        "Block_Mins" : 60,
        "Ops_Week" : 2,        
        "Orig" : "BEK",
        "Dest" : "OTP",
        "Thru_Point" : "",
    },
    {    
        "Mkt_Al" : "2G",
        "Stops" : 0,
        "Seats" : 160,
        "Block_Mins" : 90,
        "Ops_Week" : 3,        
        "Orig" : "CEK",
        "Dest" : "OVB",
        "Thru_Point" : "",
    },
    {    
        "Mkt_Al" : "2G",        
        "Stops" : 0,
        "Seats" : 60,
        "Block_Mins" : 50,
        "Ops_Week" : 3,        
        "Orig" : "OVB",
        "Dest" : "IKT",
        "Thru_Point" : "",
    }
])

Node.js 应用:

var mongoose = require('mongoose'),
    express = require('express'),
    async = require('async'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');
var volSchema = new Schema({},{ strict: false, collection: 'vols' }),    
    Vol = mongoose.model("Vol", volSchema);

mongoose.set('debug', false);

mongoose.connection.on("open", function (err) {
    if (err) throw err;  
    var bulkUpdateOps = Vol.collection.initializeUnorderedBulkOp(), 
        counter = 0;

    Vol.find({}).lean().exec(function (err, docs) {
        if (err) throw err; 
        var locals = {};

        docs.forEach(function(doc) {            
            locals.c1 = 0.3728 + (0.00454 * doc.Seats);         
            locals.c3 = doc.Ops_Week;

            if (doc.Stops == 1) {               
                async.series([
                    // Load doc with first leg first
                    function(callback) {
                        Vol.findOne({ 
                            Mkt_Al: doc.Mkt_Al,
                            Orig: doc.Orig,
                            Dest: doc.Dest                          
                        }).lean().exec(function (err, flight) {
                            if (err) return callback(err);
                            locals.first_leg = flight.Block_Mins;
                            callback();
                        });
                    },
                    // Load second leg doc 
                    // (won't be called before task 1's "task callback" 
                    // has been called)
                    function(callback) {                    
                        Vol.findOne({ 
                            Mkt_Al: doc.Mkt_Al,
                            Orig: doc.Thru_Point,
                            Dest: doc.Dest                          
                        }).lean().exec(function (err, flight) {
                            if (err) return callback(err);
                            locals.second_leg = flight.Block_Mins;
                            callback();
                        });
                    }
                ], function(err) { // This function gets called after the
                    // two tasks have called their "task callbacks"
                    if (err) throw err;
                    // Here locals will be populated with `first_leg` 
                    // and `second_leg`
                    // Just like in the previous example
                    var total_flight = locals.second_leg + locals.first_leg;                    
                    locals.c2 = 0.03;
                    locals.c4 = Math.pow((doc.Block_Mins / total_flight), -0.675);                    

                }); 
            } else {
                locals.c2 = 1;
                locals.c4 = 1;
            }

            counter++;
            console.log(locals);
            bulkUpdateOps.find({ "_id" : doc._id }).updateOne({ 
                "$set": { 
                    "Qsi": (locals.c1 * locals.c2 * locals.c3 * locals.c4) 
                } 
            });

            if (counter % 500 == 0) {
               bulkUpdateOps.execute(function(err, result) {          
                    if (err) throw err; 
                    bulkUpdateOps = Vol.collection.initializeUnorderedBulkOp();                        
                });
            } 
        });

        if (counter % 500 != 0) {
            bulkUpdateOps.execute(function(err, result) {
                if (err) throw err; 
                console.log(result.nModified);                
            });
        }   
    });
});

样本输出:

db.vols.find()

/* 1 */
{
    "_id" : ObjectId("5767e7549ebce6d574702221"),
    "Mkt_Al" : "2G",
    "Stops" : 0,
    "Seats" : 169,
    "Block_Mins" : 230,
    "Ops_Week" : 3,
    "Orig" : "AGP",
    "Dest" : "OTP",
    "Thru_Point" : "",
    "Qsi" : 3.42018
}

/* 2 */
{
    "_id" : ObjectId("5767e7549ebce6d574702222"),
    "Mkt_Al" : "2G",
    "Stops" : 1,
    "Seats" : 260,
    "Block_Mins" : 260,
    "Ops_Week" : 2,
    "Orig" : "CEK",
    "Dest" : "IKT",
    "Thru_Point" : "OVB",
    "Qsi" : 3.1064
}

/* 3 */
{
    "_id" : ObjectId("5767e7549ebce6d574702223"),
    "Mkt_Al" : "2G",
    "Stops" : 0,
    "Seats" : 140,
    "Block_Mins" : 60,
    "Ops_Week" : 2,
    "Orig" : "BEK",
    "Dest" : "OTP",
    "Thru_Point" : "",
    "Qsi" : 2.0168
}

/* 4 */
{
    "_id" : ObjectId("5767e7549ebce6d574702224"),
    "Mkt_Al" : "2G",
    "Stops" : 0,
    "Seats" : 160,
    "Block_Mins" : 90,
    "Ops_Week" : 3,
    "Orig" : "CEK",
    "Dest" : "OVB",
    "Thru_Point" : "",
    "Qsi" : 3.2976
}

/* 5 */
{
    "_id" : ObjectId("5767e7549ebce6d574702225"),
    "Mkt_Al" : "2G",
    "Stops" : 0,
    "Seats" : 60,
    "Block_Mins" : 50,
    "Ops_Week" : 3,
    "Orig" : "OVB",
    "Dest" : "IKT",
    "Thru_Point" : "",
    "Qsi" : 1.9356
}

【讨论】:

  • 非常感谢 Chridam,我在这里遇到了一个错误:bulkUpdateOps.push({TypeError: Object [object Object] has no method 'push'
  • 现在我又来了……if (err) throw err; ^ MongoError: operations must be an array of documents
  • 那么这意味着没有找到与查询匹配的文档。我认真地认为你可以从这里开始,做一些空值检查等。我上面提供的示例仅用于演示概念,不能替代生产代码。现在由您来重构或优化代码以满足您的核心需求。我想我已经做了足够多的事情来推动你朝着正确的方向前进。您需要了解这里的概念,而不仅仅是复制和粘贴您提供的代码。
  • 非常感谢@chridam,您非常乐于助人和严谨
  • @JeanDupont 您可能是对的,试一试,如果您遇到任何进一步的障碍,请创建一个新问题。
【解决方案2】:

问题在于,在Stops == 1 的情况下,您正在进行异步调用,在使用它计算qsi 之前不会设置c4 的值。相关块在这里:

  if (doc.Stops == 1) {
    // do some stuff
    MyModel.find({/* some query */}).lean().exec(function(err, docs) {
      // SITE A: this function will not be called for a few milliseconds
      // do some more stuff
      c4 = Math.pow((Block_Mins / total_flight), -0.675);
      qsi = c1 * c2 * c3 * c4;
    }); // the end of MyModel.find..
  } // end if
  else {
    c4 = 1;
  }

  qsi = c1 * c2 * c3 * c4; // SITE B: this will be called before SITE A, and qsi will be NaN since c4 is not initialized
  bulkUpdateOps.find({ "_id" : doc._id }).updateOne({
    "$set" : { "Qsi" : qsi } // qsi here will be from SITE B, and SITE A still hasn't been reached yet
  });

一个粗略的解决方法是做类似的事情

  function setQsi(qsi) {
    bulkUpdateOps.find({ "_id" : doc._id }).updateOne({
      "$set" : { "Qsi" : qsi }
    });
  }

  if (doc.Stops == 1) {
    // do some stuff
    MyModel.find({/* some query */}).lean().exec(function(err, docs) {
      // do some more stuff
      c4 = Math.pow((Block_Mins / total_flight), -0.675);
      setQsi(c1 * c2 * c3 * c4);
    }); // the end of MyModel.find..
  } // end if
  else {
    c4 = 1;
    setQsi(c1 * c2 * c3 * c4);
  }

对于更复杂的异步模式,您应该考虑使用Promises

【讨论】:

  • 我应该把这段代码放在哪里:“function setQsi(qsi) { ...”?
  • 我打算把它放在if (doc.Stops == 1) 之前。但是我没有注意到您还有诸如稍后执行批量更新操作之类的事情也会出现并发问题,因此这可能是问题的一部分。例如,可能是您没有重置数据库中的 Qsi 值,因此由于批量更新问题,一些旧的 NaNs 没有被覆盖。稍后我会尝试相应地编辑我的答案,但与此同时,您至少了解这个问题吗?
  • 是的,我想是的,我不知道是否还有其他方法可以使事情正常进行,map reduce 或其他...
  • 我不知道是否有人可以修复它,或者只是对我的 mongodb-query 中的解决方案有不同的想法......
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-02
  • 2019-04-09
  • 1970-01-01
  • 1970-01-01
  • 2021-07-14
  • 2021-10-24
  • 1970-01-01
相关资源
最近更新 更多