【问题标题】:How to slow down or set given speed on the Kafka stream consumer?如何减慢或设置 Kafka 流消费者的给定速度?
【发布时间】:2017-12-02 17:15:58
【问题描述】:

我正在尝试控制 KStream 消耗的消息数量,但我不是很成功。

我正在使用: max.poll.interval.ms=100max.poll.records=20 每秒收到大约 200 条消息。

但这似乎不是很好,因为我看到我的统计数据中每秒也有大约 500 条消息。

我还应该在流消费者一侧设置什么?

【问题讨论】:

    标签: performance queue apache-kafka apache-kafka-streams


    【解决方案1】:

    我正在使用:max.poll.interval.ms=100 和 max.poll.records=20 来获取 比如每秒 200 条消息。

    max.poll.interval.ms 和 max.poll.records 属性不能以这种方式工作。

    ma​​x.poll.interval.ms 表示消费者在主题的每次消费者轮询之间必须等待的最大时间间隔(以毫秒为单位)。

    ma​​x.poll.records表示消费者在主题的每次消费者轮询期间可以消费的最大记录数。

    每次轮询之间的间隔不受上述两个属性的控制,而是由您的消费者确认获取的记录所用的时间控制。

    例如,假设一个主题 X 存在 1000 条记录,消费者确认获取的记录所用的时间为 20 毫秒。 max.poll.interval.ms = 100 和 max.poll.records = 20,消费者将每 20ms 轮询一次 Kafka 主题,并且在每次轮询中,最多将获取 20 条记录。如果确认获取记录所花费的时间大于 max.poll.interval.ms,则轮询将被视为失败,并且将从 Kafka 主题重新轮询该特定批次。

    【讨论】:

    • 意思是,你建议,我需要自己编程放慢速度,不要使用自动提交,或者?
    • 是的。减慢程序预期的等待时间并使用手动提交。但是,我仍然不确定您为什么要降低消费者的性能。
    • 在 Kafka Streams 中手动提交实际上是不可能的......或者这个问题不是针对 Streams API 的?
    【解决方案2】:

    您可以在消费者端使用类似 akka-stream-kafka(又名 reactive-kafka)的东西。 akka-streams 有很好的节流功能,在这里可以派上用场:

    http://doc.akka.io/docs/akka/snapshot/java/stream/stream-quickstart.html#time-based-processing

    【讨论】:

      【解决方案3】:

      一个KafkaConsumer(也是KafkaStreams内部使用的那个)尽可能快地读取记录。

      您提到的参数可能会对性能产生影响,但您无法控制实际的数据速率。另请注意,max.poll.records 仅配置poll() 返回的记录数,但对客户端-代理通信没有影响。 KafkaConsumer 可以在与代理通信时获取更多记录,然后在 poll() 上返回缓冲消息,只要记录在缓冲区中(即,对于这种情况,poll() 是客户端操作符,它只确保您不会通过max.poll.interval.ms 超时)。因此,您可能对fetch.max.bytes 更感兴趣,它决定了从代理获取的字节大小。如果您减少此参数,消费者的效率会降低,因此吞吐量应该会降低。 (虽然不推荐)。

      配置吞吐量的另一种方法是配额 (https://kafka.apache.org/documentation/#design_quotas),它是一种代理端配置,允许您限制客户端可以读取和/或写入的数据量。

      在 Kafka Streams 中(以及使用普通 KafkaConsumer 时)最好的做法是手动限制对 poll() 的调用。对于 Kafka Streams,您可以将 Thread.sleep() 添加到任何 UDF 中。如果您不想将其搭载到现有运算符中,您可以添加一个带有临时状态(即类成员变量)的foreach() 来跟踪吞吐量并计算您需要睡眠多少才能相应地限制吞吐量.

      【讨论】:

      • 最后(它已经在生产软件中)我实现了最后一个建议 - 消息节流器:-)。感谢您对想法的确认和总结。
      • 感谢@Matthias J. Sax,所以如果使用thread.sleep(1 sec) with max.poll.records=100,我可以保证消费者每秒的最大消费率=100, (至少从本地缓冲区返回到消费者应用程序轮询循环的内容)。是否可以推断f返回的记录小于100,那么我的到达率小于100。更一般地说,当返回的记录小于100时,通过上述配置可以得出什么结论....如何消费率是否与到达率相关(如果有)?谢谢
      • 动态观察了消费率的变化顺序,什么事件表明消费者跟不上到达率,是不是出现了几条消息的延迟?有什么提示吗?
      • 如果您使用sleep(),它将针对每条记录执行。因此,您可能希望使用 10 毫秒来达到每秒大约 100 条记录。 -- 是的,滞后(或者更准确地说,增长滞后)是消费者无法跟上的指标。
      【解决方案4】:

      在 Kafka 中出现了 Kafka Quota 的新概念。

      所有细节都在这里Kafka -> 4.9 Quotas

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-10-24
        • 1970-01-01
        • 1970-01-01
        • 2017-05-04
        • 1970-01-01
        • 2019-12-02
        相关资源
        最近更新 更多