【问题标题】:Creating synchronous queries with node-mysql使用 node-mysql 创建同步查询
【发布时间】:2014-07-28 19:53:23
【问题描述】:

我想确保一个 mysql 查询触发的后续查询全部完成之后,整个流程才算结束。例如,我先执行一个 select 并以流的方式逐行读取结果,然后根据每一行的结果执行后续查询。用回调可以做到这一点,但最终会耗尽内存,所以我想放慢处理速度、分批执行;然而由于调度是异步的,我无法让各步骤保持同步,也无法在所有行处理完之后再结束连接。

这是一个例子:

// Streams ids from table1; each streamed row starts a lookup in books, and
// each lookup row starts an insert into test.
var query = conn.query('select id from table1 limit 10');

query.on('result', onTableRow);

// The connection is ended as soon as the first query has delivered its last
// row, regardless of whether the nested queries below have finished.
query.on('end', onTableEnd);

// Handles one row of the initial select by looking the id up in books.
function onTableRow(row) {
    console.log('query1', row);
    var query2 = conn.query('select id from books where id  = ? ', [row.id]);
    query2.on('result', onBookRow);
}

// Handles one row of the books lookup by inserting its id into test.
function onBookRow(row2) {
    console.log('query2', row2);
    var query3 = conn.query('insert into test (id) values (?)', [row2.id]);
    query3.on('result', onInsertResult);
}

// Logs the result packet of the insert.
function onInsertResult(row3) {
    console.log(row3);
}

// Closes the connection once the initial select has ended.
function onTableEnd() {
    conn.end();
}

上述失败,因为在初始查询结束后,query3 中仍有行要处理。
有什么想法吗?实际代码更加复杂,因为我必须处理来自后续查询的 xml,并在循环批处理时触发更多插入。

谢谢!

