【问题标题】:Kafka Ensure At Least Once卡夫卡确保至少一次
【发布时间】:2021-09-11 21:41:29
【问题描述】:

Kafka 的第一个项目,试图证明一个事件至少会被处理一次。到目前为止,没有看到重试处理的证据。 dummy app的结构很简单:订阅、处理、发布、提交;如果异常,中止事务并希望它被重试。我正在记录每条消息。

我希望看到 (1)“处理 messageX”(2)“messageX 错误”(3)“处理 messageX”。相反,我看到处理在 messageX 之后继续,即它不会被重新处理。

我看到的是:(1)“处理 messageX”(2)“messageX 错误”(3)“处理 someOtherMessage”。

使用 Kafka 2.7.0、Scala 2.12。 我错过了什么?在下面显示虚拟应用的相关部分。

我还尝试从代码中删除producer(以及对它的所有引用)。

更新 1:我设法通过使用带有consumer.seek() 的偏移量来重新处理记录,即将消费者发送回记录批次的开头。不知道为什么根本没有到达consumer.commitSync()(因为一个例外)还没有这样做。

import com.myco.somepackage.{MyEvent, KafkaConfigTxn}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.slf4j.LoggerFactory
import java.util
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

// Prove that a message can be re-processed if there is an exception
object TopicDrainApp {
  private val logger = LoggerFactory.getLogger(this.getClass)
  private val subTopic = "input.topic"
  private val pubTopic = "output.topic"
  val producer = new KafkaProducer[String, String](KafkaConfigTxn.producerProps)
  producer.initTransactions()
  val consumer = new KafkaConsumer[String, String](KafkaConfigTxn.consumerProps)
  private var lastEventMillis = System.currentTimeMillis
  private val pollIntervalMillis = 1000
  private val pollDuration = java.time.Duration.ofMillis(pollIntervalMillis)

  def main(args: Array[String]): Unit = {
    subscribe(subTopic)
  }

  def subscribe(subTopic: String): Unit = {
    consumer.subscribe(util.Arrays.asList(subTopic))
    while (System.currentTimeMillis - lastEventMillis < 5000L) {
      try {
        val records: ConsumerRecords[String, String] = consumer.poll(pollDuration)
        records.asScala.foreach { record =>
          try {
            lastEventMillis = System.currentTimeMillis
            val event = MyEvent.deserialize(record.value())
            logger.info("ReceivedMyEvent:" + record.value())
            producer.beginTransaction()            
            simulateProcessing(event) // [not shown] throw exception to test re-processing
            producer.flush()
            val offsetsToCommit = getOffsetsToCommit(records)
            //consumer.commitSync()                                        // tried this; does not work
            //producer.sendOffsetsToTransaction(offsetsToCommit, "group1") // tried this; does not work
            producer.commitTransaction()
          } catch {
            case e: KafkaException => logger.error(s"rollback ${record.value()}", e)
              producer.abortTransaction()
          }
        }
      } catch {
        case NonFatal(e) => logger.error(e.getMessage, e)
      }
    }
  }
  private def getOffsetsToCommit(records: ConsumerRecords[String, String]): util.Map[TopicPartition, OffsetAndMetadata] = {
    records.partitions().asScala.map { partition =>
      val partitionedRecords = records.records(partition)
      val offset = partitionedRecords.get(partitionedRecords.size - 1).offset
      (partition, new OffsetAndMetadata(offset + 1))
    }.toMap.asJava
  }
}

object KafkaConfigTxn {
  // Only relevant properties are shown
  def commonProperties: Properties = {
    val props = new Properties()
    props.put(CommonClientConfigs.CLIENT_ID_CONFIG, "...")
    props.put(CommonClientConfigs.GROUP_ID_CONFIG, "...")
    props
  }
  def producerProps: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // "enable.idempotence"
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "...") // "transactional.id"
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.RETRIES_CONFIG, "3")
    commonProperties.asScala.foreach { case (k, v) => props.put(k, v) }
    props
  }
  def consumerProps: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // "isolation.level"
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    commonProperties.asScala.foreach { case (k, v) => props.put(k, v) }
    props
  }
}

