【发布时间】:2016-04-27 00:18:57
【问题描述】:
在过去的几天里,我一直在尝试找出使用 Akka Streams 和 HTTP 将 HTTP 资源下载到文件的最佳方法。
最初我从Future-Based Variant 开始,看起来像这样:
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
这还不错,但是当我了解更多关于纯 Akka 流的知识后,我想尝试使用 Flow-Based Variant 创建从 Source[HttpRequest] 开始的流。起初这完全让我难过,直到我偶然发现了flatMapConcat 流转换。这最终变得更加冗长:
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
case (responseTry, context) => (responseTry.get, context)
}
def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
case (response, _) => response.entity.dataBytes
}
def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request, ()))
val requestResponseFlow = Http().superPool[Unit]()
source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSource).
runWith(FileIO.toFile(file))
}
然后我想有点棘手并使用Content-Disposition 标头。
回到基于未来的变体:
def destinationFile(downloadDir: File, response: HttpResponse): File = {
val fileName = response.header[ContentDisposition].get.value
val file = new File(downloadDir, fileName)
file.createNewFile()
file
}
def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val file = destinationFile(downloadDir, response)
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
但现在我不知道如何使用基于未来的变体来做到这一点。这是据我所知:
def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
case (response, _) =>
val source = responseToByteSource(in)
val file = destinationFile(downloadDir, response)
source.map((_, file))
}
def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request, ()))
val requestResponseFlow = Http().superPool[Unit]()
val sourceWithDest: Source[(ByteString, File), Unit] = source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
sourceWithDest.runWith(???)
}
所以现在我有一个Source,它将为每个File 发出一个或多个(ByteString, File) 元素(我说每个File,因为没有理由原来的Source 必须是一个@987654335 @)。
有没有办法把它们路由到动态的Sink?
我在想flatMapConcat之类的东西,比如:
def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
这样我就可以完成downloadViaFlow2:
def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
val sink = FileIO.toFile(destination, true)
Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
case (_, file) => destToSink(file)
}
【问题讨论】:
标签: scala akka-stream akka-http