【问题标题】:Spray http client and thousands requests喷洒http客户端和数千个请求
【发布时间】:2015-06-30 10:53:18
【问题描述】:

我想以控制发送到服务器的最大请求数的方式配置喷雾 http 客户端。我需要这个,因为如果发送了超过 2 个请求,我发送请求的服务器会阻止我。我明白了

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://smallTasks/user/IO-HTTP#151444590]] after [15000 ms]
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://smallTasks/user/IO-HTTP#151444590]] after [15000 ms]
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://smallTasks/user/IO-HTTP#151444590]] after [15000 ms]
akka.pattern.AskTimeoutException: Ask timed out on 

我需要发送数千个请求,但在收到大约 100 个请求的响应后我被阻止了。

我有这个方法:

  implicit val system = ActorSystem("smallTasks")
  implicit val timeout = new Timeout(15.seconds)

  import system.dispatcher

  def doHttpRequest(url: String): Future[HttpResponse] = {
    (IO(Http) ? HttpRequest(GET, Uri(url))).mapTo[HttpResponse]
  }

在这里我捕获响应并在失败时重试(递归):

def getOnlineOffers(modelId: Int, count: Int = 0): Future[Any] = {

    val result = Promise[Any]()

    AkkaSys.doHttpRequest(Market.modelOffersUrl(modelId)).map(response => {
      val responseCode = response.status.intValue
      if (List(400, 404).contains(responseCode)) {
        result.success("Bad request")
      } else if (responseCode == 200) {
        Try {
          Json.parse(response.entity.asString).asOpt[JsObject]
        } match {
          case Success(Some(obj)) =>
            Try {
              (obj \\ "onlineOffers").head.as[Int]
            } match {
              case Success(offers) => result.success(offers)
              case _ => result.success("Can't find property")
            }

          case _ => result.success("Wrong body")
        }
      } else {
        result.success("Unexpected error")
      }
    }).recover { case err =>
      if (count > 5) {
        result.success("Too many tries")
      } else {
        println(err.toString)
        Thread.sleep(200)
        getOnlineOffers(modelId, count + 1).map(r => result.success(r))
      }
    }

    result.future

  }

如何正确地做到这一点?可能我需要以某种方式配置 akka 调度程序吗?

【问题讨论】:

  • 如果达到最大并发连接数你想做什么?拒绝请求?等待给定的超时时间?
  • 我需要为每个模型收集信息,所以如果请求失败,我想重试失败的请求并超时。我已经这样做了(参见 Thread.sleep(200))。它在 10 - 15 秒内工作正常,在那之后我在控制台中收到很多 Akka 询问超时异常。我认为这是因为我有 10000 个模型并且它们都是同时发布的。我正在寻找一种强制 http 客户端每次仅发送 2 或 3 个请求并且在旧请求没有响应时不发送其他请求的方法

标签: akka spray-client


【解决方案1】:

您可以使用http://spray.io/documentation/1.2.2/spray-client/ 并编写您的个人管道

val pipeline: Future[SendReceive] =
      for (
        Http.HostConnectorInfo(connector, _) <-
          IO(Http) ? Http.HostConnectorSetup("www.spray.io", port = 80)
      ) yield sendReceive(connector)

    val request = Get("/segment1/segment2/...")
    val responseFuture: Future[HttpResponse] = pipeline.flatMap(_(request))

获取 HttpResponse

import scala.concurrent.Await
import scala.concurrent.duration._
val response: HttpResponse = Aweit(responseFuture, ...)

转换

import spray.json._
response.entity.asString.parseJson.convertTo[T]

检查

Try(response.entity.asString.parseJson).isSuccess

括号太多。在scala中你可以写得更短

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-25
    • 2019-07-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多