【问题标题】:Import CSV Using Mongoose Schema使用 Mongoose 模式导入 CSV
【发布时间】:2018-11-06 17:07:55
【问题描述】:

目前我需要将一个大的 CSV 文件推送到一个 mongo 数据库中,并且值的顺序需要确定数据库条目的键:

CSV 文件示例:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0

将其解析为数组的代码:

var fs = require("fs");

var csv = require("fast-csv");

fs.createReadStream("rank.txt")
    .pipe(csv())
    .on("data", function(data){
        console.log(data);
    })
    .on("end", function(data){
        console.log("Read Finished");
    });

代码输出:

[ '9',
  '1557',
  '358',
  '286',
  'Mutantville',
  '4368',
  '2358026',
  '',
  'M',
  '0',
  '0',
  '0',
  '1',
  '0' ]
[ '9',
  '1557',
  '359',
  '147',
  'Wroogny',
  '4853',
  '2356061',
  '',
  'D',
  '0',
  '0',
  '0',
  '1',
  '0' ]

如何将数组插入到我的 mongoose 架构中以进入 mongo db?

架构:

var mongoose = require("mongoose");


var rankSchema = new mongoose.Schema({
   serverid: Number,
   resetid: Number,
   rank: Number,
   number: Number,
   name: String,
   land: Number,
   networth: Number,
   tag: String,
   gov: String,
   gdi: Number,
   protection: Number,
   vacation: Number,
   alive: Number,
   deleted: Number
});

module.exports = mongoose.model("Rank", rankSchema);

数组的顺序需要与模式的顺序相匹配,例如在数组中,第一个数字 9 需要始终保存为它们的键“serverid”等等。我正在使用 Node.JS

【问题讨论】:

  • 不能插入“数组”,需要将其转换为“对象”。通常,您会知道字段应该是什么,无论是预定义的还是通过 CSV 本身的标题行。大多数 CSV 解析器实际上都支持这一点。

标签: javascript node.js mongodb csv mongoose


【解决方案1】:

通过说@Neil Lunn 在 CSV 本身中需要 headerline

使用 csvtojson 模块的示例。

const csv = require('csvtojson');

const csvArray = [];
  csv()
    .fromFile(file-path)
    .on('json', (jsonObj) => {
      csvArray.push({ name: jsonObj.name, id: jsonObj.id });
    })
    .on('done', (error) => {
      if (error) {
        return res.status(500).json({ error});
      }
          Model.create(csvArray)
      .then((result) => {
         return res.status(200).json({result});
      }).catch((err) => {
          return res.status(500).json({ error});
      });
      });
    });