【问题讨论】:

    标签: mysql node.js node-mysql


    【解决方案1】:

    这就是我所做的,

    // Callback for the lookup query: the follow-up update is only issued
    // after this query has reported at least one matching row.
    const onUserLookup = (err, result) => {
      if (err) {
        console.log("Error : ", err);
        return;
      }
      if (result.length <= 0) {
        res.json("Not Found");
        return;
      }
      console.log("name found, executing update query!");
      // Second query is started from inside the first query's callback.
      // NOTE(review): no age argument is passed here, so the callee's `age`
      // parameter is undefined for this call — confirm the intended value.
      updateAgeIfUserFound("test");
    };

    db.query("select name from USER where name = ?", ["test"], onUserLookup);
    
      /**
       * Updates the age of the USER row(s) matching `name`.
       *
       * Does nothing when `name` is falsy. On success it logs and sends the
       * "Name Updated" response through the surrounding `res` object; a query
       * error is rethrown from the query callback.
       *
       * @param {string} name - value matched against USER.name
       * @param {*} age - new value bound to USER.age
       */
      function updateAgeIfUserFound(name, age) {
        if (!name) {
          return;
        }
        db.query(
          // The closing quote was missing here, which made the whole snippet
          // a syntax error.
          "update USER set age = ? where name = ?",
          [age, name],
          (err) => {
            if (err) throw err;
            console.log("Name Updated");
            res.json("Name Updated");
          }
        );
      }
    

    【讨论】:

      【解决方案2】:

      在我看来,最好的解决方案是以一种非常简单的方式使代码同步。

      你可以使用 “synchronize” 包。

      只是

      npm install synchronize

      然后 var sync = require('synchronize');

      使用

      将应该同步的逻辑放入纤程中

      sync.fiber(function() { /* put your logic here */ });

      两个mysql查询的例子:

      var express = require('express');
      var bodyParser = require('body-parser');
      var mysql = require('mysql');
      var sync = require('synchronize');
      
      // Settings handed to mysql.createConnection.
      var connectionOptions = {
          host: 'localhost',
          user: 'user',
          password: 'password',
          database: 'database'
      };

      var db = mysql.createConnection(connectionOptions);

      // Open the connection; a failure is only reported on stderr.
      db.connect(function(err) {
          if (!err) {
              return;
          }
          console.error('error connecting: ' + err.stack);
      });
      
      /**
       * Inserts a row into mainTable, waits for its generated id, then inserts
       * a row into subTable that references that id.
       *
       * The first insert is awaited through sync.await/sync.defer; per the
       * surrounding text this function is meant to be called from inside a
       * sync.fiber(...) body. The second insert is fire-and-forget and
       * rethrows a query error from its callback.
       */
      function saveSomething() {
          // The id is sent as an explicit null. The original read `newId`
          // here before it was assigned, i.e. it silently sent `undefined`.
          var post = {id: null};
          // No callback here: sync.await returns the query result directly.
          var query = sync.await(db.query('INSERT INTO mainTable SET ?', post, sync.defer()));
          var newId = query.insertId;
          // This query can stay asynchronous; nothing below depends on it.
          db.query('INSERT INTO subTable SET ?', {foreignKey: newId}, function(err) {
              if (err) throw err;
          });
      }
      

      当调用“saveSomething()”时,它会在主表中插入一行并接收最后插入的 id。之后将执行下面的代码。不需要嵌套承诺或类似的东西。

      【讨论】:

        【解决方案3】:

        @glukki,感谢您的出色回答以及对 async 模块的引用。我在您的代码基础上做了改动,使用两个 async 队列,分别通过单个连接和连接池“分块处理”,把超过 10 万行的 select 结果处理成 120 万行的 insert。效果非常好,耗时不到 10 分钟。下面是去掉模块引入和连接设置之后的完整实现。希望这对其他人也有帮助。再次感谢!

        /**
         * Parses the XML fragment in row.mesh_heading_list and queues one
         * "INSERT INTO abstract_mesh ... SELECT" per descriptor found, then
         * calls callback() once every queued insert has finished.
         *
         * Queued tasks:
         *  - parsed root is not an object      -> one task ('N', 'Null')
         *  - a MeshHeading without descriptors -> one task ('N', 'Emergency')
         *  - every DescriptorName entry        -> (its MajorTopicYN, its text)
         *
         * Parse, pool and query errors are rethrown from the respective
         * callback, following the existing `if (err) {throw err;}` convention.
         *
         * @param {{id: *, mesh_heading_list: string}} row - source row
         * @param {function()} callback - invoked with no arguments when done
         */
        function populateMesh(row, callback){

            xmlParser.parseString('<root>'+row.mesh_heading_list+'</root>', function(err, result){
                // Surface the real parse error instead of failing later on
                // an undefined result.
                if (err) {throw err;}

                // Number of tasks handed to q2; drain only fires for a queue
                // that actually received work.
                var queued = 0;

                var q2 = async.queue(function (task, cb) {

                    pool.getConnection(function(err, cnx){
                        if (err) {throw err;}
                        cnx.query('INSERT INTO abstract_mesh (mesh_id, abstract_id, major_topic) SELECT mesh_descriptor.id, ?, ? FROM mesh_descriptor WHERE mesh_descriptor.name = ?', [task.id, task.majorTopic, task.descriptorName], function(err){
                            // Give the connection back before reporting the
                            // outcome so a failed insert does not keep it.
                            cnx.release();
                            if (err) {throw err;}
                            cb();
                        });
                    });

                }, 50);

                q2.drain = function() {
                    callback();
                };

                // Queues one insert for this row.
                function enqueue(majorTopic, descriptorName){
                    queued += 1;
                    q2.push({id: row.id, majorTopic: majorTopic, descriptorName: descriptorName});
                }

                var root = result && result.root;

                if(!(root instanceof Object)){
                    enqueue('N', 'Null');
                }

                // Object.keys visits the same own indexes the former for...in
                // loops did, without walking inherited properties.
                var headings = (root && root.MeshHeading) || {};
                Object.keys(headings).forEach(function(i){
                    var descriptors = headings[i].DescriptorName;
                    if(typeof descriptors === 'undefined'){
                        enqueue('N', 'Emergency');
                        return;
                    }
                    Object.keys(descriptors || {}).forEach(function(j){
                        enqueue(descriptors[j].$.MajorTopicYN, descriptors[j]._);
                    });
                });

                // Nothing was queued, so drain will never run: finish here,
                // otherwise the caller would wait forever.
                if(queued === 0){
                    callback();
                }
            });

        }
        
        
        // Row-level queue: each pushed row is handed to populateMesh, with at
        // most 10 rows in flight. Any async work that has to finish before
        // the drain handler runs must go through this queue.

        // Worker: processes one row and signals completion to the queue.
        function processRow(row, callback) {
            console.log('got id: ' + row.id);
            populateMesh(row, function(){
                callback();
            });
        }

        // Drain handler: closes the single connection and the pool.
        function shutDown() {
            console.log('all items have been processed');
            conn.end(function(err){
                console.log('connection ended');
            });
            pool.end(function(err){
                console.log('pool closed');
            });
        }

        var q = async.queue(processRow, 10);

        q.drain = shutDown;
        
        // Pushes one streamed source row onto the row queue.
        function enqueueRow(result){
            q.push(result, function (err) {});
        }

        // Empty the target table, then stream the source rows into the queue.
        var truncate = conn.query('truncate abstract_mesh');

        var select = conn.query('SELECT id, mesh_heading_list FROM pubtbl');

        select.on('result', enqueueRow);
        

        【讨论】:

          【解决方案4】:

          我建议使用 async 模块来解决这个问题:

          var async = require("async");
          // connection instance
          var conn;
          
          // Builds the follow-up task that runs `req` with the row's id.
          function followUpTask(solver, req, row) {
              return {solver: solver, req: req, reqArgs: [row.id]};
          }

          // Row handlers, keyed by the `solver` name carried on each task.
          // A handler receives the queue, the task that produced the row and
          // the row itself; any follow-up work is pushed back onto the queue
          // so it finishes before the queue drains.
          var solvers = {
              // Rows of the initial select: look each id up in books.
              query: function(q, task, row){
                  console.log('query1', row);
                  var next = followUpTask("query2", "select id from books where id = ?", row);
                  q.push(next);
              },
              // Rows of the books lookup: insert each id into test.
              query2: function(q, task, row){
                  console.log('query2', row);
                  var next = followUpTask("query3", "insert into test (id) values (?)", row);
                  q.push(next);
              },
              // Result of the insert: only logged.
              query3: function(q, task, row){
                  console.log(row);
              }
          };
          
          // Worker: runs the task's SQL, hands every result row to the
          // solver named on the task, and completes the task on "end".
          function runTask(task, cb){
              var query = conn.query(task.req, task.reqArgs);
              query.on("end", cb);
              query.on("result", function(row){
                  solvers[task.solver](q, task, row);
              });
          }

          // Drain handler: runs once every queued request has reached "end".
          function onAllDone(){
              conn.end();
              // continue from here
          }

          // Task queue; the second argument limits the parallel queries.
          var q = async.queue(runTask, 2);

          q.drain = onAllDone;

          // Seed the queue with the initial select.
          var initialTask = {
              solver: "query",
              req: "select id from table1 limit 10",
              reqArgs: []
          };
          q.push(initialTask);
          

          不过,我仍然不确定按 ID 逐个发出请求是否是个好方案。
          也许我只是没有了解问题的全貌。

          【讨论】:

            猜你喜欢
            • 2014-05-06
            • 1970-01-01
            • 2023-03-28
            • 2017-06-06
            • 2020-11-03
            • 1970-01-01
            • 2018-04-07
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多