【问题标题】:Http Client with Source.tick带有 Source.tick 的 Http 客户端
【发布时间】:2016-03-10 22:28:48
【问题描述】:

我正在尝试将 http 客户端连接到服务器公开的 http 服务,源应该每 1 秒发送一次请求,因为我已经按照部分图表进行了包装:

def httpSourceGraph() = {
  Source.fromGraph(GraphDSL.create() { implicit builder =>
    val sourceOutLet = builder.add(Source.tick(FiniteDuration(0, TimeUnit.SECONDS), FiniteDuration(1,
      TimeUnit.SECONDS),
      HttpRequest(uri ="/test", method = HttpMethods.GET))).out
    // expose outlet
    SourceShape(sourceOutLet)
  })
}

def httpConnFlow() = {
  Flow.fromGraph(GraphDSL.create() { implicit builder =>

    val httpSourceFlow = builder.add(Http(system).outgoingConnection(host = "localhost", port = 8080))

    FlowShape(httpSourceFlow.in, httpSourceFlow.out)
  })
}

图表组成为

val response= httpSourceGraph.via(httpConnFlow()).runForeach(println)

如果 http 服务器 (localhost:8080/test) 启动并运行,一切正常,每 1 秒我可以看到从服务器返回的响应。如果任一服务器关闭或稍后关闭,我将无法做出任何响应。

我认为它应该给我以下错误:

akka.stream.StreamTcpException: Tcp 命令 [Connect(localhost/127.0.0.1:8080,None,List(),Some(10 seconds),true)] 失败

这也可以用一些错误的 url 进行测试。 (域名stackoverflow1.com和错误的url“/test”)

感谢您的帮助。

-阿伦

【问题讨论】:

    标签: akka akka-stream akka-http


    【解决方案1】:

    我可以提出一种方法来获得您所寻求的行为。我认为您问题的核心是Flow 生成的Http().outgoingConnection 将在遇到故障时终止。一旦发生这种情况,就不再有下游需求从Source 拉取请求,整个流程停止。如果您想要无论连接是否丢失都将继续向下游发出元素的东西,那么您可以尝试使用主机连接池而不仅仅是单个连接。该池将对单个连接的故障更具弹性,并且它也从一开始就设置为向下游发送SuccessFailure。使用主机连接池的流的简化版本可以定义如下:

     val source = 
       Source.tick(
         1 second, 
         5 second, 
         (HttpRequest(uri ="/", method = HttpMethods.GET), 1)
       )
    
     val connFlow = Http(system).
       newHostConnectionPool[Int](host = "www.aquto.com", port = 80)
    
     val sink = Sink.foreach[(util.Try[HttpResponse], Int)]{
       case (util.Success(r), _ ) => 
         r.entity.toStrict(10 seconds)
         println(s"Success: ${r.status}")
    
       case (util.Failure(ex), _) => 
         println(s"Failure: ${ex.getMessage}")
     }
    
    source.via(connFlow).to(sink).run
    

    我对此进行了测试,在测试过程中拔掉了我的网络连接,这是我看到的输出:

    Success: 200 OK
    Success: 200 OK
    Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
    Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
    Failure: Tcp command [Connect(www.aquto.com/50.112.131.12:80,None,List(),Some(10 seconds),true)] failed
    Success: 200 OK
    Success: 200 OK
    

    【讨论】:

    • 为什么返回元组 (response,int)。我宁愿只回应。
    • 池的设置允许您一次将多个并发请求推送到其中。当您执行此操作时,输出的响应没有顺序保证(即顺序输入不一定等于顺序输出)。这就是元组上的第二个插槽的用途,但是对于您的情况,您不会一次传递多个请求并且只执行一个请求,因此这就是为什么该 id 在请求端设置为 1 并完全忽略的原因响应处理。
    • 非常感谢.. 它与使用 Http(system).outgoingConnection(host = "localhost", port = 8080) 有何不同
    • 您应该阅读这些文档,因为它们提供的知识比在这个空间中讨论的要多:doc.akka.io/docs/akka/2.4.2/scala/http/client-side/index.html
    • 太好了……我也经历了同样的事情。再次感谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-24
    • 2019-06-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多