【问题讨论】:

    标签: scala apache-kafka transactions


    【解决方案1】:

    根据我给你的参考资料,你需要在这个过程中使用 sendOffsetsToTransaction,但是你的消费者不会再收到关于中止事务的消息,因为你正在阅读只提交的事务


    引入事务是为了允许在 kafka 到 kafka 之间只进行一次处理,据说 kafka 从第一天起就支持至少一次且最多一次的交付语义,

    为了获得至少一次行为,您禁用自动提交并在处理成功完成时提交,下次调用 poll() 如果您在提交之前有异常,您将再次读取上次提交偏移量的记录

    如果发生异常,要在处理开始之前以这种方式最多获得一次行为,下次调用 poll() 时会收到新消息(但会丢失其他消息)

    Exactly once 是纯 java 中最难实现的(不是在谈论使一切变得更容易的 spring 框架)-它涉及将偏移量保存到外部数据库(通常在您处理完成的地方)并在启动/重新平衡时从那里读取

    对于 java 中的事务使用示例,您可以阅读 baeldung 的这篇优秀指南

    https://www.baeldung.com/kafka-exactly-once

    【讨论】:

    • 感谢您提供 Baeldung 链接。阅读那篇文章后,我编写了代码。它仍然不重新处理消息。不知道我错过了什么。
    • 你的消费者代码和生产者代码是耦合在一起的,你甚至进入了循环吗?只有当你消费消息时你才开始生产?逻辑中的某些东西没有加起来
    • 但是你忽略了一点,至少有一次你根本不需要交易
    • 代码进入循环,消费并打印消息。作为测试,我以 10% 的概率抛出随机异常。当异常被捕获时,事务被回滚(在生产者上),但我再也没有看到那个 messageID,所以看起来它没有被重新处理。在消费之前,应用程序不会产生任何东西,不是吗?
    • 你通常有生产的东西和消费的东西,它们通常是不同的应用程序“生产者”和“消费者”,除非你在谈论卡夫卡流......根据我给的参考你,你需要在这个过程中使用 sendOffsetsToTransaction ,但是你的消费者不会再收到中止交易的消息,因为你正在阅读只提交的交易
    【解决方案2】:

    为演示应用找出正确的方法调用组合(subscribe、beginTransaction、process、commit/abortTransaction 等)。代码的核心是

      def readProcessWrite(subTopic: String, pubTopic: String): Int = {
        var lastEventMillis = System.currentTimeMillis
        val consumer = createConsumer(subTopic)
        val producer = createProducer()
        val groupMetadata = consumer.groupMetadata()
        var numRecords = 0
        while (System.currentTimeMillis - lastEventMillis < 10000L) {
          try {
            val records: ConsumerRecords[String, String] = consumer.poll(pollDuration)
            val offsetsToCommit = getOffsetsToCommit(records)
            // println(s">>> PollRecords: ${records.count()}")
            records.asScala.foreach { record =>
              val currentOffset = record.offset()
              try {
                numRecords += 1
                lastEventMillis = System.currentTimeMillis
                println(s">>> Topic: $subTopic, ReceivedEvent: offset=${record.offset()}, key=${record.key()}, value=${record.value()}")
                producer.beginTransaction()
                val eventOut = simulateProcessing(record.value()) // may throw
                publish(producer, pubTopic, eventOut)
                producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata)
                consumer.commitSync()
                producer.commitTransaction()
              } catch {
                case e: KafkaException => println(s"---------- rollback ${record.value()}", e)
                  producer.abortTransaction()
                  offsetsToCommit.forEach { case (topicPartition, _) =>
                    consumer.seek(topicPartition, currentOffset)
                  }
              }
            }
          } catch {
            case NonFatal(e) => logger.error(e.getMessage, e)
          }
        }
        consumer.close()
        producer.close()
        numRecords
      }
    // Consumer created with props.put("max.poll.records", "1")
    

    我能够证明这将只处理每个事件一次,即使simulateProcessing() 抛出异常也是如此。准确地说:当处理工作正常时,每个事件只处理一次。如果出现异常,则重新处理该事件,直到成功。就我而言,这些异常并没有真正的原因,因此重新处理将始终以成功结束。

    【讨论】:

      猜你喜欢
      • 2017-06-29
      • 2022-01-22
      • 1970-01-01
      • 2018-02-18
      • 1970-01-01
      • 1970-01-01
      • 2021-01-20
      • 1970-01-01
      • 2018-11-28
      相关资源
      最近更新 更多