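topology.max.spout.pending caps how many tuples emitted by a spout task may be pending, i.e. emitted but not yet acked or failed. When downstream bolts still have topology.max.spout.pending tuples that have not been fully processed, the spout pauses and waits for the bolts to consume them. Once the number of pending tuples drops below topology.max.spout.pending, the spout resumes reading messages from the message source. (This setting only takes effect with reliable message processing, i.e. when acking is enabled.)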
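The pausing behaviour described above can be modelled with a small simulation. This is a hypothetical sketch, not Storm's code: the class PendingLimitedSpout and its methods are illustrative names, and a deque stands in for the external message source. It only shows the counting rule: emitting stops while the pending count equals the cap, and each ack or fail frees one slot.

```python
from collections import deque


class PendingLimitedSpout:
    """Hypothetical model of one spout task capped by max.spout.pending.

    A tuple is 'pending' from the moment it is emitted until it is acked or
    failed. While the pending count equals the cap, next_tuple() emits nothing.
    """

    def __init__(self, source, max_spout_pending):
        self.source = deque(source)          # stand-in for the external message source
        self.max_spout_pending = max_spout_pending
        self.pending = set()                 # message IDs emitted but not yet acked/failed
        self.next_id = 0

    def next_tuple(self):
        """Emit one message if under the cap; return (msg_id, value) or None."""
        if len(self.pending) >= self.max_spout_pending or not self.source:
            return None                      # throttled, or nothing left to read
        value = self.source.popleft()
        msg_id = self.next_id
        self.next_id += 1
        self.pending.add(msg_id)
        return msg_id, value

    def ack(self, msg_id):
        self.pending.discard(msg_id)         # frees one slot, so emitting can resume

    def fail(self, msg_id):
        # Frees the slot too; a real spout would typically also re-queue the
        # message for replay, which this sketch does not do.
        self.pending.discard(msg_id)


spout = PendingLimitedSpout(range(10), max_spout_pending=3)
emitted = [spout.next_tuple() for _ in range(5)]
print(emitted)             # [(0, 0), (1, 1), (2, 2), None, None]
spout.ack(0)
print(spout.next_tuple())  # (3, 3)
```

Without message IDs there is nothing to ack, so the pending set would never shrink in a meaningful way; that is why the cap only applies to reliable processing.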
This new feature is aimed at automatic flow control through the topology DAG, since different components may process tuples at mismatched speeds. Currently, tuples may get dropped if downstream components cannot keep up, wasting network bandwidth and processing capacity. In addition, it is difficult to tune the max.spout.pending parameter for the best backpressure performance. Another big motivation is that using max.spout.pending for flow control forces users to enable acking, which makes no sense in scenarios where flow control is needed but acking is not (e.g., at-most-once processing). Therefore, an automatic back pressure scheme is highly desirable.
In this design, spouts throttle not only when max.spout.pending is hit, but also when any bolt's receive queue has gone over a high water mark and has not yet dropped back below a low water mark. There is a lot of room for improvement here, around control theory and having spouts respond only to the downstream bolts that are backing up, but a simple bang-bang controller like this is a great start.
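The high/low water mark rule is a bang-bang controller with hysteresis. A minimal sketch, assuming a queue of fixed capacity and illustrative thresholds of 0.9 and 0.4 (these numbers and the names WatermarkThrottle and spouts_should_throttle are hypothetical, chosen for the example):

```python
class WatermarkThrottle:
    """Hypothetical bang-bang (hysteresis) controller for one bolt's receive queue.

    Throttling switches ON when usage rises above the high water mark and
    switches OFF only after usage falls below the low water mark, so spouts
    do not flap on and off around a single threshold.
    """

    def __init__(self, capacity, high=0.9, low=0.4):
        if not 0.0 <= low < high <= 1.0:
            raise ValueError("need 0 <= low < high <= 1")
        self.capacity = capacity
        self.high = high
        self.low = low
        self.throttled = False

    def update(self, queue_len):
        """Feed the current queue length; return True while spouts should throttle."""
        usage = queue_len / self.capacity
        if not self.throttled and usage > self.high:
            self.throttled = True
        elif self.throttled and usage < self.low:
            self.throttled = False
        return self.throttled


def spouts_should_throttle(max_pending_hit, bolt_throttles):
    """Spouts stop if max.spout.pending is hit OR any bolt is still backed up."""
    return max_pending_hit or any(t.throttled for t in bolt_throttles)


t = WatermarkThrottle(capacity=100)
print([t.update(n) for n in [50, 95, 70, 45, 30, 60]])
# [False, True, True, True, False, False]
```

Note how queue lengths of 70 and 45 keep the throttle on after it has tripped at 95: between the two marks the controller simply holds its previous state.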
Our automatic back pressure (ABP) scheme is lightweight yet efficient. It monitors certain queues in executors and workers, and exploits callbacks on ZooKeeper and the disruptor queue for fast-responding, push-based flow control.
Please check the attached figures for more details about the implementation.
https://issues.apache.org/jira/secure/attachment/12761186/aSimpleExampleOfBackpressure.png
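- An executor's receive queue in a worker grows past the high water mark, which notifies the worker's backpressure thread.
- The worker's backpressure thread reports the busy-executor event to ZooKeeper.
- All workers watch ZooKeeper for busy-executor events.
- Spouts in the workers slow down the rate at which they emit tuples.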
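The four steps above can be sketched end to end in a single process. This is a simplified, hypothetical model rather than Storm's implementation: FakeCoordinator is an in-memory stand-in for ZooKeeper, the "backpressure thread" is a plain method call instead of a separate thread, and "slowing down" is modelled as a boolean that a spout loop would check before emitting. The 0.9/0.4 water marks are illustrative.

```python
class FakeCoordinator:
    """In-memory stand-in for ZooKeeper: per-worker busy sets plus watch callbacks.

    Real ZooKeeper watches are one-shot and must be re-registered; this toy
    keeps callbacks registered permanently for brevity.
    """

    def __init__(self):
        self.nodes = {}       # worker_id -> set of busy executor IDs
        self.watchers = []    # callbacks pushed on every change

    def watch(self, callback):
        self.watchers.append(callback)

    def set_busy(self, worker_id, executor_ids):
        self.nodes[worker_id] = set(executor_ids)
        any_busy = any(self.nodes.values())
        for cb in self.watchers:
            cb(any_busy)      # push notification, no polling


class Worker:
    def __init__(self, worker_id, coordinator, capacity=100, high=0.9, low=0.4):
        self.worker_id = worker_id
        self.coordinator = coordinator
        self.capacity, self.high, self.low = capacity, high, low
        self.busy_executors = set()
        self.spouts_throttled = False
        coordinator.watch(self.on_backpressure_change)  # step 3: every worker listens

    def on_queue_update(self, executor_id, queue_len):
        """Step 1: an executor's receive queue crosses a water mark."""
        usage = queue_len / self.capacity
        before = set(self.busy_executors)
        if usage > self.high:
            self.busy_executors.add(executor_id)
        elif usage < self.low:
            self.busy_executors.discard(executor_id)
        if self.busy_executors != before:
            self.publish_busy_state()

    def publish_busy_state(self):
        """Step 2: the backpressure logic reports this worker's busy executors."""
        self.coordinator.set_busy(self.worker_id, self.busy_executors)

    def on_backpressure_change(self, any_busy):
        """Step 4: spouts in this worker throttle while any executor anywhere is busy."""
        self.spouts_throttled = any_busy


zk = FakeCoordinator()
w1, w2 = Worker("w1", zk), Worker("w2", zk)
w1.on_queue_update("bolt-3", 95)
print(w1.spouts_throttled, w2.spouts_throttled)  # True True
w1.on_queue_update("bolt-3", 30)
print(w1.spouts_throttled, w2.spouts_throttled)  # False False
```

Because a congested executor in one worker throttles spouts in every worker, the sketch also illustrates why this scheme is coarse-grained: it does not target only the spouts feeding the slow bolt.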
Sources:
https://issues.apache.org/jira/browse/STORM-886
https://github.com/apache/storm/pull/700
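On Alibaba's GitHub, they state: "The Storm community wants to achieve this effect by dynamically adjusting max_pending, but this approach is actually ineffective." Alibaba's JStorm has its own solution.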
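In my project, when we ran into back pressure, we first followed the community's approach and set that parameter, but in our experience it was not always effective. What we did instead was manually remove the synchronization lock of the Backpressure Thread.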