您可以通过从架构定义中获取 headers 来使用 fast-csv 来完成此操作,这会将解析的行作为“对象”返回。您实际上有一些不匹配,因此我已将它们标记为更正:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
只要架构实际上与提供的 CSV 一致,就可以了。这些是我可以看到的更正,但是如果您需要以不同方式对齐的实际字段名称,则需要进行调整。但是在有String 的位置基本上有一个Number,并且基本上是一个额外的字段,我假设它是CSV 中的空白字段。
一般的事情是从架构中获取字段名称数组,并在制作 csv 解析器实例时将其传递给选项:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
一旦你真的这样做了,你就会得到一个“对象”而不是一个数组:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
不要担心“类型”,因为 Mongoose 会根据模式转换值。
其余的发生在data 事件的处理程序中。为了获得最大效率,我们使用insertMany() 每 10,000 行只写入一次数据库。这实际上如何进入服务器和进程取决于 MongoDB 版本,但是根据您为单个集合导入的平均字段数,就内存使用的“权衡”和编写合理的网络请求。如有必要,请缩小数字。
重要的部分是在继续之前将这些调用标记为async 函数和await 的结果insertMany()。此外,我们需要pause() 流和resume() 在每个项目上,否则我们冒着在实际发送之前覆盖要插入的文档的buffer 的风险。 pause() 和 resume() 是在管道上施加“背压”所必需的,否则项目只会不断“出来”并触发 data 事件。
自然地,对 10,000 个条目的控制要求我们在每次迭代和流完成时都检查以清空缓冲区并将任何剩余的文档发送到服务器。
这确实是您想要做的,因为您当然不想在通过data 事件的“每次”迭代或基本上不等待每个请求完成的情况下向服务器发出异步请求。如果不检查“非常小的文件”,您将侥幸逃脱,但对于任何现实世界的负载,由于尚未完成的“飞行中”异步调用,您肯定会超出调用堆栈。
仅供参考 - 使用了 package.json。 mz 是可选的,因为它只是一个现代化的Promise 启用的标准节点“内置”库,我只是习惯使用它。该代码当然可以与fs 模块完全互换。
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
实际上,使用 Node v8.9.x 及更高版本,我们甚至可以通过 stream-to-iterator 模块实现 AsyncIterator 来简化此操作。它仍然处于Iterator<Promise<T>> 模式,但它应该在 Node v10.x 稳定 LTS 之前运行:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
基本上,所有的流“事件”处理以及暂停和恢复都被一个简单的for 循环所取代:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
简单!当它变得更稳定时,这将在以后的节点实现中使用for..await..of 进行清理。但以上在指定版本及以上版本上运行良好。