【问题标题】:How to download a HTTP resource to a file with Akka Streams and HTTP?如何使用 Akka Streams 和 HTTP 将 HTTP 资源下载到文件中?
【发布时间】:2016-04-27 00:18:57
【问题描述】:

在过去的几天里,我一直在尝试找出使用 Akka Streams 和 HTTP 将 HTTP 资源下载到文件的最佳方法。

最初我从Future-Based Variant 开始,看起来像这样:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

这还不错,但是当我了解更多关于纯 Akka 流的知识后,我想尝试使用 Flow-Based Variant 创建从 Source[HttpRequest] 开始的流。起初这完全让我难过,直到我偶然发现了flatMapConcat 流转换。这最终变得更加冗长:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}

def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSource).
    runWith(FileIO.toFile(file))
}

然后我想有点棘手并使用Content-Disposition 标头。

回到基于未来的变体:

def destinationFile(downloadDir: File, response: HttpResponse): File = {
  val fileName = response.header[ContentDisposition].get.value
  val file = new File(downloadDir, fileName)
  file.createNewFile()
  file
}

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir, response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

但现在我不知道如何使用基于未来的变体来做到这一点。这是据我所知:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???)
}

所以现在我有一个Source,它将为每个File 发出一个或多个(ByteString, File) 元素(我说每个File,因为没有理由原来的Source 必须是一个@987654335 @)。

有没有办法把它们路由到动态的Sink

我在想flatMapConcat之类的东西,比如:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???

这样我就可以完成downloadViaFlow2

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
  val sink = FileIO.toFile(destination, true)
  Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
  case (_, file) => destToSink(file)
}

【问题讨论】:

    标签: scala akka-stream akka-http


    【解决方案1】:

    该解决方案不需要 flatMapConcat。如果您不需要文件写入的任何返回值,则可以使用Sink.foreach

    def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = {
      val file = destinationFile(downloadDir, httpResponse)
      httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
    }
    
    def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = {
      val request = HttpRequest(uri=uri)
      val source = Source.single((request, ()))
      val requestResponseFlow = Http().superPool[Unit]()
    
      source.via(requestResponseFlow)
            .map(responseOrFail)
            .map(_._1)
            .runWith(Sink.foreach(writeFile(downloadDir)))
    }
    

    请注意,Sink.foreachwriteFile 函数创建 Futures。因此,没有太多的背压。硬盘驱动器可能会减慢 writeFile 的速度,但流会继续生成 Futures。要控制这一点,您可以使用Flow.mapAsyncUnordered(或Flow.mapAsync):

    val parallelism = 10
    
    source.via(requestResponseFlow)
          .map(responseOrFail)
          .map(_._1)
          .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
          .runWith(Sink.ignore)
    

    如果您想累积 Long 值以获得总计数,您需要与 Sink.fold 结合:

    source.via(requestResponseFlow)
          .map(responseOrFail)
          .map(_._1)
          .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
          .runWith(Sink.fold(0L)(_ + _))
    

    当请求的来源枯竭时,折叠将保持运行总和并发出最终值。

    【讨论】:

    • 嗯,我希望有比这更好的方法。我也不太确定这是否能正常工作。 writeFile 将在 FileIO 流实现后立即返回。如果响应被分块,则需要按顺序将其写入文件。使用mapAsync 的类似问题。 append 参数也需要设置。此外,写入文件的任何错误似乎都不会导致外部流接收错误信号。
    • @Steiny 打破我对您的多个 cmets 的回答:(a)正确,写入文件立即返回 Future 但 mapAsync 处理这个(b)没有解决方案可以纠正 chunkedsource 也不是原始问题/要求的一部分(c)仅在写入同一文件时才需要(d)强制外部流在任何文件写入失败时失败不是原始问题的一部分。您问“无论如何要采取这些并将它们路由到动态接收器吗?”,我的回答回答了那个问题。我在您的示例代码的上下文中写了我的回复...
    • 当你得到 404 时会发生什么?!我厌倦了遗漏主要代码路径的示例 - 这就是我们使用 scala 来防止这种废话编码的原因!
    • @PlexQ 我没有改变原始问题的逻辑,这是处理 404 的地方。我专门回答了这个问题“无论如何要采取这些并将它们路由到动态接收器吗?”如果你想问一个新问题,我很乐意看看。另外,请参阅更新后的关于友好和不友好沟通方式的行为准则:stackoverflow.com/conduct
    【解决方案2】:

    使用ws中注入的play Web Services客户端,记得导入scala.concurrent.duration._:

    def downloadFromUrl(url: String)(ws: WSClient): Future[Try[File]] = {
      val file = File.createTempFile("my-prefix", new File("/tmp"))
      file.deleteOnExit()
    
      val futureResponse: Future[WSResponse] =
        ws.url(url).withMethod("GET").withRequestTimeout(5 minutes).stream()
    
      futureResponse.flatMap { res =>
        res.status match {
          case 200 =>
            val outputStream = java.nio.file.Files.newOutputStream(file.toPath)
    
            val sink = Sink.foreach[ByteString] { bytes => outputStream.write(bytes.toArray) }
    
            res.bodyAsSource.runWith(sink).andThen {
              case result =>
                outputStream.close()
                result.get
            } map (_ => Success(file))
          case other => Future(Failure[File](new Exception("HTTP Failure, response code " + other + " : " + res.statusText)))
        }
      }
    }
    

    【讨论】:

    • 正是我需要的!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-24
    • 2016-11-26
    • 2018-02-06
    • 1970-01-01
    • 1970-01-01
    • 2022-01-10
    • 1970-01-01
    相关资源
    最近更新 更多