【问题标题】:Nodejs Stream chaining promises write only the first oneNodejs 流链承诺只写第一个
【发布时间】:2020-06-04 16:13:07
【问题描述】:

我正在尝试使用异步生成器创建一个写入多个文件的流。

使用reduce promises 是顺序执行的,所以期望的输出是:

$ wc -l *
42 f1.csv
98 f2.csv
78 f3.csv

但我得到以下结果:

$ wc -l *
42 f1.csv // only the first one is good
0  f2.csv // nothing in
0  f3.csv // same

AsyncGenerator() 是一个 ElasticSearch 滚动搜索 (Doc) 对于每个承诺,我都会向他发送不同的查询。

const { Readable } = require('stream');
const fs = require('fs');
const util = require('util');
const stream = require('stream');
const pipeline = util.promisify(stream.pipeline);

async function* Query(params) {

    var response = await ESClient.search(params);

    while (true) {
        const sourceHits = response.hits.hits;

        if (sourceHits.length === 0) {
            break;
        }

        for (const hit of sourceHits) {
            yield hit;
        }

        if (!response._scroll_id) {
            break;
        }

        response = await ESClient.scroll({
            scrollId: response._scroll_id,
            scroll: params.scroll
        });
    }
}

async function* AsyncGenerator() {

        const params = {
            index: 'apache-logs',
            scroll: '5s',
            _source: ["foo"],
            size: 2048,
            body: { query } // ElasticSearch DSL Query
        };

        for await (const res of Query(params)) {
            yield res;
        }
    }

async function WriteToFile(file) {

        return new Promise(async (resolve, reject) => {
            const input = Readable.from(AsyncGenerator());
            const output = fs.createWriteStream(file + '.csv');

            await pipeline(
                input,
                json2csv,
                output
            );
            resolve();   
        });
}

const lists = ["f1", "f2", "f3"];

const last = lists.reduce(async (previous, next) => {
            await previous;
            return WriteToFile(next);
}, Promise.resolve());

last.then(_ => { 
    console.log("DONE");  
}).catch((e) => {
    console.log(e);
});

(Lorem ipsum lalalaalalal(sry stackoverflow 要求提供更多详细信息,但我没有大声笑))

【问题讨论】:

  • 在您的代码问题中包含您的代码的所有要求(可读、AsyncGenerator 和管道)可能很有用
  • 抱歉,已编辑。 AsyncGenerator 是一个简单的生成器 (yield { foo: "bar" }; ...)。
  • 我们能得到 AsyncGenerator 的完整代码吗?我认为您需要在 WriteToFile 之间重新启动生成器。
  • 已编辑。 asyncGenerator 运行良好,如果我 console.log 在 while(true) 中得到响应,但在 last.then() 中的 console.log("DONE") 之后。所以在我将响应写入文件之前关闭了流?
  • 你可以尝试用一个简单的 for 循环替换 reduce 和 await 吗?看看是reduce问题还是其他功能。

标签: node.js csv promise stream


【解决方案1】:

您好,我不确定,因为我的熟人在 NodeJs 方面很差。我认为您只调用一次创建 .csv 的函数

const lists = ["f1", "f2", "f3"];

const last = lists.reduce(async (previous, next) => {
            await previous;
            return WriteToFile(next);
}, Promise.resolve());
猜你喜欢
  • 2021-02-18
  • 2018-11-03
  • 1970-01-01
  • 2015-09-26
  • 1970-01-01
  • 2020-04-24
  • 2013-11-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多