【讨论】:

    【解决方案2】:

    您可以通过从架构定义中获取 headers 来使用 fast-csv 来完成此操作,这会将解析的行作为“对象”返回。您实际上有一些不匹配,因此我已将它们标记为更正:

    const fs = require('mz/fs');
    const csv = require('fast-csv');
    
    const { Schema } = mongoose = require('mongoose');
    
    const uri = 'mongodb://localhost/test';
    
    mongoose.Promise = global.Promise;
    mongoose.set('debug', true);
    
    const rankSchema = new Schema({
      serverid: Number,
      resetid: Number,
      rank: Number,
      name: String,
      land: String,         // <-- You have this as Number but it's a string
      networth: Number,
      tag: String,
      stuff: String,        // the empty field in the csv
      gov: String,
      gdi: Number,
      protection: Number,
      vacation: Number,
      alive: Number,
      deleted: Number
    });
    
    const Rank = mongoose.model('Rank', rankSchema);
    
    const log = data => console.log(JSON.stringify(data, undefined, 2));
    
    (async function() {
    
      try {
        const conn = await mongoose.connect(uri);
    
        await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
    
        let headers = Object.keys(Rank.schema.paths)
          .filter(k => ['_id','__v'].indexOf(k) === -1);
    
        console.log(headers);
    
        await new Promise((resolve,reject) => {
    
          let buffer = [],
              counter = 0;
    
          let stream = fs.createReadStream('input.csv')
            .pipe(csv({ headers }))
            .on("error", reject)
            .on("data", async doc => {
              stream.pause();
              buffer.push(doc);
              counter++;
              log(doc);
              try {
                if ( counter > 10000 ) {
                  await Rank.insertMany(buffer);
                  buffer = [];
                  counter = 0;
                }
              } catch(e) {
                stream.destroy(e);
              }
    
              stream.resume();
    
            })
            .on("end", async () => {
              try {
                if ( counter > 0 ) {
                  await Rank.insertMany(buffer);
                  buffer = [];
                  counter = 0;
                  resolve();
                }
              } catch(e) {
                stream.destroy(e);
              }
            });
    
        });
    
    
      } catch(e) {
        console.error(e)
      } finally {
        process.exit()
      }
    
    
    })()
    

    只要架构实际上与提供的 CSV 一致,就可以了。这些是我可以看到的更正,但是如果您需要以不同方式对齐的实际字段名称,则需要进行调整。但是在有String 的位置基本上有一个Number,并且基本上是一个额外的字段,我假设它是CSV 中的空白字段。

    一般的事情是从架构中获取字段名称数组,并在制作 csv 解析器实例时将其传递给选项:

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);
    
    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }))
    

    一旦你真的这样做了,你就会得到一个“对象”而不是一个数组:

    {
      "serverid": "9",
      "resetid": "1557",
      "rank": "358",
      "name": "286",
      "land": "Mutantville",
      "networth": "4368",
      "tag": "2358026",
      "stuff": "",
      "gov": "M",
      "gdi": "0",
      "protection": "0",
      "vacation": "0",
      "alive": "1",
      "deleted": "0"
    }
    

    不要担心“类型”,因为 Mongoose 会根据模式转换值。

    其余的发生在data 事件的处理程序中。为了获得最大效率,我们使用insertMany() 每 10,000 行只写入一次数据库。这实际上如何进入服务器和进程取决于 MongoDB 版本,但是根据您为单个集合导入的平均字段数,就内存使用的“权衡”和编写合理的网络请求。如有必要,请缩小数字。

    重要的部分是在继续之前将这些调用标记为async 函数和await 的结果insertMany()。此外,我们需要pause() 流和resume() 在每个项目上,否则我们冒着在实际发送之前覆盖要插入的文档的buffer 的风险。 pause()resume() 是在管道上施加“背压”所必需的,否则项目只会不断“出来”并触发 data 事件。

    自然地,对 10,000 个条目的控制要求我们在每次迭代和流完成时都检查以清空缓冲区并将任何剩余的文档发送到服务器。

    这确实是您想要做的,因为您当然不想在通过data 事件的“每次”迭代或基本上不等待每个请求完成的情况下向服务器发出异步请求。如果不检查“非常小的文件”,您将侥幸逃脱,但对于任何现实世界的负载,由于尚未完成的“飞行中”异步调用,您肯定会超出调用堆栈。


    仅供参考 - 使用了 package.jsonmz 是可选的,因为它只是一个现代化的Promise 启用的标准节点“内置”库,我只是习惯使用它。该代码当然可以与fs 模块完全互换。

    {
      "description": "",
      "main": "index.js",
      "dependencies": {
        "fast-csv": "^2.4.1",
        "mongoose": "^5.1.1",
        "mz": "^2.7.0"
      },
      "keywords": [],
      "author": "",
      "license": "ISC"
    }
    

    实际上,使用 Node v8.9.x 及更高版本,我们甚至可以通过 stream-to-iterator 模块实现 AsyncIterator 来简化此操作。它仍然处于Iterator&lt;Promise&lt;T&gt;&gt; 模式,但它应该在 Node v10.x 稳定 LTS 之前运行:

    const fs = require('mz/fs');
    const csv = require('fast-csv');
    const streamToIterator = require('stream-to-iterator');
    
    const { Schema } = mongoose = require('mongoose');
    
    const uri = 'mongodb://localhost/test';
    
    mongoose.Promise = global.Promise;
    mongoose.set('debug', true);
    
    const rankSchema = new Schema({
      serverid: Number,
      resetid: Number,
      rank: Number,
      name: String,
      land: String,
      networth: Number,
      tag: String,
      stuff: String,        // the empty field
      gov: String,
      gdi: Number,
      protection: Number,
      vacation: Number,
      alive: Number,
      deleted: Number
    });
    
    const Rank = mongoose.model('Rank', rankSchema);
    
    const log = data => console.log(JSON.stringify(data, undefined, 2));
    
    (async function() {
    
      try {
        const conn = await mongoose.connect(uri);
    
        await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
    
        let headers = Object.keys(Rank.schema.paths)
          .filter(k => ['_id','__v'].indexOf(k) === -1);
    
        //console.log(headers);
    
        let stream = fs.createReadStream('input.csv')
          .pipe(csv({ headers }));
    
        const iterator = await streamToIterator(stream).init();
    
        let buffer = [],
            counter = 0;
    
        for ( let docPromise of iterator ) {
          let doc = await docPromise;
          buffer.push(doc);
          counter++;
    
          if ( counter > 10000 ) {
            await Rank.insertMany(buffer);
            buffer = [];
            counter = 0;
          }
        }
    
        if ( counter > 0 ) {
          await Rank.insertMany(buffer);
          buffer = [];
          counter = 0;
        }
    
      } catch(e) {
        console.error(e)
      } finally {
        process.exit()
      }
    
    })()
    

    基本上,所有的流“事件”处理以及暂停和恢复都被一个简单的for 循环所取代:

    const iterator = await streamToIterator(stream).init();
    
    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      // ... The things in the loop
    }
    

    简单!当它变得更稳定时,这将在以后的节点实现中使用for..await..of 进行清理。但以上在指定版本及以上版本上运行良好。

    【讨论】:

    • 哇,非常感谢你!我确实对 fs.createReadStream('input.csv') 有另一个问题。该命令是否可以从 Web URL 而不是我服务器上的本地文件获取文件?
    • @Ryansworld84 不是来自fs 命令,因为这意味着“文件系统”不是远程URI。 request 可以做到这一点,并在被要求时返回一个流。如果您还有其他问题,请Ask a New Question
    • 最后一个问题,当它运行时它会清除数据库并重新加载所有内容,有没有办法让它追加更多而不是清除它然后加载 csv?因此,例如,如果它从 CSV 加载 10 行,然后在 10 分钟后再次运行并从 CSV 加载另外 10 行,它是否可以将它们添加到数据库而不删除原始的 10 行并有 20 个条目?
    • @Ryansworld84 这是一个“示例”,并不意味着您可以“准备好使用代码”。根本不要拨打remove()。我这样做只是为了让您可以一遍又一遍地运行“示例”以理解它。
    • 用 4500 行的 csv 文件编写了一个 nodejs 脚本,流到迭代器的解决方案挂起,但第一个解决方案有效。非常感谢@NeilLunn
    猜你喜欢
    • 1970-01-01
    • 2016-02-15
    • 2017-06-07
    • 2015-03-29
    • 1970-01-01
    • 2016-06-07
    • 1970-01-01
    • 2016-12-21
    相关资源
    最近更新 更多