【问题标题】:Kafka - Consume until empty卡夫卡 - 消费直到空
【发布时间】:2018-04-17 12:48:51
【问题描述】:

我有一个用例,在获取 KafkaConsumer 中的所有消费者记录之前不要继续操作是至关重要的。在这个用例中,不会有任何东西进入管道。什么是确保绝对没有任何东西可以获取的正确方法?

【问题讨论】:

  • 您确定这是正确的做法吗?好像您正在将流式解决方案变成批量解决方案?
  • 当(如果)我们的网关已经完成并且消息还没有到达 Cassandra 时,我们想要使用队列中的消息来查找我们最后发布的修订号。
  • 如果知道没有追加新数据,可以通过Consumer#endOffsets获取日志末尾,当Consumer#position到达末尾时终止读取。

标签: apache-kafka kafka-consumer-api


【解决方案1】:

Kafka 旨在处理无限的数据流,因此“全部消耗”仅意味着在一段时间(1 分钟)、1 小时等内没有人发送任何数据 - 这取决于您。

你可以使用类似(伪代码):

int emptyCount = 0;
while (true) {
   records = Consumer.poll(500);
   if (records.empty()) {
      emptyCount++;
      if (emptyCount >= 100) {
         break;
      }
      continue;
   }
   emptyCount = 0;
   ...process records...
}

您可以调整轮询超时和空周期数以达到必要的等待期。

【讨论】:

  • 嗯,这可能行得通。现在我正在查看 seekToBeginning/seekToEnd 然后使用位置来计算两者是否相同并将其视为流为空,但这可能在语义上不正确,我不知道。
  • 这样做与长时间超时调用 poll 之间的语义区别是什么?如前所述,在执行此检查时,没有任何内容输入到 Kafka。
  • 如果你调用poll的超时时间长于心跳,那么你的消费者将被视为死亡。
  • 请注意,数据会一直保留在主题中,直到过期 - 默认情况下可能是几天
  • @AlexOtt "如果你调用 poll 的超时时间超过了心跳,那么你的消费者将被认为是死的。" -- 这仅适用于较旧的 Kafka 版本。比较:stackoverflow.com/questions/39730126/…
【解决方案2】:

如果您使用kafka-console-consumer,您可以指定timeout-ms 参数来定义它将等待多长时间,直到它被认为不再有消息到来。

--timeout-ms <Integer: timeout_ms>      If specified, exit if no message is    
                                          available for consumption for the    
                                          specified interval.  

【讨论】:

    猜你喜欢
    • 2020-10-28
    • 1970-01-01
    • 2018-11-07
    • 2019-07-03
    • 2018-05-05
    • 2021-08-22
    • 1970-01-01
    • 2020-10-28
    • 2015-12-18
    相关资源
    最近更新 更多