【问题标题】:Why is Kafka Consumer blocked until timeout, even if there are messages?为什么即使有消息,Kafka Consumer 也会阻塞直到超时?
【发布时间】:2016-02-02 20:06:08
【问题描述】:

我正在使用带有 kafka 的高级客户端(遗留代码)。

但是,我观察到即使主题上有很多可用的消息,消费者仍然会被阻塞,直到 zk-connection-timeout 时间过去。

为什么会这样?我该如何解决这个问题?我感兴趣 - 是否找到消息?走得更远。

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="#{kafkaConfig['zooKeeperUrl']}" zk-connection-timeout="5000"
    zk-session-timeout="5000" zk-sync-time="2000" />

<int-kafka:consumer-context id="consumerContext" consumer-timeout="5000" zookeeper-connect="zookeeperConnect">

【问题讨论】:

    标签: spring-integration apache-kafka


    【解决方案1】:

    好吧,当您使用多个主题、配置时,当每个 KafkaStream 被阻塞并参与相同的顺序迭代过程时,这是高级消费者实现的一个已知问题。

    更多信息请参见Slow consumer throuput when using 2 consumer-configuration

    更新

    consumer-timeout 在来自KafkaStreamConsumerIterator 中就是这个意思:

    currentDataChunk = (FetchedDataChunk)this.channel().poll((long)this.consumerTimeoutMs, TimeUnit.MILLISECONDS);
    

    channel 就是 BlockingQueue。我认为这并不奇怪为什么我们在consumer-timeout 的消息检索期间被阻止。

    请在&lt;int-kafka:consumer-configuration&gt; 上找到max-messages 选项。默认为1

    【讨论】:

    • 非常感谢您的回答。我对这个话题很陌生。假设我有一条通常会在 5 秒内阅读的消息。如果我将连接超时设置为 2 秒,那么实际读取的消息少于消息,会发生什么?连接超时只是“检查是否有消息的时间”还是“检查是否有消息并在同一时间范围内阅读该消息的时间”?非常感谢
    • 我尝试了一个主题,它执行以下操作:处理消息并等待消费者超时完成,然后才读取新消息。例如消费者超时=5秒;处理时间=1 秒 - 它将等待 4 秒直到获取新消息
    • 你能告诉我是否有办法说如果收集到最大数量的消息,那么不要再等了?
    • 似乎超时来自等待所有流(分区)成为消息。
    • 不知道为什么需要这样的功能,但您可以简单地将timeout 最小化并在那里等待一小段时间。无论如何,&lt;int-kafka:inbound-channel-adapter&gt;pollable,所以在不同的民意调查之间会有一段时间,我们可能会在那里等待,这并不奇怪。
    猜你喜欢
    • 1970-01-01
    • 2012-05-08
    • 2016-10-07
    • 2018-11-28
    • 1970-01-01
    • 2011-10-11
    • 1970-01-01
    • 2015-11-03
    • 2021-09-24
    相关资源
    最近更新 更多