【发布时间】:2020-06-17 18:20:46
【问题描述】:
我正在尝试通过 scala 读取大量数据,如下所述:Scala read continuous http stream
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{Uri, HttpRequest}
import akka.stream.ActorMaterializer
object ChunkTestClient extends App {
implicit val system = ActorSystem("test")
import system.dispatcher
implicit val materializer = ActorMaterializer()
val source = Uri("https://jigsaw.w3.org/HTTP/ChunkedScript")
val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response =>
response.entity.dataBytes.runForeach { chunk =>
println(chunk.utf8String)
}
}
}
上面的代码 sn-p 在大多数情况下都可以正常工作。但是如果响应数据很大,或者源不断地流式传输大量数据,那么块响应就不起作用,它也不会抛出任何错误,它只是默默地杀死。这似乎是数据生产速度大于消耗速度的背压情况。有没有办法用akka http流处理大尺寸\数据流。在上面的代码中可以做些什么来处理客户端的背压。
【问题讨论】:
-
我认为您认为背压导致问题的假设是不正确的。可能有一个错误被静默忽略并导致流停止(默认行为)。我建议添加错误记录 doc.akka.io/docs/akka/current/stream/… 和监督决策者 doc.akka.io/docs/akka/current/stream/…
-
最后加
Await.ready(finished, Duration.Inf)能用吗?
标签: scala akka akka-stream akka-http