【发布时间】:2016-09-22 06:14:45
【问题描述】:
在最新的 Akka (2.4.6) 中如何限制 Flow?我想限制 Http 客户端流量以将请求数限制为每秒 3 个请求。我在网上找到了以下示例,但它是针对旧的 Akka 和 akka-streams API 的变化太大,以至于我无法弄清楚如何重写它。
def throttled[T](rate: FiniteDuration): Flow[T, T] = {
val tickSource: Source[Unit] = TickSource(rate, rate, () => ())
val zip = Zip[T, Unit]
val in = UndefinedSource[T]
val out = UndefinedSink[T]
PartialFlowGraph { implicit builder =>
import FlowGraphImplicits._
in ~> zip.left ~> Flow[(T, Unit)].map { case (t, _) => t } ~> out
tickSource ~> zip.right
}.toFlow(in, out)
}
这是我迄今为止最好的尝试
def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, Unit)
val zip = builder.add(Zip[T, Unit.type])
val map = Flow[(T, Unit.type)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
val in = Inlet[T]("Req.in")
val out = Outlet[T]("Req.out")
out ~> zip.in0
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(in, messageExtractor.out)
})
它在我的主要流程中引发异常:)
private val queueHttp = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
.via(throttleFlow(rate))
.via(poolClientFlow)
.mapAsync(4) {
case (util.Success(resp), any) =>
val strictFut = resp.entity.toStrict(5 seconds)
strictFut.map(ent => (util.Success(resp.copy(entity = ent)), any))
case other =>
Future.successful(other)
}
.toMat(Sink.foreach({
case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
p.success(triedResp -> value)
case _ =>
throw new RuntimeException()
}))(Keep.left)
.run
其中poolClientFlow 是Http()(system).cachedHostConnectionPool[Any](baseDomain)
例外是:
Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
at scala.Predef$.require(Predef.scala:219)
at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)
【问题讨论】:
-
以上内容在新的 akka 流中很容易重做。但这对我来说似乎也有缺陷。 Zip 需要每个来源一个。更好的方法可能是合并请求和滴答声,然后有一个有状态的 BidiFlow,如果请求多于滴答声,则使用 503 响应请求。
-
@RüdigerKlaehn 能否请您出示您的版本?我已经用我的非工作版本更新了问题。我不希望请求失败。我希望他们排队等待使用背压。
-
@Qingwei ,问题是专家显然想要全局限制(通过多个流实现),所以限制不起作用。
标签: scala akka akka-stream