【问题标题】:Kafka consumer stops consuming if one of brokers become unavailable如果其中一个代理不可用,Kafka 消费者将停止消费
【发布时间】:2019-06-23 10:20:49
【问题描述】:

我在单个 Windows 主机上安装了两个 Kafka 2.1.0 代理。默认复制因子设置为 2。所有其他设置均为默认值。

即使我关闭了其中一个代理,生产者也能够将消息写入日志。但是在这种情况下,消费者会停止消费消息。即使我重新启动它,它也不会分配分区。它只是写入以记录此警告:

main - org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-1, groupId=sout] 无法建立到节点 -2 (/192.168.0.1:19092) 的连接。经纪人可能不可用。

消费者:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public final class PumpToConsoleSimple {

  private static final Duration pollTimeout = Duration.ofSeconds(10);

  public static void main(String[] args) {
    final Properties consumerProperties = new Properties();

    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "sout");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

    try (final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties)) {
      kafkaConsumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          System.out.println("Partitions were assigned");
          kafkaConsumer.seekToBeginning(partitions);
        }
      });
      while (true) {
        final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(pollTimeout);
        consumerRecords.forEach(r -> System.out.println(r.value()));
        kafkaConsumer.commitSync();
      }
    }
  }
}

制作人:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.locks.LockSupport;

public final class OnceInASecondProducerSimple {
  public static void main(String[] args) {
    final Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");

    long counter = 0;
    while (true) {
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
        producer.send(new ProducerRecord<>("test", "msg" + counter++));
      }
      LockSupport.parkNanos(Duration.ofSeconds(1).getNano());
    }
  }
}

只有在我再次启动代理后,消费者才能继续工作。

我错过了什么?如何为 Kafka 消费者获得高可用性?

【问题讨论】:

  • 连接到节点 -2 (/192.168.0.1:19092) .... 19092?您是否错误地指定了错误的端口?
  • 没有。同一台机器上有两个broker:第一个使用9092端口,第二个使用19092。

标签: apache-kafka kafka-consumer-api


【解决方案1】:

通过kafka-topics 脚本检查偏移主题__consumer_offsets 状态。失败的代理必须是您的组的协调者,并且 __consumer_offsets 的复制因子可能为 1,因此消费者找不到协调者。即使你重启了消费者,它仍然无法找到协调者。

在您的情况下,您可以提高 __consumer_offsets 的复制因子并重试以查看它是否按预期工作。

【讨论】:

  • 是的,你是对的。如果我设置 offsets.topic.replication.factor=2,消费者可以正常工作。实际上,这个线程重复了this one。对于那些稍后将阅读此主题的人:如果您在创建主题后需要更改此选项,请关注this advice。非常感谢您的帮助!
猜你喜欢
  • 2017-09-23
  • 2018-06-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多