【问题标题】:Kafka Rebalancing. Duplicate processing issue卡夫卡再平衡。重复处理问题
【发布时间】:2018-04-16 15:18:51
【问题描述】:

我有一个消费者工作者应用程序,它在内部启动 X 线程数,每个线程都在生成它的 KafkaCosnumer。 Cosnumers 拥有相同的groupId 并订阅相同的主题。因此,每个消费者都能获得公平的分区份额。

处理的本质是我不能丢失消息,也不能允许重复。我运行的kafka版本是0.10.2.1。

这是我面临的问题:消费者线程 1 开始使用消息,并在 poll() 上获得一批消息。我还实现了ConsumerRebalanceListener,因此每次成功处理消息时,它都会被添加到offsets 映射中。 (见下面的代码。)因此,一旦发生重新平衡,我可以在我的分区重新分配给其他消费者之前提交我的偏移量。 有时,为了处理该批次,它需要比max.poll.interval.ms 更长的时间,这是重新平衡发生的地方,分区从消费者 1 中拉出并分配给消费者 2。消费者 1 不知道分区已被撤销并继续处理消息,与此同时,消费者 2 从最后一个偏移量(由 RebalanceListener 提交)拾取并处理相同的消息。

有没有办法通知消费者他已经撤销了分区,以便他可以停止处理循环中已经分配给其他消费者的消息?

public class RebalanceListener<K, V> implements ConsumerRebalanceListener {

    private final KafkaConsumer<K, V> consumer;

    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
            Maps.newConcurrentMap();

    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

    public RebalanceListener(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
                topic, partition, offset);
        CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, "commit"));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return CURRENT_OFFSETS;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        LOGGER.debug("message=Comitting offsets for partititions [{}]",
                CURRENT_OFFSETS.keySet().stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        consumer.commitSync(CURRENT_OFFSETS);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
    }

}

我想我可以在RebalanceListener 内部创建consumerId -- TopicPartition 的并发映射,然后在处理每条消息之前检查当前消费者是否仍与记录相关联(每个ConsumerRecord 都有topicpartition 字段)。 如果没有 - 打破循环并创建下一个poll()

如果我的工作应用程序将在一个实例中运行,这将是一个可行的解决方案,即使有多个 KafkaConsumer 线程在旋转。但是一旦我扩大规模,我将无法在静态地图中存储偏移量和消费者主题分区映射。这必须是某种集中式存储、数据库,或者说是 Redis。

但是,在我每次处理一个项目之前,我都必须询问我的记录是否可以被当前的消费者线程合法地处理。在扩展工作应用程序的情况下,这将是对外部存储的网络调用,这会破坏使用 kafka 的目的,因为它会减慢处理速度。我可能只是选择在处理单个项目后执行偏移提交。

【问题讨论】:

  • Kafka 1.0 于上周发布。 0.11 的特性之一应该是 once delivery
  • @StuartLC 我知道 exactly once 语义是在 0.11 中引入的,不幸的是,我还不能升级到那个版本。
  • @StuartLC 所以,我阅读了您在上面链接的一篇文章。我不太明白 0.11 中的消费者如何处理我的情况。我消费的消息仍有可能需要超过max.poll.interval.ms 才能得到处理。 (我在处理完一整批味精后提交)。消息将处于进程循环中,直到所有消息都得到处理。在重新平衡发生之前的某个时间。即使发生了再平衡,我的旧消费者仍在继续处理。与此同时,其他消费者接管被撤销的分区。并且消息将被处理两次。

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

你需要实现 onPartitionsRevoked()

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)

保证所有消费者进程都会调用 onPartitionsRevoked 在任何进程调用之前 onPartitionsAssigned。因此,如果偏移量或其他状态保存在 onPartitionsRevoked 调用它保证在 接管该分区的进程具有其 onPartitionsAssigned 调用回调以加载状态。

【讨论】:

  • 我想你没有仔细阅读我的问题。在我提供的 sn-p 中,我确实实现了 onPartitionsRevoked()。问题不在于接管处理的消费者,而在于被认为已死但仍在处理上次调用 poll() 期间消耗的消息的消费者
  • 是的,但是您没有按照文档中的说明在 onPartitionsRevoked() 实现中外部存储偏移量。我认为您不能将已撤销的分区的偏移量提交回 Kafka。如果您想向 Kafka 提交偏移量,您需要在完成处理时以及在消费者撤销其分区之前执行此操作。
  • 我有几个消费者线程在同一个 JVM 中运行。我的重新平衡侦听器具有addOffset() 方法,它正在向静态并发映射添加偏移量。这意味着我所有的消费者线程都将处理相同的偏移图。现在,如果我们回到上面引用的 sn-p。它表示所有消费者进程都将调用onPartitionsRevoked()。这意味着我所有的偏移量都将在内部保持原样。
  • 我知道我的实现有缺陷,如果我扩展我的应用程序,我将不再在同一个 JVM 上运行它,我将不得不使用外部存储。但这不是我要解决的问题。我不知道如何停止处理已被协调器宣布死亡的消费者已经消费的消息,但显然仍然存在(超过max.poll.interval.ms)并且仍在循环中处理这些消息。
  • 要么将偏移量保存在 Kafka 外部。否则你会得到重复。性能和重复总是相互权衡
【解决方案2】:

ConsumerRebalanceListener 的 javadoc 说

此回调将仅在用户线程中作为 每当分区分配发生变化时调用 poll(long)。

因此,您不必担心在处理 poll() 返回的最后一批消息的过程中会发生分区重新分配。在您处理完所有这些消息并再次调用 poll() 之前,它不会发生。

javadoc 还说:

保证所有消费者进程都会调用 onPartitionsRevoked 在任何进程调用之前 onPartitionsAssigned。因此,如果偏移量或其他状态保存在 onPartitionsRevoked 调用它保证在 接管该分区的进程具有其 onPartitionsAssigned 调用回调以加载状态。

【讨论】:

  • 这是 kafka 的正常行为,但他询问 max.poll.interval.ms 限制。当您的工作时间超过此限制时,您先前轮询的消息将由新的分区受让人再次轮询。在这种情况下,两个工作人员同时处理相同的消息。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-05-08
  • 1970-01-01
  • 2019-10-31
  • 2022-08-08
  • 1970-01-01
  • 2018-12-21
  • 1970-01-01
相关资源
最近更新 更多