【问题标题】:How to use Reactive Streams for NIO binary processing?如何使用 Reactive Streams 进行 NIO 二进制处理?
【发布时间】:2023-03-25 15:53:01
【问题描述】:

是否有一些代码示例使用org.reactivestreams 库来处理使用 Java NIO 的大型数据流(以获得高性能)?我的目标是分布式处理,所以最好使用 Akka 的示例,但我可以弄清楚。

似乎大多数(我希望不是全部)在 scala 中读取文件的示例都求助于Source(非二进制)或直接 Java NIO(甚至像Files.readAllBytes!)

也许我错过了一个激活器模板? (Akka Streams with Scala! 正在解决我需要的一切,除了二进制/NIO 端)

【问题讨论】:

    标签: scala akka nio reactive-streams


    【解决方案1】:

    不要使用scala.collection.immutable.Stream 来消费这样的文件,原因是它执行记忆 - 也就是说,虽然它很懒,但它会将整个流缓冲(记忆)在内存中!

    当您考虑“流处理文件”时,这绝对不是您想要的。 Scala 的 Stream 之所以这样工作,是因为在功能设置中它完全有意义 - 例如,由于这一点,您可以避免一次又一次地轻松计算斐波那契数,有关更多详细信息,请参阅 ScalaDoc

    Akka Streams 提供了 Reactive Streams 实现并提供了一个您可以在此处使用的 FileIO 类(它会适当地反压并仅在需要时从文件中提取数据,并且流的其余部分已准备好使用它):

    import java.io._
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{ Sink, Source }
    
    object ExampleApp extends App {
    
    
      implicit val sys = ActorSystem()
      implicit val mat = FlowMaterializer()
    
      FileIO.fromPath(Paths.get("/example/file.txt"))
        .map(c ⇒ { print(c); c })
        .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() } ))
    }
    

    这里有更多关于使用 IO with Akka Streams 的文档 请注意,这是针对 Akka 的当前编写版本,因此是 2.5.x 系列。

    希望这会有所帮助!

    【讨论】:

    • 感谢您的精彩回答 - 我必须再次找到自己的问题才能知道我在寻找什么:doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/java/… - akka 2.4 已经发布(大概意味着它是 NIO 2)! (一旦您或我使用 api 更新/创建代码答案,我将接受)
    • 真的会一直将整个流保存在内存中吗?还是取决于您持有对流开头的引用?我的(一厢情愿?)印象是,如果您继续迭代处理尾部并忘记头部,Stream 项目最终会被释放。
    • 请阅读文档,我在下面链接了它们; scala-lang.org/api/current/scala/collection/immutable/…
    • @Stephen 我更新了答案以显示 2.4 和 2.5 API(它是一样的),快乐哈克
    【解决方案2】:

    我们实际上使用 akka 流来处理二进制文件。由于没有任何相关文档,所以让事情进展起来有点棘手,但这是我们想出的:

    val binFile = new File(filePath)
    val inputStream = new BufferedInputStream(new FileInputStream(binFile))
    val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
    val binSource = Source(binStream)
    

    一旦你有了binSource,这是一个别名Source[Byte],你可以继续并开始应用你想要的任何流转换(mapflatMaptransform,等等...)。此功能利用 Source 伴随对象的 apply 接受 Iterable,传入一个 scala Stream,它应该延迟读取数据并使其可用于您的转换。

    编辑

    正如 Konrad 在 cmets 部分中指出的那样,Stream 在处理大文件时可能会成为问题,因为它会在延迟构建流时对遇到的元素执行记忆化。如果您不小心,这可能会导致内存不足的情况。但是,如果您查看 Stream 的文档,有一个提示可以避免在内存中建立记忆:

    一个人必须小心记忆;你可以很快吃掉很多 如果你不小心,内存量。这样做的原因是 Stream 的记忆创建了一个很像的结构 scala.collection.immutable.List。只要有东西坚持 头,头抓住尾巴,所以它继续 递归地。另一方面,如果没有任何东西可以支撑 head(例如,我们使用 def 来定义 Stream)然后一旦它不再 直接使用就消失了。

    因此,考虑到这一点,您可以将我原来的示例修改如下:

    val binFile = new File(filePath)
    val inputStream = new BufferedInputStream(new FileInputStream(binFile))     
    val binSource = Source(() => binStream(inputStream).iterator)
    
    def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
    

    所以这里的想法是通过def 构建Stream,而不是分配给val,然后立即从中获取iterator 并使用它来初始化Akka Source。以这种方式设置应该可以避免 momoization 的问题。我对一个大文件运行旧代码,并且能够通过在Source 上执行foreach 来产生OutOfMemory 情况。当我将它切换到新代码时,我能够避免这个问题。

    【讨论】:

    • 在这里使用 scala.collection.immutable.Stream 是相当危险的——它使用了记忆(!)(参见文档scala-lang.org/api/current/…),所以你最终会把整个文件都放在内存中,而不是通过 (!) 流式传输。
    • @Konrad'ktoso'Malawski,好点子。我将发布有关记忆问题解决方法的更新。
    • 好更新,暴露输入流的迭代器工作正常。记得在流完成时关闭资源。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-26
    • 2011-04-05
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 2017-07-17
    相关资源
    最近更新 更多