【Question title】: What's the proper way to handle back-pressure in a node.js Transform stream?
【Posted】: 2014-01-13 04:23:01
【Question】:

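Intro

This is my first adventure in writing node.js server-side code. It has been fun so far, but I'm having some difficulty understanding the proper way to implement something as it relates to node.js streams.

Problem

For test and learning purposes I'm working with large files whose content is zlib compressed. The compressed content is binary data, each packet being 38 bytes long. I'm trying to create a resulting file that looks almost identical to the original file, except that there is an uncompressed 31-byte header for every 1024 38-byte packets.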

Original file content (decompressed)

+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+

Resulting file content

+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+

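As you can see, it's somewhat of a translation problem. Meaning, I'm taking some source stream as input and then slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.

The class simply attempts to accomplish the following:

  1. Takes a stream as input.
  2. zlib-inflates the chunks of data to count the number of packets, puts 1024 of them together, zlib-deflates them, and prepends a header.
  3. Passes the newly generated chunk on through the pipeline via this.push(chunk).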
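Step 2 can be sketched as a pure function, separate from any stream plumbing. This is only an illustration under stated assumptions: the question does not describe the layout of the 31-byte header, so buildHeader below is hypothetical (zero-filled, with the packet count stored in its first four bytes), and frameGroup is likewise a made-up name.

```javascript
const zlib = require('zlib');

const PACKET_SIZE = 38;          // bytes per packet (from the question)
const PACKETS_PER_GROUP = 1024;  // packets per header (from the question)
const HEADER_SIZE = 31;          // bytes per header (from the question)

// Hypothetical header layout: the real format is not given in the question.
function buildHeader(packetCount) {
  const header = Buffer.alloc(HEADER_SIZE); // zero-filled
  header.writeUInt32BE(packetCount, 0);     // assumption: count in bytes 0..3
  return header;
}

// Takes up to 1024 already-inflated packets and returns
// [uncompressed header][zlib-compressed packets] as one Buffer.
function frameGroup(packets) {
  if (packets.length % PACKET_SIZE !== 0) {
    throw new RangeError('input must be a whole number of packets');
  }
  const count = packets.length / PACKET_SIZE;
  if (count > PACKETS_PER_GROUP) {
    throw new RangeError('too many packets for one group');
  }
  return Buffer.concat([buildHeader(count), zlib.deflateSync(packets)]);
}
```

Keeping the framing in a plain function like this leaves the Transform responsible only for buffering input and pushing output, which is where the back-pressure question actually lives.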

A use case would be something like:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);

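Question

Assuming Transform is a good choice for this use case, I seem to be running into a possible back-pressure issue. My call to this.push(chunk) within _transform keeps returning false. Why would this be, and how should such things be handled?

【Comments】:

Tags: javascript node.js zlib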


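【Answer 1】:

This question from 2013 is all I was able to find on how to deal with "back pressure" when creating node Transform streams.

From the node 7.10.0 Transform stream and Readable stream documentation, what I gathered was that once push returns false, nothing else should be pushed until _read is called.

The Transform documentation doesn't mention _read except to say that the base Transform class implements it (and _write). I found the information about push returning false and _read being called in the Readable stream documentation.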
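The rule can be observed directly on a plain Readable. A minimal sketch, assuming a deliberately tiny highWaterMark of 4 bytes (chosen only for illustration) and a no-op read() implementation so that data is supplied by hand:

```javascript
const { Readable } = require('stream');

// A Readable whose internal buffer is considered "full" at 4 bytes.
const readable = new Readable({
  highWaterMark: 4,
  read() {} // no-op: chunks are pushed manually below
});

// 2 bytes buffered, still under the 4-byte mark: push() reports room.
const first = readable.push(Buffer.from('ab'));

// 6 bytes buffered, over the mark: push() returns false, meaning
// "stop pushing until _read() is called again".
const second = readable.push(Buffer.from('cdef'));

console.log(first, second);
```

Note that the chunk pushed while over the mark is still buffered, not dropped; the false return value is only a request to stop producing more.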

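The only other authoritative comment I found on Transform back pressure only mentioned it as an issue, and that was in a comment at the top of the node file _stream_transform.js.

Here's the section about back pressure from that comment: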

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.

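Solution example

Here's the solution I pieced together to handle back pressure in a Transform stream, which I'm pretty sure works. (I haven't written any real tests, which would require writing a Writable stream to control the back pressure.)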
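A Writable of the kind the answer says would be needed for a real test could look roughly like the sketch below. GatedWritable and its release() method are hypothetical names, not part of Node.js: the class simply holds on to each write callback until the test releases it, so the test decides exactly when back pressure is lifted.

```javascript
const { Writable } = require('stream');

// Hypothetical test helper: a Writable that stalls every write until
// release() is called, so back pressure can be applied deterministically.
class GatedWritable extends Writable {
  constructor(options) {
    // Small default highWaterMark so back pressure kicks in quickly.
    super(Object.assign({ highWaterMark: 16 }, options));
    this.received = [];   // everything written so far, as strings
    this._pending = null; // callback of the write currently being held
  }

  _write(chunk, encoding, callback) {
    this.received.push(chunk.toString());
    this._pending = callback; // do not call it yet: the stream stays "busy"
  }

  // Complete the held write, letting the next buffered chunk through.
  release() {
    const callback = this._pending;
    this._pending = null;
    if (callback) callback();
  }
}

const gate = new GatedWritable({ highWaterMark: 4 });
const accepted = gate.write('abcdef'); // 6 bytes against a 4-byte mark
console.log(accepted, gate.received);
gate.release();
```

Piping a transform into such a stream and calling release() one step at a time makes it possible to check that nothing further is pushed while the destination is stalled.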

This is a rudimentary line transform. It needs more work as a line transform, but it does demonstrate handling the "back pressure".

const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
            {
                while (nextLine < lines.length)
                {
                    if (!this.push(lines[nextLine++] + "\n"))
                    {
                        // push() signalled backpressure. If there are still lines
                        // left, stop here and let _read() resume us; if that was
                        // the last line, fall through and finish this transform.
                        if (nextLine < lines.length)
                            return;
                    }
                }

                // DEBUG: Uncomment for debugging help to see what's going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();
            };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}

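I tested the above by running it, with the DEBUG lines uncommented, on a ~10000-line, ~200KB file. Redirect stdout or stderr to a file (or both) to separate the debugging statements from the expected output. (node test.js > out.log 2> err.log)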

const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);

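Helpful debugging hint

While writing this initially I didn't realize that _read could be called before _transform returned, so I hadn't implemented the this._transforming guard and I was getting the following error: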

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)

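Looking at the node implementation, I realized that this error meant that the callback given to _transform was being called more than once. There wasn't much information to be found about this error either, so I thought I'd include what I figured out here.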
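One way to surface this mistake earlier, with a clearer message, is to wrap the callback before storing it. This is only a defensive sketch; callOnce is a hypothetical helper, not something provided by Node.js:

```javascript
// Hypothetical helper: wraps a callback so that a second invocation throws
// a descriptive error at the call site instead of failing later inside
// the stream internals.
function callOnce(callback, label) {
  let called = false;
  return (...args) => {
    if (called) {
      throw new Error(`${label}: callback invoked more than once`);
    }
    called = true;
    return callback(...args);
  };
}

// Usage inside a transform (sketch):
//   _transform(chunk, encoding, callback) {
//     const done = callOnce(callback, 'LineTransform._transform');
//     ...
//   }
```

With the wrapper in place, the stack trace points at the second call itself, which is usually the line that needs fixing.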

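【Comments】:

    【Answer 2】:

    I think Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

    Here's a quick and largely untested example: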

    var zlib        = require('zlib');
    var stream      = require('stream');
    var transformer = new stream.Transform();
    
    // Properties used to keep internal state of transformer.
    transformer._buffers    = [];
    transformer._inputSize  = 0;
    transformer._targetSize = 1024 * 38;
    
    // Dump one 'output packet'
    transformer._dump       = function(done) {
      // concatenate buffers; stay in Buffer form so binary data is not
      // corrupted by a round trip through a string
      var buffer = Buffer.concat(this._buffers);
    
      // Take first 1024 packets.
      var packetBuffer = buffer.subarray(0, this._targetSize);
    
      // Keep the rest and reset counter.
      this._buffers   = [ buffer.subarray(this._targetSize) ];
      this._inputSize = this._buffers[0].length;
    
      // output header
      this.push('HELLO WORLD');
    
      // output compressed packet buffer
      zlib.deflate(packetBuffer, function(err, compressed) {
        if (err) {
          // report the failure through the callback when there is one
          if (done) { done(err); } else { this.emit('error', err); }
          return;
        }
        this.push(compressed);
        if (done) {
          done();
        }
      }.bind(this));
    };
    
    // Main transformer logic: buffer chunks and dump them once the
    // target size has been met.
    transformer._transform  = function(chunk, encoding, done) {
      this._buffers.push(chunk);
      this._inputSize += chunk.length;
    
      if (this._inputSize >= this._targetSize) {
        this._dump(done);
      } else {
        done();
      }
    };
    
    // Flush any remaining buffers.
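    transformer._flush = function(done) {
      // pass the callback along so the stream can finish once the
      // last packet buffer has been compressed and pushed
      this._dump(done);
    };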
    
    // Example:
    var fs = require('fs');
    fs.createReadStream('depth_1000000')
      .pipe(zlib.createInflate())
      .pipe(transformer)
      .pipe(fs.createWriteStream('depth_1000000.out'));
    

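    【Comments】:

    • Thanks very much for your detailed answer, as it provides insight into what I'm attempting to do. What I'm curious about is, in your _dump function, how would you deal with this.push(compressed) returning false? This happens occasionally, and I presume it must be a side effect of back-pressure on the stream. How do you recover, without losing data, in these cases? Does this make sense?
    • @ScottSaad I'm not entirely sure, to be honest. I can't find much information on how to deal with back-pressure issues in Transform streams either :(
    • Wanted to let you know I updated the question to be more precise, as it refers to the back-pressure issue, but wanted to thank you for the time you put into the above code. It's great! :)
    • Can you explain conceptually what's going on here? It looks like just a code dump.
    • @MrCholo The code reads at least 1024*38 bytes from the inflated stream, and then writes them back in blocks of exactly 1024*38 bytes with an extra header in front of each (in this example, the string 'HELLO WORLD'). Each of those blocks (1024*38 + header.length bytes) is zlib compressed.
    【Answer 3】:

    push will return false if the stream you're writing to (in this case, a file output stream) has too much data buffered. Since you're writing to disk, this makes sense: you are processing data faster than you can write it out.

    When out's buffer is full, your transform stream will fail to push and will start buffering data itself. If that buffer fills up as well, then inp's buffer will start to fill. This is how things are supposed to work. Piped streams only process data as fast as the slowest link in the chain can handle it (once your buffers are full).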
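What pipe does between two adjacent streams can be approximated by hand. The sketch below is a simplification for illustration, not the actual implementation of pipe (which also handles errors, unpiping and multiple destinations); pump is a hypothetical name.

```javascript
const { Readable, Writable } = require('stream');

// Simplified stand-in for src.pipe(dest): stop reading while the
// destination reports a full buffer, resume once it has drained.
function pump(src, dest) {
  src.on('data', (chunk) => {
    if (!dest.write(chunk)) {
      // Destination is over its highWaterMark: pause the source...
      src.pause();
      // ...and pick up again when the destination has caught up.
      dest.once('drain', () => src.resume());
    }
  });
  src.on('end', () => dest.end());
}
```

Because every link in a piped chain behaves this way, a slow destination eventually pauses the stream feeding it, which in turn fills that stream's buffers and pauses the one before it.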

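    【Comments】:

    • I guess I'm confused about what to do with the information when it returns false. To me, if a function has a return code (in this case true/false), it is telling the caller something important. From what you've described above, it's perfectly safe to ignore the return code. Is that what you're suggesting? If so, why (bad documentation, etc.)?
    • When you use pipe, you can ignore the 'false'. I believe pipe is designed to handle this internally. If you want to write your own custom back-pressure mechanism, you can use the value returned by push.
    【Answer 4】:

    I ran into a similar problem recently, needing to handle back pressure in an inflating transform stream. The secret to handling push() returning false is to register and handle the 'drain' event on the stream: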

    _transform(data, enc, callback) {
      const continueTransforming = () => {
        // ... do some work / parse the data, keep state of where we're at etc
        if(!this.push(event)) 
             this._readableState.pipes.once('drain', continueTransforming); // will get called again when the reader can consume more data
        if(allDone)
           callback();
      }
      continueTransforming()
    }
    

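    Note that this is a bit hacky, since we're reaching into the internals, and pipes can even be an array of Readables, but it does work in the common case of ....pipe(transform).pipe(...

    It would be great if someone from the Node community could suggest a "correct" method for handling .push() returning false.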
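Later Node.js releases offer a route that avoids reaching into _readableState altogether: stream.pipeline accepts an async generator function as an intermediate step, and the generator is only resumed when the next stage is ready for more data. A minimal sketch, assuming Node.js 15 or later (for the stream/promises module); expand and run are hypothetical names, and the character-per-line expansion is just a stand-in for an inflating transform:

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Stand-in for an "inflating" transform: each input string produces
// several output chunks. Each yield hands one chunk downstream; the
// generator stays suspended until the consumer asks for the next one.
async function* expand(source) {
  for await (const chunk of source) {
    for (const ch of String(chunk)) {
      yield ch + '\n';
    }
  }
}

// Runs the inputs through expand into a deliberately slow sink and
// resolves with everything the sink received.
function run(inputs) {
  const seen = [];
  const sink = new Writable({
    highWaterMark: 1, // tiny buffer so back pressure is applied constantly
    write(chunk, encoding, callback) {
      seen.push(chunk.toString());
      setImmediate(callback); // simulate a slow destination
    }
  });
  return pipeline(Readable.from(inputs), expand, sink).then(() => seen);
}

run(['ab', 'cd']).then((seen) => console.log(seen));
```

Whether this fits depends on the Node.js versions that must be supported; on older versions the hand-written approaches in this thread remain the available options.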

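    【Comments】:

    • It's 2018. Is this workaround still needed today, or does pipe handle back pressure inside my transform? If it is still needed, a more generic solution could perhaps be made as a module: a "BackPressure" transformer used after a transform stream that runs into back-pressure problems, which optionally handles chunks with a fixed-size buffer and a queue limit. Maybe someone has already made such a module? More info on back pressure in streams: nodejs.org/en/docs/guides/backpressuring-in-streams
    • TLDR: When using pipe, node handles back pressure, but when using a custom transformer you need to handle it internally. So yes, we need to handle it internally.
    【Answer 5】:

    I ended up following Ledion's example and created a utility Transform class that assists with back pressure. The utility adds an async method named addData, which the implementing Transform can await.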

    'use strict';
    
    const { Transform } = require('stream');
    
    /**
     * The BackPressureTransform class adds a utility method addData which
     * allows for pushing data to the Readable, while honoring back-pressure.
     */
    class BackPressureTransform extends Transform {
      constructor(...args) {
        super(...args);
      }
    
      /**
       * Asynchronously add a chunk of data to the output, honoring back-pressure.
       *
       * @param {String} data
       * The chunk of data to add to the output.
       *
       * @returns {Promise<void>}
       * A Promise resolving after the data has been added.
       */
      async addData(data) {
        // if .push() returns false, it means that the readable buffer is full
        // when this occurs, we must wait for the piped destination to emit
        // the 'drain' event, signalling that it is ready for more data
        if (!this.push(data)) {
          await new Promise((resolve, reject) => {
            const errorHandler = error => {
              this.emit('error', error);
              // reject with the error so awaiting callers can see the cause
              reject(error);
            };
            const boundErrorHandler = errorHandler.bind(this);
    
            this._readableState.pipes.on('error', boundErrorHandler);
            this._readableState.pipes.once('drain', () => {
              this._readableState.pipes.removeListener('error', boundErrorHandler);
              resolve();
            });
          });
        }
      }
    }
    
    module.exports = {
      BackPressureTransform
    };
    

    With this utility class, my Transforms now look like this:

    'use strict';
    
    const { BackPressureTransform } = require('./back-pressure-transform');
    
    /**
     * The Formatter class accepts the transformed row to be added to the output file.
     * The class provides generic support for formatting the result file.
     */
    class Formatter extends BackPressureTransform {
      constructor() {
        super({
          encoding: 'utf8',
          readableObjectMode: false,
          writableObjectMode: true
        });
    
        this.anyObjectsWritten = false;
      }
    
      /**
       * Called when the data pipeline is complete.
       *
       * @param {Function} callback
       * The function which is called when final processing is complete.
       *
       * @returns {Promise<void>}
       * A Promise resolving after the flush completes.
       */
      async _flush(callback) {
        // if any object is added, close the surrounding array
        if (this.anyObjectsWritten) {
          await this.addData('\n]');
        }
    
        callback(null);
      }
    
      /**
       * Given the transformed row from the ETL, format it to the desired layout.
       *
       * @param {Object} sourceRow
       * The transformed row from the ETL.
       *
       * @param {String} encoding
       * Ignored in object mode.
       *
       * @param {Function} callback
       * The callback function which is called when the formatting is complete.
       *
       * @returns {Promise<void>}
       * A Promise resolving after the row is transformed.
       */
      async _transform(sourceRow, encoding, callback) {
        // before the first object is added, surround the data as an array
        // between each object, add a comma separator
        await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');
    
        // update state
        this.anyObjectsWritten = true;
    
        // add the object to the output
        const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
        for (const [index, row] of parsed.entries()) {
          // prepend the row with 2 additional spaces since we're inside a larger array
          await this.addData(`  ${row}`);
    
          // add line breaks except for the last row
          if (index < parsed.length - 1) {
            await this.addData('\n');
          }
        }
    
        callback(null);
      }
    }
    
    module.exports = {
      Formatter
    };
    

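    【Comments】:

      【Answer 6】:

      I think Mike Lippert's answer is the closest to the truth. It appears that waiting for a new _read() call to come in from the reading stream is the only way the Transform is actively notified that the reader is ready. I wanted to share a simple example of how I override _read() temporarily.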

      _transform(buf, enc, callback) {
      
        // prepend any unused data from the prior chunk.
        if (this.prev) {
          buf = Buffer.concat([ this.prev, buf ]);
          this.prev = null;
        }
      
        // will keep transforming until buf runs low on data.
        if (buf.length < this.requiredData) {
          this.prev = buf;
          return callback();
        }
      
        // do something with data... (placeholder: passes the bytes through unchanged)
        var result = buf.slice(0, this.requiredData);
        var nextbuf = buf.slice(this.requiredData);
      
        if (this.push(result)) {
          // Continue transforming this chunk
          this._transform(nextbuf, enc, callback);
        }
        else {
          // Node is warning us to slow down (applying "backpressure")
          // Temporarily override _read request to continue the transform
          this._read = function() {
              delete this._read;
              this._transform(nextbuf, enc, callback);
          };
        }
      }
      

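      【Comments】:

        【Answer 7】:

        I was trying to find the comment mentioned in the Transform source code, and the reference links keep changing, so I will leave it here for reference: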

        // a transform stream is a readable/writable stream where you do
        // something with the data.  Sometimes it's called a "filter",
        // but that's not a great name for it, since that implies a thing where
        // some bits pass through, and others are simply ignored.  (That would
        // be a valid example of a transform, of course.)
        //
        // While the output is causally related to the input, it's not a
        // necessarily symmetric or synchronous transformation.  For example,
        // a zlib stream might take multiple plain-text writes(), and then
        // emit a single compressed chunk some time in the future.
        //
        // Here's how this works:
        //
        // The Transform stream has all the aspects of the readable and writable
        // stream classes.  When you write(chunk), that calls _write(chunk,cb)
        // internally, and returns false if there's a lot of pending writes
        // buffered up.  When you call read(), that calls _read(n) until
        // there's enough pending readable data buffered up.
        //
        // In a transform stream, the written data is placed in a buffer.  When
        // _read(n) is called, it transforms the queued up data, calling the
        // buffered _write cb's as it consumes chunks.  If consuming a single
        // written chunk would result in multiple output chunks, then the first
        // outputted bit calls the readcb, and subsequent chunks just go into
        // the read buffer, and will cause it to emit 'readable' if necessary.
        //
        // This way, back-pressure is actually determined by the reading side,
        // since _read has to be called to start processing a new chunk.  However,
        // a pathological inflate type of transform can cause excessive buffering
        // here.  For example, imagine a stream where every byte of input is
        // interpreted as an integer from 0-255, and then results in that many
        // bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
        // 1kb of data being output.  In this case, you could write a very small
        // amount of input, and end up with a very large amount of output.  In
        // such a pathological inflating mechanism, there'd be no way to tell
        // the system to stop doing the transform.  A single 4MB write could
        // cause the system to run out of memory.
        //
        // However, even in such a pathological case, only a single written chunk
        // would be consumed, and then the rest would wait (un-transformed) until
        // the results of the previous transformed chunk were consumed.
        

        【Comments】:
