【问题标题】:Nested stream operations in Highland.jsHighland.js 中的嵌套流操作
【发布时间】:2015-02-27 13:14:35
【问题描述】:

我有来自readdirp 模块的目录流。

我想:-

  • 在每个目录中使用正则表达式(例如README.*)搜索文件
  • 读取该文件的第一行不以# 开头
  • 打印出每个目录和目录中自述文件的第一个非标题行。

我正在尝试使用流和highland.js 来做到这一点。

我一直在尝试处理每个目录中的所有文件流。

h = require 'highland'

dirStream = readdirp root: root, depth: 0, entryType: 'directories'

dirStream = h(dirStream)
  .filter (entry) -> entry.stat.isDirectory()
  .map (entry) ->

    # Search all files in the directory for README.
    fileStream = readdirp root: entry.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store'
    fileStream = h(fileStream).filter (entry) -> /README\..*/.test entry.name
    fileStream.each (file) ->
      readmeStream = fs.createReadStream file
      _(readmeStream)
        .split()
        .takeUntil (line) -> not line.startsWith '#' and line isnt ''
        .last(1)
        .toArray (comment) ->
          # TODO: How do I access `comment` asynchronously to include in the return value of the map?

    return {name: entry.name, comment: comment}

【问题讨论】:

    标签: node.js stream highland.js


    【解决方案1】:

    最好将 Highland 流视为不可变的,并且像 filtermap 这样的操作返回依赖于旧流的新流,而不是对旧流的修改。

    此外,Highland 方法很懒惰:只有当您现在绝对需要数据时,才应该调用eachtoArray

    异步映射流的标准方法是flatMap。就像map,但你给它的函数应该返回一个流。您从flatMap 获得的流是所有返回流的串联。由于新流依序依赖于所有旧流,因此可用于对异步进程进行排序。

    我会将您的示例修改为以下内容(澄清了一些变量名称):

    h = require 'highland'
    
    readmeStream = h(readdirp root: root, depth: 0, entryType: 'directories')
      .filter (dir) -> dir.stat.isDirectory()
      .flatMap (dir) ->
        # Search all files in the directory for README.
        h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store')
        .filter (file) -> /README\..*/.test file.name
        .flatMap (file) ->
          h(fs.createReadStream file.name)
            .split()
            .takeUntil (line) -> not line.startsWith '#' and line isnt ''
            .last(1)
            .map (comment) -> {name: file.name, comment}
    

    让我们来看看这段代码中的类型。首先,请注意flatMap 的类型(用 Haskellish 表示法)Stream a → (a → Stream b) → Stream b,即它接受一个包含 a 类型的东西的流,以及一个期望类型为 a 的东西并返回包含 bs 的流的函数,并返回一个包含bs 的流。集合类型(例如流和数组)将flatMap 实现为连接返回的集合是标准的。

    h(readdirp root: root, depth: 0, entryType: 'directories')
    

    假设它的类型为Stream Directoryfilter 不会更改类型,因此 flatMap 将是 Stream Directory → (Directory → Stream b) → Stream b。我们将看到函数返回什么:

    h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store')
    

    将此称为Stream File,因此第二个flatMapStream File → (File → Stream b) → Stream b

    h(fs.createReadStream file.name)
    

    这是Stream StringsplittakeUntillast 不会改变这一点,那么 map 会做什么? mapflatMap 非常相似:它的类型是Stream a → (a → b) → Stream b。在这种情况下,aStringb 是对象类型 {name : String, comment : String}。然后map 返回该对象的流,这是整个flatMap 函数返回的内容。上一步,第二个flatMap中的b是对象,所以第一个flatMap的函数也返回了一个对象的流,所以整个流就是一个Stream {name : String, comment : String}

    请注意,由于 Highland 的惰性,这实际上不会启动任何流式处理或处理。您需要使用eachtoArray 导致thunk 并启动管道。在each 中,将使用您的对象调用回调。根据您想对 cme​​ts 执行的操作,最好再发送一些 flatMap(例如,如果您将它们写入文件)。

    好吧,我不是故意写论文的。希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-12-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-28
      • 2020-01-08
      • 2014-01-11
      相关资源
      最近更新 更多