【问题标题】:How do you throttle Flow in the latest Akka (2.4.6)?在最新的 Akka (2.4.6) 中如何限制 Flow?
【发布时间】:2016-09-22 06:14:45
【问题描述】:

在最新的 Akka (2.4.6) 中如何限制 Flow?我想限制 Http 客户端流量以将请求数限制为每秒 3 个请求。我在网上找到了以下示例,但它是针对旧的 Akka 和 akka-streams API 的变化太大,以至于我无法弄清楚如何重写它。

def throttled[T](rate: FiniteDuration): Flow[T, T] = {
  val tickSource: Source[Unit] = TickSource(rate, rate, () => ())
  val zip = Zip[T, Unit]
  val in = UndefinedSource[T]
  val out = UndefinedSink[T]
  PartialFlowGraph { implicit builder =>
    import FlowGraphImplicits._
    in ~> zip.left ~> Flow[(T, Unit)].map { case (t, _) => t } ~> out
    tickSource ~> zip.right
  }.toFlow(in, out)
}

这是我迄今为止最好的尝试

def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val ticker = Source.tick(rate, rate, Unit)

  val zip = builder.add(Zip[T, Unit.type])
  val map = Flow[(T, Unit.type)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  val in = Inlet[T]("Req.in")
  val out = Outlet[T]("Req.out")

  out ~> zip.in0
  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(in, messageExtractor.out)
})

它在我的主要流程中引发异常:)

private val queueHttp = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
  .via(throttleFlow(rate))
  .via(poolClientFlow)
  .mapAsync(4) {
    case (util.Success(resp), any) =>
      val strictFut = resp.entity.toStrict(5 seconds)
      strictFut.map(ent => (util.Success(resp.copy(entity = ent)), any))
    case other =>
      Future.successful(other)
  }
  .toMat(Sink.foreach({
    case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
      p.success(triedResp -> value)
    case _ =>
      throw new RuntimeException()
  }))(Keep.left)
  .run

其中poolClientFlowHttp()(system).cachedHostConnectionPool[Any](baseDomain)

例外是:

Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
    at scala.Predef$.require(Predef.scala:219)
    at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)

【问题讨论】:

  • 以上内容在新的 akka 流中很容易重做。但这对我来说似乎也有缺陷。 Zip 需要每个来源一个。更好的方法可能是合并请求和滴答声,然后有一个有状态的 BidiFlow,如果请求多于滴答声,则使用 503 响应请求。
  • @RüdigerKlaehn 能否请您出示您的版本?我已经用我的非工作版本更新了问题。我不希望请求失败。我希望他们排队等待使用背压。
  • @Qingwei ,问题是专家显然想要全局限制(通过多个流实现),所以限制不起作用。

标签: scala akka akka-stream


【解决方案1】:

这是使用@Qingwei 提到的节流方法的尝试。关键是不要使用bindAndHandle(),而是使用bind() 并在处理传入连接之前对其进行限制。代码取自implementation of bindAndHandle(),但为简单起见省略了一些错误处理。请不要在生产环境中这样做。

implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
import system.dispatcher
val maxConcurrentConnections = 4

val handler: Flow[HttpRequest, HttpResponse, NotUsed] = complete(LocalDateTime.now().toString)

def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] =
  incomingConnection.flow
        .watchTermination()(Keep.right)
        .joinMat(handler)(Keep.left)
        .run()

Http().bind("127.0.0.1", 8080)
  .throttle(3, 1.second, 1, ThrottleMode.Shaping)
  .mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection)
  .to(Sink.ignore)
  .run()

【讨论】:

  • 谢谢。您能否告诉我一般如何创建 FlowShape ?当我使用.via 摄取给定流量时,如何在内部连接入口和出口?
  • 当然,如果我有时间的话。但这最好在另一个问题中处理。
  • @expert 的问题是将请求数限制为每秒 3 个。但是这个答案提供了将连接数限制为 3 的方式。不是这种情况。因为多个请求可以通过同一个连接到达,但 handlerFlow 没有限制它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-22
  • 1970-01-01
  • 2017-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-08
相关资源
最近更新 更多