【问题标题】:How to reduce ingestion rate of Kafka Spout and enable Back pressure?如何降低 Kafka Spout 的摄取率并启用背压?
【发布时间】:2018-08-03 03:06:18
【问题描述】:

我正在使用storm-kafka-client 1.1.1 和storm-core 1.1.0。

我已经调整了以下参数,但无法启用背压并降低 kafka-spout 的摄取率。

Spout 每秒消耗 2000 条消息。

下游 Bolt 处理一条消息需要 50 毫秒,即每秒处理 20 条消息。

spout 发出的元组和 bolt 执行的元组之间的延迟随着时间的推移而增加。

**如何让 Spout 每秒读取 20 条消息并保持其消耗率与 Bolt 的执行率相同**

   **Topology**

   topology.max.spout.pending= **5** , 
   topology.message.timeout.secs= **600** , 
   topology.executor.send.buffer.size=**64** , 
   topology.executor.receive.buffer.size=**64** , 
   topology.transfer.buffer.size=**64**

   **KafkaSpoutConfig**

   setPollTimeoutMs(**200**) , 
   setFirstPollOffsetStrategy(latest) , 
   setMaxUncommittedOffsets(**2_000_000**) , 
   setGroupId(groupName) , 
   setProp("fetch.max.wait.ms",**1000**) , 
   setProp("max.poll.records", **100**) , 
   setMaxPartitionFectchBytes(**512**)  , 
   setProp("send.buffer.bytes", **512**) , 
   setProp("receive.buffer.bytes", **512**) , 
   setPartitionRefreshPeriodMs(30_000).setProp("enable.auto.commit", "true") , 
   setProp("session.timeout.ms", "**60000**") , 

   KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(**50**) ,
   KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(**5**) , 1 ,
   KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(**1**) ) ;

我不确定应该为 TOPOLOGY_SPOUT_WAIT_STRATEGYBACKPRESSURE_DISRUPTOR_HIGH_WATERMARK

设置什么值

那么上述参数和值的组合可以帮助控制 spout 摄取率?

任何建议都将受到高度赞赏。

谢谢 卡尼斯卡

【问题讨论】:

    标签: apache-kafka apache-storm kafka-consumer-api confluent-platform backpressure


    【解决方案1】:

    TOPOLOGY_SPOUT_WAIT_STRATEGY 仅在要求 spout 发出新元组时使用,并且它不发出任何内容(即,如果没有新消息)。它应该对背压没有任何影响。

    我对当前的背压实现不太熟悉,但我很确定您需要使用 TOPOLOGY_BACKPRESSURE_ENABLE 显式启用它。

    BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK 是一个比率,因此如果您将其设置为例如0.9 当螺栓的输入队列已满 90% 时,它将限制 spout。您可以在https://github.com/apache/storm/blob/1.1.x-branch/storm-core/src/jvm/org/apache/storm/Config.java 找到设置文档,在https://github.com/apache/storm/blob/1.1.x-branch/conf/defaults.yaml 找到默认值

    为了避免一次发出过多的元组,我认为您应该将 topology.max.spout.pending 设置为一些合理数量的元组(可能是几百个?)。确保您的拓扑设置为启用确认(即将 topology.enable.message.timeouts 设置为 true)。否则 max spout pending 无效。

    不确定为什么要更改执行器缓冲区大小。

    您还应该考虑将 Storm 和storm-kafka-client 升级到至少1.1.2。最近对storm-kafka-client进行了很多修复,如果您升级,可能会更轻松。

    我不确定你代码中的星星是什么意思?

    【讨论】:

    • 非常感谢您的解释。将设置您提到的标志。在阅读了以下博客文章后,我更改了“执行程序缓冲区大小”〜'jobs.one2team.com/apache-storms'〜“背压在默认参数下无法开箱即用的原因是我们有一个漂亮的螺栓处理时间长,处理一条消息通常需要 0.1 秒 .. spout 的速度足以填充这些慢速螺栓的缓冲区... 我们必须调整的主要参数是缓冲区大小 "
    猜你喜欢
    • 2017-08-24
    • 2019-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-27
    • 2016-06-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多