【问题标题】:Kafka Restart from same offsetKafka 从相同的偏移量重新启动
【发布时间】:2017-11-25 19:26:54
【问题描述】:

我有一个 kafka 消费者,它连接到具有 3 个分区的主题。一旦我从 kafka 获得记录,我想捕获偏移量和分区。重新启动时,我想从上次读取的偏移量恢复消费者的位置

来自 kafka 文档:

每条记录都有自己的偏移量,因此要管理自己的偏移量,您只需执行以下操作:

配置 enable.auto.commit=false

使用每个 ConsumerRecord 提供的偏移量来保存您的 位置。

重启时使用 seek 恢复消费者的位置 (TopicPartition,长)。

这是我的示例代码:

constructor{    
    load data into offsetMap<partition,offset>
    initFlag=true;
}

Main method
{
    ConsumerRecords<String, String> records = consumer.poll(100);
    if(initFlag) // is this correct way to override offset position?
    {
        seekToPositions(offsetMap); 
        initFlag=false;
    }
    while(!shutdown)
    {
        for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                getOffsetPositions();// dump offsets and partitions to db/disk
        }   
   }
}

//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{

    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    //code to put partition and offset into map
    //write to disk or db

    }
} // Overrides the fetch offsets that the consumer

public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
            //code get partitions and offset from offsetMap
            consumer.seek(partition, offset);

    }

这是正确的做法吗?有没有更好的办法?

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    如果您提交偏移量,Kafka 将为您存储它们(默认情况下最多 24 小时)。

    这样,如果您的消费者死了,您可以在另一台机器上启动相同的代码,然后从您离开的地方继续。无需外部存储。

    参见https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html中的“偏移量和消费者位置”

    并建议您考虑使用 commitSync

    【讨论】:

      【解决方案2】:

      这对我来说没关系,只是要小心你的消费者是如何构建的(手动分区分配或自动)

      如果分区分配是自动完成的,则需要特别注意处理分区分配发生变化的情况。这可以通过在调用 subscribe(Collection, ConsumerRebalanceListener) 和 subscribe(Pattern, ConsumerRebalanceListener) 时提供一个 ConsumerRebalanceListener 实例来完成。例如,当从消费者那里获取分区时,消费者将希望通过实现 ConsumerRebalanceListener.onPartitionsRevoked(Collection) 来提交这些分区的偏移量。当将分区分配给消费者时,消费者将希望查找这些新分区的偏移量,并通过实现 ConsumerRebalanceListener.onPartitionsAssigned(Collection) 将消费者正确初始化到该位置。

      https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

      【讨论】:

      • 感谢您指出自动分区处理,是的,我正在手动处理分区,如前所述,我只有 1 个连接到所有 3 个分区的消费者,所以我应该没问题跨度>
      【解决方案3】:

      这可以通过控制我们提交的偏移量来解决。

      首先要做的是在消费者应用程序中关闭配置“enable.auto.commit”为“false”,这样您就可以控制何时提交偏移量。

      我们使用 Map 手动跟踪偏移量,如下所示:

      Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
          
          consumer.subscribe(topic, new CommitCurrentOffset());
      
          try {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  // process the record (ex : save in DB / call external service etc..)
      
                  currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                     new OffsetAndMetadata(record.offset() + 1, null));  // 1
              }
                  consumer.commitAsync(currentOffsets, null);  // 2
          }
          finally {
              consumer.commitSync(currentOffsets);  // 3
          }
      
        class CommitCurrentOffset implements ConsumerRebalanceListener {  // 4
           public void onPartitionRevoked(Collection<TopicPartition> topicPartitions) {
             consumer.commitSync(currentOffsets);
             consumer.close();
           }
        }
      
      1. 当我们处理每条消息时,我们会在地图中添加已处理消息的偏移量,如下所示:

           currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1, null)); 
        
      2. 我们将异步处理的消息的偏移量提交给代理。

      3. 如果在处理消息时出现任何错误/异常,我们会提交为每个分区处理的最新消息的偏移量。

      4. 当我们由于重新平衡而即将丢失分区时,我们需要提交偏移量。在这里,我们提交的是我们已处理的最新偏移量(每个循环的 In),而不是我们仍在处理的批处理中的最新偏移量。我们通过实现 ConsumerRebalanceListener 接口来实现这一点。每当触发再平衡时,将在再平衡开始之前和消费者停止处理消息之后调用 onPartitionRevoked() 方法。

      【讨论】:

        猜你喜欢
        • 2017-01-31
        • 2017-12-31
        • 1970-01-01
        • 1970-01-01
        • 2021-01-17
        • 1970-01-01
        • 1970-01-01
        • 2018-01-30
        • 2019-05-18
        相关资源
        最近更新 更多