【问题标题】:Throttle HTTP request on Akka/Spray限制 Akka/Spray 上的 HTTP 请求
【发布时间】:2014-09-30 07:23:34
【问题描述】:

我在 Scala 中使用 Akka Actor 从外部服务(HTTP 获取请求)下载资源。来自外部服务的响应是 JSON,我必须使用分页(提供程序非常慢)。我想在 10 个线程中同时下载所有分页结果。我使用这样的 URL 来下载块:http://service.com/itmes?limit=50&offset=1000

我创建了以下管道:

ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggreator

ScatterActor 获取要下载的项目总数并将其分成块。我创建了 10 个 LoadChunkActor 来同时处理任务。

  override def receive: Receive = {
    case LoadMessage(limit) =>
    val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
    offsets.foreach(offset => context.system.actorSelection(pipe) !
    LoadMessage(chunkSize, offset))
 }

LoadChunkActor 使用 Spray 发送请求。演员长这样:

val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
  case LoadMessage(limit, offset) =>
    val uri: String = s"http://service.com/items?limit=50&offset=$offset"
    val responseFuture = pipeline {Get(uri)}
    responseFuture onComplete {
      case Success(items) => aggregator ! Loaded(items)
    }
 }

如您所见,LoadChunkActor 正在从外部服务请求块并添加回调以在 onComplete 上运行。 Actor 现在准备好接收另一条消息并且他正在请求另一个块。 Spray 正在使用非阻塞 API 来下载块。结果外部服务被我的请求淹没了,我得到了超时。

如何安排任务列表但我想同时处理最多 10 个?

【问题讨论】:

  • 你读过this的文章吗?
  • 是的,我已经阅读了这篇文章。它描述了限制为 3m/s 的系统。我对反馈节流感兴趣,我想在 LoadChunkActor 将响应从 HTTP 服务发送到聚合器时开始处理下一条消息。
  • 这似乎是工作拉动的绝佳候选人:michaelpollmeier.com/akka-work-pulling-pattern。使用一个 Actor 作为主节点,根据批次总数创建工作列表以下拉和管理结果,然后在其下的一组工作人员一次处理一个块,将结果报告给主节点。
  • @cmbaxter,我使用了类似于拉动的类似解决方案。效果很好。

标签: scala akka spray-client


【解决方案1】:

我已经创建了以下解决方案(类似于拉http://www.michaelpollmeier.com/akka-work-pulling-pattern/:

ScatterActor (10000x messages) => 
  ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
         ^                                    |
         |<--------WorkDoneMessage------------|
  1. ThrottleActor 将消息发布到 ListBuffer 并发送到 LoadChunkActor 最多 N 条消息。
  2. 当 LoadChunkActor 通过 ThrottleMonitorActor 向 Aggregator 发送消息时。
  3. ThrottleMonitorActor 向 ThrottleActor 发送确认。
  4. ThrottleActor 将下一条消息发送到 LoadChunkActor。

【讨论】:

    【解决方案2】:

    来自project adhoclabs/akka-http-contrib,你现在(2016 年 7 月,两年后)来自Yeghishe Piruzyanscala.co.adhoclabs.akka.http.contrib.throttle package

    见“Akka Http Request Throttling

    implicit val throttleSettings = MetricThrottleSettings.fromConfig
    
    Http().bindAndHandle(
      throttle.apply(routes),
      httpInterface,
      httpPort
    )
    

    【讨论】:

      猜你喜欢
      • 2019-02-25
      • 1970-01-01
      • 2020-10-29
      • 2014-08-19
      • 1970-01-01
      • 2016-08-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多