【问题标题】:Why does this readline async iterator not work properly?为什么这个 readline 异步迭代器不能正常工作?
【发布时间】:2020-07-13 23:08:15
【问题描述】:

这是我在节点 v14.4.0 中提炼成最小的、可重现的示例的更大过程的一部分。在这段代码中,它不会从 for 循环内部输出任何内容。

我在控制台中只看到这个输出:

before for() loop
finished
finally
done

for await (const line1 of rl1) 循环永远不会进入 for 循环 - 它只是跳过它:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

但是,如果我删除了await once(stream, 'open') 语句中的任何一个,那么for 循环将完全按照预期执行(列出rl1 文件的所有行)。因此,显然,来自 readline 接口和流之间的异步迭代器存在一些计时问题。任何想法可能发生的事情。知道是什么导致了这个问题或如何解决它吗?

仅供参考,await once(stream, 'open') 之所以存在,是因为异步迭代器中存在另一个错误,如果打开文件时出现问题,它不会拒绝,而如果文件无法打开,await once(stream, 'open') 会导致您正确获得拒绝打开(基本上是在飞行前打开)。

如果您想知道为什么存在 stream2 代码,它在较大的项目中使用,但我已将此示例缩减为最小的、可重现的示例,并且只需要这么多代码来演示问题.


编辑:在尝试稍微不同的实现时,我发现如果我将两个once(stream, "open") 调用组合在一个Promise.all() 中,它就会起作用。所以,这行得通:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const stream2 = fs.createReadStream(file2);
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
        // pre-flight file open to catch any open errors here
        // because of existing bug in async iterator with file open errors
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

这显然不应该对您等待文件打开的确切方式敏感。某处存在一些计时错误。我想在 readline 或 readStream 上找到该错误并将其归档。有什么想法吗?

【问题讨论】:

  • 我看到一个现有的May 18th issue 提出了类似的问题。我已将我的示例添加到该问题中。而且,另一个 related issue 从 7 月 5 日开始。
  • 还有,这里的the issue from last December 导致我必须输入await once(stream, 'open') 才能在打开文件时正确捕获错误。
  • 根据您的两个代码 sn-ps 之间的差异,我的第一个想法是在将读取流附加到 readline 接口之前触发了 data 事件。在对流执行某些操作之前等待open 可能会产生影响,因为事件发射器是同步的,而承诺解析不是。
  • @JakeHolzinger - 是否记录了您必须立即使用它而无需任何干预异步操作?而且,为什么 ASYNCHRONOUS 迭代器要求您不能在其他异步操作中使用它?而且,它还有其他错误。因此,我遇到的许多人宁愿自己重新实现逐行处理(或使用外部模块)而不是使用它。当您插入一行代码并且由于一些未记录的时间原因而中断时,这只是一个损坏的设计。它应该被固定为可靠或删除。

标签: javascript node.js promise readline async-iterator


【解决方案1】:

事实证明,根本问题是readline.createInterface() 立即调用它会添加一个data 事件侦听器 (code reference here) 并恢复流以开始流流动。

input.on('data', ondata);

input.resume();

然后,在ondata 监听器中,它解析行的数据,当它找到一行时,它会触发line 事件here

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

但是,在我的示例中,在调用 readline.createInterface() 和创建异步迭代器(将侦听 line 事件)之间发生了其他异步事情。所以,line 事件正在发出,但还没有任何东西在监听它们。

因此,要使readline.createInterface() 正常工作,必须在调用readline.createInterface() 之后同步添加任何要监听line 事件的内容,否则会出现竞争条件,line events 可能会丢失。


在我的原始代码示例中,一种可靠的解决方法是在我完成 await once(...) 之前不调用 readline.createInterface()。然后,异步迭代器会在readline.createInterface()被调用后同步创建。

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

解决此一般问题的一种方法是更改​​readline.createInterface(),使其不添加data 事件并恢复流,直到有人添加line 事件侦听器。这将防止数据丢失。它将允许 readline 接口对象安静地坐在那里而不会丢失数据,直到其输出的接收器实际准备好。这将适用于异步迭代器,并且还可以防止混入其他异步代码的接口的其他用途可能丢失line 事件。

