【问题标题】:Kafka-consumer. commitSync vs commitAsync卡夫卡消费者。 commitSync 与 commitAsync
【发布时间】:2018-03-14 17:51:35
【问题描述】:

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1的引述

缺点是虽然 commitSync() 会重试提交,直到它 成功或遇到不可重试的失败,commitAsync() 不会重试。

这句话我不清楚。我想消费者向代理发送提交请求,如果代理在某个超时内没有响应,则意味着提交失败。我错了吗?

能否详细说明commitSynccommitAsync的区别?
另外,请提供我应该更喜欢哪种提交类型的用例。

【问题讨论】:

    标签: java apache-kafka offset kafka-consumer-api


    【解决方案1】:

    使用 commitAsync() 进行强大的重试处理

    在“Kafka - The Definitive Guide”一书中,有关于如何缓解由于异步提交而导致提交较低偏移量的潜在问题的提示:

    重试异步提交:获得正确异步重试的提交顺序的简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到 commitAsync 回调中。当您准备发送重试时,检查回调获得的提交序列号是否等于实例变量;如果是,则没有更新的提交,可以安全地重试。如果实例序列号更高,请不要重试,因为已经发送了更新的提交。

    以下代码描述了一个可能的解决方案:

    import java.util._
    import java.time.Duration
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
    import org.apache.kafka.common.{KafkaException, TopicPartition}
    import collection.JavaConverters._
    
    object AsyncCommitWithCallback extends App {
    
      // define topic
      val topic = "myOutputTopic"
    
      // set properties
      val props = new Properties()
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      // [set more properties...]
      
    
      // create KafkaConsumer and subscribe
      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(List(topic).asJavaCollection)
    
      // initialize global counter
      val atomicLong = new AtomicLong(0)
    
      // consume message
      try {
        while(true) {
          val records = consumer.poll(Duration.ofMillis(1)).asScala
    
          if(records.nonEmpty) {
            for (data <- records) {
              // do something with the records
            }
            consumer.commitAsync(new KeepOrderAsyncCommit)
          }
    
        }
      } catch {
        case ex: KafkaException => ex.printStackTrace()
      } finally {
        consumer.commitSync()
        consumer.close()
      }
    
    
      class KeepOrderAsyncCommit extends OffsetCommitCallback {
        // keeping position of this callback instance
        val position = atomicLong.incrementAndGet()
    
        override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
          // retrying only if no other commit incremented the global counter
          if(exception != null){
            if(position == atomicLong.get) {
              consumer.commitAsync(this)
            }
          }
        }
      }
    
    }
    

    【讨论】:

      【解决方案2】:

      commitAync 不会重试,因为如果它重试就会弄得一团糟。

      假设您尝试提交偏移量 20(异步),但它没有提交(失败),然后下一个轮询块尝试提交偏移量 40(异步),并且成功了。

      现在,提交偏移量 20 仍在等待提交,如果它重新绑定并成功,它将变得一团糟。

      问题在于提交的偏移量应该是 40 而不是 20。

      【讨论】:

      • 查看我的答案以解决这个潜在问题。
      【解决方案3】:

      commitSync 和 commitAsync 都使用了 kafka 偏移量管理功能,并且都有缺点。 如果消息处理成功并且提交偏移失败(非原子)并且同时发生分区重新平衡,则您处理的消息将被其他消费者再次处理(重复处理)。如果您对重复消息处理没问题,那么您可以选择 commitAsync(因为它不会阻塞并提供低延迟,并且它提供了更高阶的提交。所以您应该没问题)。否则,请使用自定义偏移管理,在处理和更新偏移时处理原子性(使用外部偏移存储)

      【讨论】:

        【解决方案4】:

        正如 API 文档中所说:


        这是一个同步提交,将阻塞直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)。

        这意味着,commitSync 是一种阻塞方法。调用它会阻塞你的线程,直到它成功或失败。

        例如,

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                consumer.commitSync();
            }
        }
        

        对于for循环中的每次迭代,只有在consumer.commitSync()成功返回或中断并抛出异常后,您的代码才会移动到下一次迭代。


        这是一个异步调用,不会阻塞。遇到的任何错误都将传递给回调(如果提供)或丢弃。

        这意味着,commitAsync 是一种非阻塞方法。调用它不会阻塞你的线程。相反,它将继续处理以下指令,无论最终成功还是失败。

        例如,类似于前面的例子,但这里我们使用commitAsync

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                consumer.commitAsync(callback);
            }
        }
        

        对于 for 循环中的每次迭代,无论 consumer.commitAsync() 最终会发生什么,您的代码都将移至下一次迭代。而且,提交的结果将由您定义的回调函数处理。


        权衡:延迟与数据一致性

        • 如果你必须保证数据的一致性,选择commitSync(),因为它会确保在做任何进一步的操作之前,你会知道偏移提交是成功还是失败。但由于它是同步和阻塞的,您将花费更多时间等待提交完成,从而导致高延迟。
        • 如果您对某些数据不一致并希望有低延迟,请选择commitAsync(),因为它不会等待完成。相反,它只会发送提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时,您的代码将继续执行。

        这都是一般来说,实际行为将取决于您的实际代码以及您调用该方法的位置。

        【讨论】:

        • consumer.poll 已弃用,如何将数据转换为 ConsumerRecords ?
        • 还值得一提的是,commitAsync(callback)callback 将在与consumer.poll() 相同的线程中被调用。
        猜你喜欢
        • 2019-07-03
        • 2018-05-05
        • 2021-08-22
        • 1970-01-01
        • 1970-01-01
        • 2020-10-28
        • 2015-12-18
        • 2019-03-27
        • 2017-01-07
        相关资源
        最近更新 更多