【问题标题】:http => akka stream => httphttp => akka 流 => http
【发布时间】:2018-07-08 06:31:28
【问题描述】:

我想使用 akka 流来将一些 json 网络服务连接在一起。我想知道从http请求生成流并将块流传输到另一个的最佳方法。 有没有办法定义这样的图表并运行它而不是下面的代码? 到目前为止,我尝试过这种方式,但不确定它是否真的在流式传输:

override def receive: Receive = {
   case GetTestData(p, id) =>
     // Get the data and pipes it to itself through a message as recommended
     // https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
     http.singleRequest(HttpRequest(uri = uri.format(p, id)))
       .pipeTo(self)

   case HttpResponse(StatusCodes.OK, _, entity, _) =>
     val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))

     // Forward the response to next job and pipes the request response to dedicated actor
     http.singleRequest(HttpRequest(
       method = HttpMethods.POST,
       uri = "googl.cm/flow",
       entity = HttpEntity.Chunked(ContentTypes.`application/json`, 
       initialRes)
     ))


   case resp @ HttpResponse(code, _, _, _) =>
     log.error("Request to test job failed, response code: " + code)
     // Discard the flow to avoid backpressure
     resp.discardEntityBytes()

   case _ => log.warning("Unexpected message in TestJobActor")
 }

【问题讨论】:

    标签: scala akka akka-stream akka-http


    【解决方案1】:

    这应该是一个与您的receive 等效的图表:

    Http()
    .cachedHostConnectionPool[Unit](uri.format(p, id))
    .collect {
      case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
        val initialRes = entity.dataBytes
          .via(JsonFraming.objectScanner(Int.MaxValue))
          .map(bStr => ChunkStreamPart(bStr.utf8String))
        Some(initialRes)
    
      case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
        log.error("Request to test job failed, response code: " + code)
        // Discard the flow to avoid backpressure
        resp.discardEntityBytes()
        None
    }
    .collect {
      case Some(initialRes) => initialRes
    }
    .map { initialRes =>
      (HttpRequest(
         method = HttpMethods.POST,
         uri = "googl.cm/flow",
         entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
       ),
       ())
    }
    .via(Http().superPool[Unit]())
    

    this的类型是Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool],其中Unit是一个关联ID,如果你想知道哪个请求对应到达的响应,可以使用,HostConnectionPool物化值可以用来关闭与主机的连接。只有cachedHostConnectionPool 会返回这个具体化的值,superPool 可能会自行处理(尽管我没有检查过)。无论如何,我建议您在关闭应用程序时使用Http().shutdownAllConnectionPools(),除非您出于某种原因需要。根据我的经验,它更不容易出错(例如忘记关机)。

    您也可以使用 Graph DSL,来表达相同的图形:

    val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    
        val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
        val host2Flow = b.add(Http().superPool[Unit]())
    
        val toInitialRes = b.add(
          Flow[(Try[HttpResponse], Unit)]
            .collect {
              case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
                val initialRes = entity.dataBytes
                  .via(JsonFraming.objectScanner(Int.MaxValue))
                  .map(bStr => ChunkStreamPart(bStr.utf8String))
                Some(initialRes)
    
              case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
                log.error("Request to test job failed, response code: " + code)
                // Discard the flow to avoid backpressure
                resp.discardEntityBytes()
                None
            }
        )
    
        val keepOkStatus = b.add(
          Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
            .collect {
              case Some(initialRes) => initialRes
            }
        )
    
        val toOtherHost = b.add(
          Flow[Source[HttpEntity.ChunkStreamPart, Any]]
            .map { initialRes =>
              (HttpRequest(
                 method = HttpMethods.POST,
                 uri = "googl.cm/flow",
                 entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
               ),
               ())
            }
        )
    
        host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow
    
        FlowShape(host1Flow.in, host2Flow.out)
    })
    

    【讨论】:

    • 感谢您的提示,实际上图重构的目标是能够广播/过滤第一个响应以便将第二个响应发送到不同的 url,这将如何实现?类似于stackoverflow.com/questions/33817241/…
    • 我用 Graph DSL 来扩展答案。然后,您可以使用 Partition 元素 (doc),根据它来路由您的流程。 this answer中有一个很好的例子
    猜你喜欢
    • 2017-07-17
    • 1970-01-01
    • 2017-01-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多