【问题标题】:When kafka consumer poll return null records?当kafka消费者轮询返回空记录?
【发布时间】:2017-09-22 01:40:56
【问题描述】:

如下所示,我的代码是高级消费者在 kafka 服务器中获取具有 32 个分区的主题,我很困惑为什么有时我会从 consumer.poll() 得到一个空的返回。 我尝试增加轮询超时,然后当我将超时增加到 1000 时,每次轮询都有返回数据,而我将超时设置为 10 或 0,然后我看到很多空返回。

谁能告诉我如何设置正确的超时时间?

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka-01:9098")
    props.put("group.id", "kch1")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    //props.put("max.poll.records", "1000")
    val consumers = new Array[KafkaConsumer[String, String]](16)
    for(i <- 0 to 15) {
      consumers(i) = new KafkaConsumer[String, String](props)
      consumers(i).subscribe(util.Arrays.asList("veh321"))
    }
    var cnt = 0
    var cacheIterator: Iterator[ConsumerRecord[String, String]] = null
    for(i <- 0 to 15) {
      new Thread(new Runnable {
        override def run(): Unit = {
          var finish = false
          while(!finish) {
            val start = System.currentTimeMillis()
            cacheIterator = consumers(i).poll(100).iterator()
            val end = System.currentTimeMillis() - start
            if (end > 10 ) {
              println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}")
            }
          }
        }
      }).start()
    }

【问题讨论】:

    标签: java scala apache-spark apache-kafka


    【解决方案1】:

    Java 消费者通过调用 java.nio.channels.Selector.select(timeout) 使用 Linux 的 epoll 作为底层实现方案。如果您只给它 100 毫秒的时间来尝试在该短时间间隔内准备好多少个 SelectionKey,它很可能什么也不返回。

    另外,在同样的 100 毫秒内,消费者还会做一些其他的工作,包括轮询协调器状态,所以记录轮询的实时间隔显然小于 100 毫秒,这使得检索一些真正有用的东西变得更加困难。

    【讨论】:

    • 所以如果我将轮询超时设置为 1000 毫秒,我可以看到所有轮询都有数据返回,其中一些花费大约 200 毫秒,而另一些只花费几毫秒,那么当轮询花费时会发生什么需要几百秒?有什么参数可以转这个吗?
    • 在检索记录之前需要完成许多底层操作,例如获取元数据,管理连接和组等。如果您正在使用,您可以调整max.poll.records来控制单轮投票的消息数Java 消费者。
    • 谢谢你的帮助,所以如果我想要一个大的吞吐量,我可以设置一个大的 poll.records 和一个大的轮询超时,这样可以减少轮询的次数,对吗?
    • 坦白说,我不同意。 TPS 和适用于任何地方的配置之间没有这样的调整。需要进行彻底的性能测试。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-11-15
    • 2019-05-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多