【发布时间】:2018-08-03 03:06:18
【问题描述】:
我正在使用storm-kafka-client 1.1.1 和storm-core 1.1.0。
我已经调整了以下参数,但无法启用背压并降低 kafka-spout 的摄取率。
Spout 每秒消耗 2000 条消息。
下游 Bolt 处理一条消息需要 50 毫秒,即每秒处理 20 条消息。
spout 发出的元组和 bolt 执行的元组之间的延迟随着时间的推移而增加。
**如何让 Spout 每秒读取 20 条消息并保持其消耗率与 Bolt 的执行率相同**
**Topology**
topology.max.spout.pending= **5** ,
topology.message.timeout.secs= **600** ,
topology.executor.send.buffer.size=**64** ,
topology.executor.receive.buffer.size=**64** ,
topology.transfer.buffer.size=**64**
**KafkaSpoutConfig**
setPollTimeoutMs(**200**) ,
setFirstPollOffsetStrategy(latest) ,
setMaxUncommittedOffsets(**2_000_000**) ,
setGroupId(groupName) ,
setProp("fetch.max.wait.ms",**1000**) ,
setProp("max.poll.records", **100**) ,
setMaxPartitionFectchBytes(**512**) ,
setProp("send.buffer.bytes", **512**) ,
setProp("receive.buffer.bytes", **512**) ,
setPartitionRefreshPeriodMs(30_000).setProp("enable.auto.commit", "true") ,
setProp("session.timeout.ms", "**60000**") ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(**50**) ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(**5**) , 1 ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(**1**) ) ;
我不确定应该为 TOPOLOGY_SPOUT_WAIT_STRATEGY 和 BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK
设置什么值那么上述参数和值的组合可以帮助控制 spout 摄取率?
任何建议都将受到高度赞赏。
谢谢 卡尼斯卡
【问题讨论】:
标签: apache-kafka apache-storm kafka-consumer-api confluent-platform backpressure