【问题标题】:Async/Await createReadStream end before read all data : Node.js在读取所有数据之前异步/等待 createReadStream 结束:Node.js
【发布时间】:2021-09-14 02:11:55
【问题描述】:

我正在使用fs.createReadStream 读取上传的 csv 文件数据。当收到数据行时,我正在验证这些数据并将验证数据推送到数组中。

问题是,它在调用end 后收到最后一行。

我已使用四行的 csv 进行波纹管测试。所有四条记录均通过验证。

const stream = fs.createReadStream(filePath)
    .pipe(csv.parse({ headers: true }))
    .on("error", (error) => {
        throw error.message;
    })
    .on("data", async (row) => {
        try {
            stream.pause();
            const mobile = Helper.validateMobile(row.Telephone)

            const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");

            console.log(`validationerrors ---> ${validationerrors} Telephone--> ${row.Telephone}`)

            if (validationerrors.length) {
                throw validationerrors.message

            } else {
                csvData.push(row);
            }
        } finally {
            stream.resume();
        }
    })
    .on("end", async () => {
        console.log("ENDDD ---->", csvData)
    });

Console.log 输出是这样的。最后一行(777223478 行)在on("end") 调用后接收。

    validationerrors --->  Telephone--> 778786516
    validationerrors --->  Telephone--> 718254596 
    validationerrors --->  Telephone--> 712760763 
    
    ENDDD ----> [ 
       { 
        Code: 'CTLD000323',',   
        Telephone: '778786516' }, 
       { 
        Code: 'CTLD000324',  
        Telephone: '718254596' }, 
       { 
        Code: 'CTLD000376',    
        Telephone: '712760763' } 
      ] 
    
    validationerrors --->  Telephone--> 777223478

【问题讨论】:

  • 我想知道是不是因为.pipe(),因此您可能没有暂停正确的流,或者CSV模块没有正确尊重.pause()?您使用的是 NPM 上的哪个 csv 模块,以便我们查看它的代码和文档?
  • Multer 不是您的 csv 实现。该示例使用const csv = require('fast-csv');。你用的是这个吗?
  • 是的,它正在使用内部管道,例如 .pipe(csv.parse({ headers: true })))
  • 在我看来,您使用的 csv 解析器不正确支持 .pause(),即使您暂停了流,它也会继续向您发送缓冲区中已处理的行。我能想到的是,当您想暂停时,您必须缓冲 data 事件,并且在您准备好之前不要处理它们。

标签: node.js async-await stream


【解决方案1】:

因此,CSV 库似乎不遵守 .pause() 方法,如果它有多行排队,它会继续并为它们触发 data 事件,即使流已暂停。我知道的两种解决方法是修改 csv 模块以尊重流暂停或在它们进入时排队,这样当你已经在处理某些东西时,你可以手动暂停它们的处理。而且,您还必须排队 end 事件,因为流将在您完成处理之前发送它。我通过合成我自己的finalEnd 事件来处理结束事件。这是一个队列的粗略实现,它缓冲在流暂停时到达的data 事件。

let paused = false;
const queue = [];
let end = false;

const stream = fs.createReadStream(filePath)
    .pipe(csv.parse({ headers: true }))
    .on("error", (error) => {
        throw error.message;
    })
    .on("data", async (row) => {
        queue.push(row);
        if (!paused) {
            stream.pause();
            paused = true;
            while (queue.length) {
                try {
                    await processRow(queue.shift());
                } catch (e) {
                    // decide what to do here if you get an error processing a row
                    console.log(e);
                }
            }
            paused = false;
            stream.resume();
            if (end) {
                stream.emit("finalEnd");
            }
        }

        async function processRow(row) {
            const mobile = Helper.validateMobile(row.Telephone)

            const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");

            console.log(`validationerrors ---> ${validationerrors} Telephone--> ${row.Telephone}`)

            if (validationerrors.length) {
                throw validationerrors.message

            } else {
                csvData.push(row);
            }
        }

    })
    .on("end", async () => {
        end = true;
        if (!queue.length && !paused) {
            stream.emit("finalEnd");
        }
    }).on("finalEnd", () => {
        console.log("ENDDD ---->", csvData)
    });

【讨论】:

  • 谢谢。现在它被称为 finalEnd 两次,第二次被称为 csvData 数组已完成所有四条记录。
  • @Tje123 - 查看防止双重finalEnd 事件的更新。我必须确保在发出 finalEnd 之前我们还没有暂停并处理最后一行。
  • 谢谢。非常感谢您的帮助。现在它按预期工作了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-13
  • 1970-01-01
相关资源
最近更新 更多