This question from 2013 is all I was able to find on how to deal with "back pressure"
when creating node Transform streams.
From the node 7.10.0 Transform stream and Readable stream documentation, what I gathered
was that once push returns false, nothing else should be pushed until _read is
called.
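To make that rule concrete, here is a minimal sketch of push signalling back pressure on a plain Readable. The tiny highWaterMark, the no-op read() and the names source and pushResults are assumptions chosen for illustration, not anything taken from the node documentation.

```javascript
const { Readable } = require('stream');

// Hypothetical demo stream: an 8 byte buffer so back pressure shows up quickly.
const source = new Readable({
  highWaterMark: 8,
  read() {} // no-op: this demo pushes by hand instead of from _read()
});

const pushResults = [];
pushResults.push(source.push('aaaa')); // 4 bytes buffered, below the mark
pushResults.push(source.push('bbbb')); // 8 bytes buffered, mark reached: stop pushing

source.read();                          // a consumer drains the internal buffer
pushResults.push(source.push('cccc')); // there is room again

console.log(pushResults);
```

Note that the chunk is still buffered when push returns false; the return value is only advice to stop producing until _read is called again.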
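The Transform documentation doesn't mention _read, except to note that the base Transform
class implements it (and _write). I found the information about push returning false and
_read being called in the Readable stream documentation.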
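The only other authoritative comment I found on Transform back pressure only mentioned
it as an issue, and that was in a comment at the top of the node file _stream_transform.js.
Here's the section about back pressure from that comment: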
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk. However,
// a pathological inflate type of transform can cause excessive buffering
// here. For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output. In this case, you could write a very small
// amount of input, and end up with a very large amount of output. In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform. A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
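The "pathological inflate" case from that comment can be sketched as below. InflateTransform, its pushResults property and the 16 byte highWaterMark are hypothetical, picked only to make the effect visible. It assumes a node version that has readableLength and that calls _transform synchronously for the first write, which is my reading of the implementation.

```javascript
const { Transform } = require('stream');

// Hypothetical transform: every input byte N becomes N bytes of output.
class InflateTransform extends Transform {
  constructor(options) {
    super(options);
    this.pushResults = []; // what push() returned for each input byte
  }

  _transform(chunk, encoding, callback) {
    for (const byte of chunk) {
      // Ignores back pressure on purpose: it keeps pushing even after false.
      this.pushResults.push(this.push(Buffer.alloc(byte, 0x78)));
    }
    callback();
  }
}

const inflate = new InflateTransform({ highWaterMark: 16 });
inflate.write(Buffer.from([0xff, 0xff, 0xff, 0xff]));

// Every push reported back pressure, yet all the output was buffered anyway.
console.log(inflate.pushResults, inflate.readableLength);
```

Nothing is reading from the stream here, so the whole inflated output sits in the readable buffer, far beyond the 16 byte mark.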
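Solution example
Here's the solution I pieced together to handle the back pressure in a Transform stream,
which I'm pretty sure works. (I haven't written any real tests, which would require
writing a Writable stream to control the back pressure.)
This is a rudimentary line transform which still needs work as a line transform, but it does
demonstrate handling the "back pressure".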
const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
        {
            while (nextLine < lines.length)
            {
                if (!this.push(lines[nextLine++] + "\n"))
                {
                    // we've got more to push, but we got backpressure so it has to wait.
                    if (nextLine < lines.length)
                        return;
                }
            }

            // DEBUG: Uncomment for debugging help to see what's going on
            //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

            // All lines are pushed, remove this function from the LineTransform instance
            this._continueTransform = null;
            return callback();
        };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}
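I tested the above by running it with the DEBUG lines uncommented on a ~10000 line,
~200KB file. Redirect stdout or stderr to a file (or both) to separate the debugging
statements from the expected output. (node test.js > out.log 2> err.log)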
const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);
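A real test would need the Writable mentioned earlier, one that controls the back pressure. The following is only a sketch of what that could look like: SlowWritable, its delayMs argument and its chunks array are hypothetical names, and the 4 byte highWaterMark is just small enough to trigger back pressure on the second write.

```javascript
const { Writable } = require('stream');

// Hypothetical slow consumer: acknowledges each chunk only after a delay,
// so anything piped into it sees back pressure almost immediately.
class SlowWritable extends Writable {
  constructor(delayMs, options) {
    super(options);
    this._delayMs = delayMs;
    this.chunks = []; // everything _write() has received so far
  }

  _write(chunk, encoding, callback) {
    this.chunks.push(chunk.toString());
    setTimeout(callback, this._delayMs); // delay the "done" signal
  }
}

const sink = new SlowWritable(10, { highWaterMark: 4 });
const first = sink.write('ab');  // 2 bytes pending, below the mark
const second = sink.write('cd'); // 4 bytes pending, mark reached
sink.end();

console.log(first, second, sink.chunks);
```

Replacing process.stdout with an instance of this class in the pipe above (for example lineStrm.pipe(new SlowWritable(10, { highWaterMark: 4 }))) should force _continueTransform to pause and be resumed from _read, though I have not verified that combination.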
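Helpful debugging hint
While initially writing this, I didn't realize that _read could be called before
_transform returned, so I hadn't implemented the this._transforming guard and I was
getting the following error: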
Error: no writecb in Transform class
at afterTransform (_stream_transform.js:71:33)
at TransformState.afterTransform (_stream_transform.js:54:12)
at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
at LineTransform.Transform._read (_stream_transform.js:167:10)
at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
at LineTransform.Transform._write (_stream_transform.js:155:12)
at doWrite (_stream_writable.js:331:12)
at writeOrBuffer (_stream_writable.js:317:5)
at LineTransform.Writable.write (_stream_writable.js:243:11)
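Looking at the node implementation made me realize that this error meant that the callback
given to _transform was being called more than once. There wasn't much information
to be found about this error either, so I thought I'd include what I figured out here.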