关于这一点的注释已添加到相关的 open readline 错误问题 here

【讨论】:

  • 你救了我的命。我将在另一个问题上分享这个答案:)
【解决方案2】:

readline 模块也可以使用更现代的流 API 替换为简单的 Transform 流。现代流 API 支持开箱即用的异步迭代器以及背压(例如,流的写入端(文件读取)将暂停,直到流的读取端(行读取)被消耗)。

const fs = require('fs');
const { Transform } = require('stream');

function toLines() {
    let remaining = '';
    return new Transform({
        writableObjectMode: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                const lines = (remaining + chunk).split(/\r?\n/g);
                remaining = lines.pop();
                for (const line of lines) {
                    this.push(line);
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            if (remaining !== '') {
                this.push(remaining);
            }
            callback();
        }
    });
}


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
        const rl1 = stream1.pipe(toLines());

        const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
        const rl2 = stream2.pipe(toLines());

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

此示例不支持readline 模块的crlfDelay 选项,但可以修改算法以执行类似操作。它还(据我所知)比readline 模块支持的错误处理更好。

【讨论】:

  • 如果你得到一个没有行边界的块或者一个实际上不是整行的块的结尾,这不会有问题吗?
  • 如果chunk 没有行边界,它将附加到前一个remaining(例如remaining=remaining+chunk)。如果flush 被调用,那么remaining 中的任何内容都被视为一行。所以是的,remaining 数据在技术上可能不是一行,因为它可能没有正确的行结尾。你可以想象一个没有行尾的大文本文件,文件中的文本是否存在于该点的一行上?不以行结尾的文件怎么办?最后一段文字是一行吗?
【解决方案3】:

如果您在构造 readline 接口后立即创建异步迭代器,则可以按预期工作。如果您等待创建异步迭代器,您可能会丢失一些行,因为 readline 接口不会缓冲行事件,但借助异步迭代器,它们将被缓冲。

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const rl1Iterator = rl1[Symbol.asyncIterator]();

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1Iterator) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("stream.txt", "stream.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

根据 cmets 中的讨论,这可能仍然不是一个理想的解决方案,因为 readline 模块还有其他各种问题,但我想我会添加一个答案来解决原始问题中指出的问题。

【讨论】:

  • 这似乎有效。如果这是主要问题,那么为什么我使用 Promise.all() 的第二个代码示例完全有效?它与您所说的在这里避免的问题相同。我只是不明白为什么我的其他代码示例仍然存在您说要避免的相同问题?这似乎不是问题的全部解释,或者Promise.all() 示例也不起作用。
  • 我不能肯定地说,但它是一个竞争条件(或你所说的时间问题)。也许创建读取流和读取线接口所需的额外 CPU 周期会产生很大的不同。公平地说,如果我将await new Promise((resolve) =&gt; setTimeout(resolve)); 放在循环之前,则存在与第一个示例相同的行为。
  • promise 解析也可能有优化,这样Promise.all() 在一个微任务中得到解析(假设两个文件在promise 解析完成之前打开),而第一个示例需要两个微任务,这表明它会更慢地执行,从而加剧比赛条件。
  • 在查看readline.createInterface() 时,它会立即在流上安装data 侦听器并恢复流。因此,数据将开始流动。我有一个故障调试器,所以当它们到达时我无法确切地看到它对这些数据事件的作用,但它可能会触发 line 事件并且没有人在监听它们。是的,我已经确认了。这是在创建异步迭代器之前是否触发任何line 事件的竞争条件。我不知道为什么 Promise.all() 示例中没有发生这种情况。
  • 解决此问题的更基本方法可能是让 readline 不添加它的 data 侦听器,并且在有人安装 line 侦听器之前不恢复流。这似乎是 readline 接口的一个普遍问题,因为您必须同步安装 line 侦听器,否则您会丢失数据。似乎不必这样设计。
猜你喜欢
  • 2017-02-27
  • 2012-04-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-06-10
  • 2021-01-18
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多