【问题标题】:KafkaConsumer can't poll message, but kafka-console-consumer.sh can work, why?KafkaConsumer 不能轮询消息,但是 kafka-console-consumer.sh 可以工作,为什么?
【发布时间】:2019-10-15 20:57:22
【问题描述】:

我的服务器安装 kafka 使用 docker: wurstmeister/kafka。

当我使用 kafka-clients 2.2.0 生产和消费 kafka 消息时, 我的生产者运行良好,但消费者无法收到任何消息。

但是当我使用 shell 时,这个命令不能得到消息:

/opt/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.2.0.242:9092 --topic mytesttopic --from-beginning

这个命令可以得到所有的消息:

/opt/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.2.0.242:9092 --topic mytesttopic --from-beginning --partition 0
String  kafkaServer = "10.2.0.242:9092";
String defaultTopic = "mytesttopic";

// create topic
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
try (AdminClient client = AdminClient.create(props)) {
    CreateTopicsResult ret = client.createTopics(Arrays.asList(new NewTopic(defaultTopic, 1, (short) 1)));
    ret.all().get();
}

// send message
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> msg = new ProducerRecord<>(defaultTopic, "a1", "test1");
Future ret = producer.send(msg);
System.out.println("send ok: " + ret.get());

// recieve message
props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("group.id", "testaaa4aaa");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
TopicPartition partition1 = new TopicPartition(defaultTopic, 0);
consumer.assign(Arrays.asList(partition1));
//        consumer.subscribe(Arrays.asList(defaultTopic));
Duration duration = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(duration);
// here always get 0.
    System.out.println(records.count());
}

我尝试在我的代码中设置 TopicPartition,但仍然无法收到消息,有人可以帮助我吗?

【问题讨论】:

  • 我找到控制台消息:[Consumer clientId=consumer-1, groupId=testaaa4aaa] Received FindCoordinator response ClientResponse(receivedTimeMs=1559204286805,latencyMs=13, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=4), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
  • 当我删除这一行时它起作用了:``` props.put("group.id", "testaaa4aaa"); ``` 但是为什么我必须指定分区??

标签: java apache-kafka


【解决方案1】:

正如控制台消息所说,您的组协调员似乎不可用。您最近是否更新/更改了 Docker 映像?问题是,如果您没有指定 group.id 或者如果您手动为控制台使用者指定分区,那么您就没有使用 Kafka 的组管理(因此没有协调器)。因此,这解决了这个问题是有道理的。如果您使用不同的组 ID,会发生什么情况?

【讨论】:

    猜你喜欢
    • 2018-11-29
    • 1970-01-01
    • 2020-05-31
    • 1970-01-01
    • 2021-04-19
    • 2019-04-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多