【问题标题】:Send big file over reactive stream通过反应流发送大文件
【发布时间】:2017-02-19 14:26:43
【问题描述】:

我正在编写的部分应用程序需要将任意大的文件(对于这个问题,我假设 100-200 GB)文件从客户端传输到服务器。重要的是,接收器(服务器)没有存储这个文件——它只是读取/检查流并将其发送到下一个点。因为在任何时候我都不需要整个文件,但希望同时进行多次传输,所以我想尽量减少 RAM 使用并消除磁盘​​使用。我想以 1 MB 为单位处理文件。

目前,服务器使用 Spring Boot 和 Akka。

我的第一次尝试是在客户端打开缓冲文件输入流,以 1 MB 的块读取它并在单独的线程中以消息的形式发送它们。它可以工作,但问题是客户端一个接一个地发送消息,而不必担心服务器是否有缓冲区来存储它(缺乏背压)。

我的第二个想法是像这样使用 akka-streams:

How to use Reactive Streams for NIO binary processing?

像这样使用 ActorPublisher:

akka-streams with akka-cluster

但是,正如此处所述:

http://doc.akka.io/docs/akka/2.4.16/scala/stream/stream-integrations.html#Implementing_Reactive_Streams_Publisher_or_Subscriber

“警告 ActorPublisher 和 ActorSubscriber 可能会在 Akka 的未来版本中被弃用。

警告 ActorPublisher 和 ActorSubscriber 不能与远程 Actor 一起使用,因为如果 Reactive Streams 协议的信号(例如请求)丢失,则流可能会死锁。”

这看起来不是个好主意。

我不想将其保存在任何存储提供商(Dropbox、Google Drive 等)中,因为我想即时分析数据。我有 Spring 5 和 Akka,但我可以使用任何其他软件来解决这个问题。原始套接字将缺乏背压,并且种子不保证顺序/有序读写(我需要)。

主要问题是:假设服务器不能同时将文件存储在磁盘或内存中,如何将大文件从客户端传输到服务器?

额外的问题是:如何在这种传输中计算“正确”的块大小?

我几天来一直在寻找答案,看起来我不是唯一一个遇到这种问题的人,但是没有任何答案或像“不要这样做”这样的答案,而没有指出其他适当的替代解决方案。

【问题讨论】:

  • 目前还不清楚您的问题到底是什么。 Akka-streams 提供了您需要的所有工具 - TCP 套接字周围有流包装器(当然有背压),还有 GraphStage(这是 ActorSubscriber 和 ActorPublisher 的预期替代品),您可以使用它来实现处理如果没有任何默认组合符适合您,则为逻辑。你只需要将它们结合起来。
  • 请您给我看一些远程流的最小示例好吗?
  • 当然,这里是:gist.github.com/netvl/1245564b106c02691dd0808fe98d07eb。它很脏(尤其是在服务器关闭处理方面),但它应该传达基本思想。它使用原始 TCP 套接字进行通信;您可能还想为此使用 akka-http,因为它还为您提供了更轻松的 TLS 配置(尽管它也可以使用原始 TCP 流来完成)。 TCP 流记录在 here,akka-http 记录在 here
  • 这是我修复你的代码的尝试:gist.github.com/netvl/a8c5024ef9b905a6d6ff32cda8a39872你基本上是对的。确实,Java DSL 与 Scala API 有细微的差别,而且更加冗长,但基本思想保持不变。我之前没有使用过 Java API,所以有可能在那里进行优化。
  • 当然,这不是一个完整的实现,这只是一个如何使用 akka-streams 进行 network I/O 的示例。我提前说它很脏。您必须根据它专门为您的用例创建一些东西。对不起,我真的没有能力为你完全实施它。您必须使用 Akka 文档来了解如何操作;我向您保证,您需要的所有信息都在那里,或者至少我在您的问题中没有看到任何不存在的信息。

标签: spring stream akka reactive-programming akka-stream


【解决方案1】:

Akka 流专门为此用例提供功能:streaming File IO。来自文档:

import akka.stream.scaladsl._
val file = Paths.get("example.csv")

val foreach: Future[IOResult] = 
  FileIO.fromPath(file)
        .to(Sink.ignore)
        .run()

关于块“正确大小”的奖励问题;这在很大程度上取决于您的硬件和软件配置。你最好的办法是编写一个测试客户端并调整块大小,直到你为你的服务器找到一个“最佳位置”。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-01-07
    • 2021-07-09
    • 2015-10-04
    • 2015-06-23
    • 1970-01-01
    • 2016-08-01
    • 1970-01-01
    • 2012-07-12
    相关资源
    最近更新 更多