【发布时间】:2018-07-08 06:31:28
【问题描述】:
我想使用 akka 流来将一些 json 网络服务连接在一起。我想知道从http请求生成流并将块流传输到另一个的最佳方法。 有没有办法定义这样的图表并运行它而不是下面的代码? 到目前为止,我尝试过这种方式,但不确定它是否真的在流式传输:
override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
// https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))
// Forward the response to next job and pipes the request response to dedicated actor
http.singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = "googl.cm/flow",
entity = HttpEntity.Chunked(ContentTypes.`application/json`,
initialRes)
))
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
【问题讨论】:
标签: scala akka akka-stream akka-http