【问题标题】:Kafka: java client failed to send messages after x triesKafka:Java客户端在x次尝试后未能发送消息
【发布时间】:2016-06-17 23:48:35
【问题描述】:

我尝试从 Java 应用程序向 Kafka 发送消息。

我只能得到“两次尝试后发送消息失败”:

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 2 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at test.Main.main(Main.java:26)

Kafka 在远程机器上运行,所以我添加到它的server.properties(假设 Kafka 服务器的 IP 地址是 192.168.0.1):

host.name=192.168.0.1
advertised.host.name=192.168.0.1
advertised.port=9092

正在运行的 Kafka 是 kafka_2.11-0.8.2.1,所以我使用(我猜)适当的 Java 客户端版本:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.1</version>
</dependency>

Java 代码:

public static void main(String[] args) {
  Properties properties = new Properties();
  properties.put("metadata.broker.list", "192.168.0.1:9092");
  properties.put("serializer.class", "test.StringEncoder");
  properties.put("key.serializer.class", "test.StringEncoder");
  properties.put("message.send.max.retries", "2");

  Producer<String, String> kafkaProducer = new Producer<String, String>(new ProducerConfig(properties));

  kafkaProducer.send(new KeyedMessage<String, String>(
      "LOG", 
      "Yo! " + new Date().toString()
  ));

  kafkaProducer.close();
}

LOG 主题已创建。我能够从执行 Java 代码的同一台机器向 Kafka 发送消息(并且它可以工作):

bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic LOG

当 Java 代码失败时,Kafka 和 Zookeeper 不会记录任何内容。

有没有我遗漏的特定参数?

【问题讨论】:

  • 你的属性有问题? properties.put("metadata.broker.list", "192.68.0.1:9092");不是:“192.168.0.1:9092”?
  • 这只是一个错字,因为我没有在这里提供真实的 IP 地址。不过谢谢你的评论,我修好了。

标签: java apache-kafka


【解决方案1】:

Apache kafka 有一个更好的新生产者客户端:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

示例: https://github.com/CameronGregory/kafka/blob/master/TestProducer.java

显然你的配置没问题。 “test.StringEncoder”是您的自定义类吗?试试你改用“kafka.serializer.StringEncoder”

【讨论】:

    【解决方案2】:

    即使我也遇到了同样的问题,对我来说,重新启动 Kafka 服务器就像魅力一样。可能你应该尝试重新启动一次。

    【讨论】:

      猜你喜欢
      • 2012-03-10
      • 2020-12-16
      • 1970-01-01
      • 2013-04-24
      • 1970-01-01
      • 2013-01-06
      • 1970-01-01
      • 1970-01-01
      • 2019-12-02
      相关资源
      最近更新 更多