【发布时间】:2021-01-19 08:50:39
【问题描述】:
我看到了 Akka Streams 的背压概念,但我的项目没有使用 Akka Stream,我为 Akka 开发了另一个背压指示器概念,我想在这里问一下它是否可行...
我打算使用邮箱队列的深度作为背压的标准......背后的逻辑,如果 Akka 能够跟上我产生消息的速度,那么消息队列的深度应该肤浅。如果我发送的消息太多而 Akka 跟不上,那么消息队列中的消息将会越来越多,我必须降低生成消息的速度。
这与 Apache Cassandra 的“飞行中请求”基本相同......
Session.State state = session.getState();
for (Host host : state.getConnectedHosts()) {
HostDistance distance = loadBalancingPolicy.distance(host);
int connections = state.getOpenConnections(host);
int inFlightQueries = state.getInFlightQueries(host);
....
}
所以我从 Akka 使用的 Akka reference.conf 中找到..
default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue with
UnboundedMessageQueueSemantics {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle)
final def dequeue(): Envelope = poll()
所以我编写了一个 AspectJ Aspect 来拦截 enqueue() 和 dequeue(),并增加和减少一个全局 AtomicLong,我可以使用它来跟踪 Akka Actor 邮箱中等待的消息总数......
所以我的逻辑是,如果 Akka 能够跟上我发送的消息数量,那么该数量应该低于某个预先配置的值......
假设我有 100 000 个演员,他们在邮箱消息队列中有 1000 条消息,一切正常,但如果我看到 1 000 000 条消息在消息队列中等待,这是一个减慢消息生成的信号。 ...如果队列中有 10 000 000 条消息,则确定信号停止消息生产...
我构建了一个原型,它按我的预期工作......但在我继续之前,我想在这里问一下,你是否发现这个概念有任何真正的缺陷。
谢谢解答...
【问题讨论】:
标签: akka