【问题标题】:Kafka 0.9 How to re-consume message when manually committing offset with a KafkaConsumerKafka 0.9 如何在使用 KafkaConsumer 手动提交偏移量时重新使用消息
【发布时间】:2016-04-26 09:38:11
【问题描述】:

我正在编写一个消费者,一旦将一系列记录提交给 Mongo,它就会手动提交偏移量。
在 Mongo 错误或任何其他错误的情况下,尝试将记录保存到错误处理集合中 以便日后重播。 如果 Mongo 已关闭,那么我希望消费者在尝试从 Kakfa 的未提交偏移中读取记录之前停止处理一段时间。
下面的示例有效,但我想知道这种情况的最佳实践是什么?

while (true) {
    boolean commit = false;
    try {
        ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
        kafkaMessageProcessor.processRecords(records);
        commit = true;
    }
    catch (Exception e) {
        logger.error("Unable to consume closing consumer and restarting", e);
        try {
           consumer.close();
        }
        catch (Exception consumerCloseError) {
            logger.error("Unable to close consumer", consumerCloseError);
        }
        logger.error(String.format("Attempting recovery in  [%d] milliseconds.", recoveryInterval), e);
        Thread.sleep(recoveryInterval);
        consumer = createConsumer(properties);
    }
    if (commit) {
        consumer.commitSync();
    }

}

private KafkaConsumer<K, V> createConsumer(Properties properties) {
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
    consumer.subscribe(topics);
    return consumer;
}

如果我不重新创建消费者,我会收到以下错误。

o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator  : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    这是我使用客户端版本 0.10.0 的代码。

    您的要求似乎还可以。

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MessageProcesser implements Runnable {
    
        private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class);
    
        private final ExecutorService pool = Executors.newFixedThreadPool(4);
    
        private final KafkaConsumer<String, String> consumer;
    
        private final String topic;
    
        private final AtomicBoolean closed = new AtomicBoolean(false);
    
        public MessageProcesser(String groupId, String topic, String kafkaServer) {
            this.topic = topic;
            Properties props = new Properties();
            props.put("bootstrap.servers", kafkaServer);
            props.put("group.id", groupId);
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("enable.auto.commit", "false");
            this.consumer = new KafkaConsumer<>(props);
        }
    
        @Override
        public void run() {
            try {
    
                consumer.subscribe(Collections.singleton(topic));
    
                while (true) {
                    if (closed.get()) {
                        consumer.close();
                    }
    
                    ConsumerRecords<String, String> records = consumer.poll(1000 * 60); 
                    for (ConsumerRecord<String, String> record : records) {
    
                        String value = record.value();
                        if (null == value) {
                            continue;
                        }
    
                        boolean processResult = false;
                        try {
                            Future<Object> f = pool.submit(new ProcessCommand(value));
                            processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS);
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
    
                        if (!processResult) {
                            //here if process fail, seek to current offset
                            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                        } else {
                            this.commitAsyncOffset(record);
                        }
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                if (!closed.get()) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {
                        // ignore
                    }
                }
            }
        }
    
        public void shutdown() {
            closed.set(true);
        }
    
        public void commitAsyncOffset(ConsumerRecord<String, String> record) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
    
            consumer.commitAsync(offsets, new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                    if (e != null) {
                        logger.error("kafka offset commit fail. {} {}", offsets, PushUtil.getStackString(e.getStackTrace()));
                    }
                }
            });
        }
    }
    

    【讨论】:

    • 先生,有必要寻找吗?
    • 是的,寻找是必要的。 Java 客户端记住当前偏移量。
    • 如果记录有多个并且您收到错误,则此代码有问题。您不应该使用相同的分区处理后面的记录。
    • 有没有办法提交单个消息(consumerRecord)?就我而言,一旦我提交了一条记录,它的记录的整个分区就会被提交!!!
    【解决方案2】:

    如果您没有提交偏移量并且 auto.commit.enable 属性为 false,那么当对 Mongo 的调用失败时,您只需等待您认为必要的时间并重试 poll()。

    你看到的问题是新的消费者使用 poll() 作为心跳机制,所以如果你等待超时请求的时间更长,那么主题的协调器将踢出消费者,因为它会认为已经死了它将重新平衡该组。所以等待 mongo,但你可能想在一段时间内 poll() 。

    编辑:作为一种解决方法,您可以将此属性设置为更高的 request.timeout.ms

    希望对你有帮助!

    【讨论】:

    • 感谢您的帮助。这解决了我的消费者启动的第二个问题。为了重新处理消息而不是重新创建消费者,可以调用 consumer.seekToBeginning()。
    • consumer.seekToBeginning(partitions) 会将偏移量重置为您发送的所有分区中的第一个位置。我看不出这对您的用例有何帮助,如果您重置为乞求,您将不得不重新处理所有事件。
    • 它将重新处理上一次偏移提交的所有事件。这个假设不正确吗?我想继续尝试重新处理,直到 Mongo 再次可用。没有这个,投票只会消耗下一条消息。
    • 如果您轮询消息并且没有提交偏移量,那么下次轮询时您将(可能)收到相同的消息(如果 auto.commit.offset 为 false)如果您执行 consumer.seekToBeginning(partitions) 它将转到您的分区拥有的最早消息,并将从那里开始消费。
    • 即使我没有提交并将 auto.commit.offset 设置为 false,下一次和后续的轮询也不会收到未提交的消息。也许它是一个缺陷。如果我重新启动应用程序,它们将再次被消耗。
    【解决方案3】:

    据我了解,(新)客户端是保留消耗的偏移量的客户端。提交将偏移量发送到服务器,但它对来自该客户端的下一次轮询没有影响,因为客户端对服务器说“给我关于该偏移量的下一条消息”。 那么为什么将偏移量发送到服务器?为下一次再平衡。因此,服务器使用提交的偏移量的唯一情况是当某些客户端死亡/断开连接时 - 然后重新平衡分区,并通过这种重新平衡客户端从服务器获取偏移量。

    所以如果你不提交offset然后调用poll(),你不能期望消息会被再次读取。为此,必须有可能回滚客户端中的偏移量。我没有尝试,但我认为调用 KafkaConsumer.seek 到失败消息的偏移量应该可以解决问题。

    https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

    顺便说一句,这样你甚至可以提交最后一个成功处理的消息并寻找第一个失败的消息,这样当中间的某些消息发生失败时,你不需要重复整个记录列表。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-01-24
      • 2018-02-07
      • 2021-04-06
      • 2019-07-05
      • 2016-09-04
      • 1970-01-01
      • 2018-11-23
      相关资源
      最近更新 更多