【问题标题】:Slow consumer throuput when using 2 consumer-configuration使用 2 个消费者配置时消费者吞吐量较慢
【发布时间】:2015-04-09 08:27:46
【问题描述】:

使用 spring-integration-kafka 扩展和以下配置:

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="10000"
    zk-session-timeout="10000" zk-sync-time="2000" />

<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
                group-id="realtime-services-consumer-grp" 
                value-decoder="purchaseDecoder" 
                key-decoder="kafkaReflectionDecoder"
                max-messages="5" >
            <int-kafka:topic id="purchase" streams="1" />
        </int-kafka:consumer-configuration>
        <int-kafka:consumer-configuration 
                group-id="realtime-services-consumer-gw"
                value-decoder="eventDecoder" 
                key-decoder="kafkaReflectionDecoder" 
                max-messages="10" >
            <int-kafka:topic id="event" streams="1" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
    auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="20" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

例如,当我评论第一个 consumer-configuration 时,我每分钟可以有 300 个事件,没有问题。但是当两者都被激活时。我的吞吐量非常低。来自这两个主题的总吞吐量低于每分钟 50 个。

有人知道为什么我在阅读 2 个主题时表现如此糟糕吗?我在配置中做错了什么?

【问题讨论】:

  • 你的kafkaReflectionDecoder 和其他人是什么?他们不能成为瓶颈吗?
  • 我的消息中的key总是空的,另一个是avro解码器。问题是当我有两个消费者时会发生瓶颈,而我在解码器中没有同步。
  • 你介意看看&lt;int-kafka:message-driven-adaptep&gt;,直到我们发现发生了什么? spring.io/blog/2015/02/09/…
  • @ArtemBilan 在我看来,这种消费消息的方式是循环方式,因为我在主题购买中的消息很少,但有很多消息事件,看起来它被卡住了大部分时间从购买轮询。让事件没有机会被消耗。我的 CPU 平均为 0%。购买并没有滞后,而事件有很大的滞后。

标签: spring spring-integration apache-zookeeper apache-kafka


【解决方案1】:

感谢您指出这一点!

在与我当地的 Kafka claster 进行了一番激烈的斗争后,我已经能够重现您的问题,并且我为您提供了一些解决方法:-)。

首先不是round-robin,而是一个接一个:

for (final ConsumerConfiguration<K, V> consumerConfiguration : getConsumerConfigurations().values()) {
    Map<String, Map<Integer, List<Object>>> messages = consumerConfiguration.receive();

如果KafkaStream 现在没有消息,那么每个consumerConfigurationconsumer-timeout="5000" 期间在后台被阻止。因此,来自&lt;int-kafka:inbound-channel-adapter&gt; 的整个poll 任务被阻塞,直到超时或更糟:如果每个主题都没有消息,则整个等待超时就是超时的总和!

要解决此问题,您可以减少 consumer-timeout="5000" 或为每个主题提供多个 &lt;int-kafka:consumer-context&gt;&lt;int-kafka:inbound-channel-adapter&gt;

是的,它看起来很奇怪,而且我们在发布前没有时间来看看这个真的很糟糕,但无论如何,请随时提出 JIRA 问题来解决它。

谢谢!

【讨论】:

  • 这个问题会在下一个即将发布的版本中修复吗?
  • 我暂时采用了您推荐的解决方案,方法是创建多个消费者上下文并将消费者超时减少到一个更小的值。它对我的情况很好。
猜你喜欢
  • 2013-04-16
  • 2020-10-13
  • 2012-07-05
  • 2022-01-22
  • 1970-01-01
  • 1970-01-01
  • 2020-09-19
  • 2014-07-11
  • 2011-10-24
相关资源
最近更新 更多