【问题标题】:child_process stream backpressurechild_process 流背压
【发布时间】:2016-01-24 23:37:19
【问题描述】:

我将exec-stream 与Node.js 一起使用,并通过其他一些转换流传输该流,最终通过node-brake 流来限制数据速率。制动流似乎没有任何作用,实际上数据最终会在长链的末端丢失。

execStream('some-external-binary').pipe(transform1).pipe(transform2).pipe(brake(1024))

我认为正在发生的事情是child_process STDOUT 流(在exec-stream 内)没有暂停,因此缓冲区会填充直到数据丢失。

child_process 流的行为是否如此?有什么方法可以让背压与child_process 流一起正常工作?

【问题讨论】:

  • 我对 exec-stream 或 node-brake 没有太多经验,但我想用一个完整的例子来探讨你的问题。 gist.github.com/CodeLenny/007523ae5b13559e644f29b46364c1e4 是否与您所经历的行为相匹配?在我看来,刹车没有背压,但是当使用不同的刹车值时,我没有得到不同数量的数据。
  • @RyanLeonard 很有趣。也许这里一直存在的问题是node-brake 没有达到我的预期。是的,这似乎是对问题的一个很好的模拟。您能否将其发布为答案?
  • 我已经写了评论作为答案。

标签: node.js stream stdio


【解决方案1】:

我对 exec-stream 和 node-brake 不够熟悉,无法理解所有数据丢失的途径。

但是,我做了一个小实验,看看 node-brake 是否有背压效应,你提到这可能是数据丢失的潜在区域。

文件也托管在Gist

###
Created for http://stackoverflow.com/questions/34982953/child-process-stream-backpressure
Please pardon the CoffeeScript, but I couldn't stand to extend stream.Transform in native JavaScript.
###
fs = require("fs")
execStream = require("exec-stream")
brake = require("brake")

file = fs.createWriteStream("tmp.txt")

class Double extends require("stream").Transform
  _transform: (chunk, enc, cb) ->
    @_last ?= Date.now()
    @_called ?= []
    @_called.push Date.now() - @_last
    @_last = Date.now()
    @push chunk.toString() + chunk.toString()
    cb()

class UpperCase extends require("stream").Transform
  _transform: (chunk, enc, cb) ->
    @push chunk.toString().toUpperCase()
    cb()

sum = (nums) ->
  o = 0
  o += i for i in nums
  o

doTest = (size) ->

  transform1 = new Double()
  transform2 = new UpperCase()
  transform3 = new Double()

  execStream("dd", ["if=/dev/urandom", "bs=1024", "count=1"])
    .pipe(transform1)
    .pipe(transform2)
    .pipe(brake(size))
    .pipe(transform3)
    .pipe(file)

  file.on "finish", ->
    fs.stat "tmp.txt", (err, stats) ->
      throw err if err
      called1 = transform1._called
      averagePreBrake = sum(called1) / called1.length
      called2 = transform3._called
      averagePostBrake = sum(called2) / called2.length
      console.log """
        Generated with brake(#{size}): #{stats.size}
        Average time between transformations pre-brake:  #{averagePreBrake}ms
        Average time between transformations post-brake: #{averagePostBrake}ms
      """

doTest 1024
doTest 256

该过程的结果如下。

我注意到刹车前的转换之间没有间隙。然而,刹车扰乱了后来的转变。鉴于这些数据,我怀疑 node-brake 没有背压效应。

[我的sh2png 实用程序生成的控制台输出屏幕截图]

【讨论】:

    猜你喜欢
    • 2023-04-04
    • 1970-01-01
    • 1970-01-01
    • 2017-05-19
    • 2019-12-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-29
    相关资源
    最近更新 更多