【发布时间】:2020-06-04 16:13:07
【问题描述】:
我正在尝试使用异步生成器创建一个写入多个文件的流。
使用reduce promises 是顺序执行的,所以期望的输出是:
$ wc -l *
42 f1.csv
98 f2.csv
78 f3.csv
但我得到以下结果:
$ wc -l *
42 f1.csv // only the first one is good
0 f2.csv // nothing in
0 f3.csv // same
AsyncGenerator() 是一个 ElasticSearch 滚动搜索 (Doc) 对于每个承诺,我都会向他发送不同的查询。
const { Readable } = require('stream');
const fs = require('fs');
const util = require('util');
const stream = require('stream');
const pipeline = util.promisify(stream.pipeline);
async function* Query(params) {
var response = await ESClient.search(params);
while (true) {
const sourceHits = response.hits.hits;
if (sourceHits.length === 0) {
break;
}
for (const hit of sourceHits) {
yield hit;
}
if (!response._scroll_id) {
break;
}
response = await ESClient.scroll({
scrollId: response._scroll_id,
scroll: params.scroll
});
}
}
async function* AsyncGenerator() {
const params = {
index: 'apache-logs',
scroll: '5s',
_source: ["foo"],
size: 2048,
body: { query } // ElasticSearch DSL Query
};
for await (const res of Query(params)) {
yield res;
}
}
async function WriteToFile(file) {
return new Promise(async (resolve, reject) => {
const input = Readable.from(AsyncGenerator());
const output = fs.createWriteStream(file + '.csv');
await pipeline(
input,
json2csv,
output
);
resolve();
});
}
const lists = ["f1", "f2", "f3"];
const last = lists.reduce(async (previous, next) => {
await previous;
return WriteToFile(next);
}, Promise.resolve());
last.then(_ => {
console.log("DONE");
}).catch((e) => {
console.log(e);
});
(Lorem ipsum lalalaalalal(sry stackoverflow 要求提供更多详细信息,但我没有大声笑))
【问题讨论】:
-
在您的代码问题中包含您的代码的所有要求(可读、AsyncGenerator 和管道)可能很有用
-
抱歉,已编辑。 AsyncGenerator 是一个简单的生成器 (yield { foo: "bar" }; ...)。
-
我们能得到 AsyncGenerator 的完整代码吗?我认为您需要在 WriteToFile 之间重新启动生成器。
-
已编辑。 asyncGenerator 运行良好,如果我 console.log 在 while(true) 中得到响应,但在 last.then() 中的 console.log("DONE") 之后。所以在我将响应写入文件之前关闭了流?
-
你可以尝试用一个简单的 for 循环替换 reduce 和 await 吗?看看是reduce问题还是其他功能。
标签: node.js csv promise stream