【问题标题】:"tee" Scala Streams / Iterators"tee" Scala 流/迭代器
【发布时间】:2018-12-21 02:52:26
【问题描述】:

我有一个表示为简单迭代器(或流)的顺序数据源。数据很大,不适合内存。此外,源可以遍历一次,并且获取成本很高。 此源用于一些繁重的过程(黑盒),这些过程将迭代器(或流)作为其参数以线性消耗数据。 好的,这很简单。但是,如果我有两个不同的此类消费程序,我该怎么办?正如我所说,我不想将输入数据吸入 List 之类的集合中。我也可以通过从一开始就重新阅读源两次来完成我的任务,但我不喜欢这样,因为它没有效果。 如果事实上我需要“开球”(一种克隆)迭代器(或流)以通过两个并行进程消耗单个两次而不将其缓存到内存集合中。我想如果这种方法消耗源流太快,应该做背压或者更确切地说限制兄弟姐妹。有效的解决方案可能应该有一些并行安全的队列缓冲区。 有谁知道如何在 Scala 上进行这样的事情(或使用任何外部流库/框架)?

PS 我发现了一个 4 岁的类似问题: One upstream stream feeding multiple downstream streams 不同之处在于我询问如何使用标准 Scala 迭代器(或流)或更好的一些现有库来执行它。

【问题讨论】:

    标签: scala stream tee


    【解决方案1】:

    您应该查看fs2 streams。该示例使用常量内存以增量方式读取文件并写入另一个文件。以下是如何修改他们的示例以写入两个文件:

    ...
    
    io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(line => fahrenheitToCelsius(line.toDouble).toString)
      .intersperse("\n")
      .through(text.utf8Encode)
      .observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
      .through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC))
    
    ...
    

    【讨论】:

    • 不错,看来我需要! :-) 请注意,我已经有了 Iterator[MyStructure] 。你能给我举个例子,如何提供两个以 Iterator 或 Stream 作为参数的函数 - 而不是两个文件(希望它不是太复杂)?是否可以将简单的 Scala 流转换为 fs2 流并返回?
    • 创建 fs2 流的方法有很多种,guide 中概述了其中的许多方法。如果一切都失败了,你总是可以通过unfold 构建一个。
    • 尝试: import fs2._ import cats.effect.{IO, Sync} val st2 = Stream.fromIterator(it0) // 从我的源创建 fs2 srteam Iterator[NginxRec] // 临时消费者首先尝试(只打印前 2 条记录): def sink1: Sink[IO, NginxRec] = _.take(2).evalMap(nr => IO.delay(println(nr.datetime))) def sink2: Sink[IO , NginxRec] = _.take(2).evalMap(nr => IO.delay(println(nr.datetime))) st2.observe(sink1).observe(sink2) // 我在这里吗?我在 Stream.fromIterator: Error: Diverging implicit expansion for type cats.effect.Sync[F] ...
    • 我在这里找到了类似于我想要的东西:fs2.io/…(部分“单一发布者/多个订阅者”)。但我仍然看不出这如何解决我的任务。
    • 你能用你正在使用的代码示例发布另一个问题吗?
    猜你喜欢
    • 2021-04-21
    • 2010-12-04
    • 2013-04-29
    • 2013-06-04
    • 2012-03-08
    • 2013-04-27
    • 1970-01-01
    • 1970-01-01
    • 2012-12-11
    相关资源
    最近更新 更多