【问题标题】:Apache Kafka example error: Failed to send message after 3 triesApache Kafka 示例错误:3 次尝试后发送消息失败
【发布时间】:2014-05-28 06:00:07
【问题描述】:

我正在运行其网站中提到的这个 kafka 生产者示例

代码:

public class TestProducer {

    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("metadata.broker.list", "host.broker-1:9093, host.broker-2:9093, host.broker-3:9095");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "test.app.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = "192.168.2." + rnd.nextInt(255); 
               String msg = runtime + ",www.example.com," + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

public class SimplePartitioner implements Partitioner{
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}

更多细节:

我在主机代理[1-3]远程的主机(调用是生产者)上运行此应用程序

  • 我可以从生产者主机 ping 和 ssh 代理主机。

  • 在server.properties中提供了advertised.host.name(它们在brokers中分别命名为server[1-3].properties

属性:

broker.id=1
port=9093
host.name=host.broker.internal.name
advertised.host.name=host-broker1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/1/kafka-logs-1,/data/2/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000

知道如何解决这个错误吗?

【问题讨论】:

  • 显然,您的生产者无法向 Kafka 集群发送消息。最常见的原因是生产者配置错误或集群已关闭,或存在网络问题(例如,广告代理名称是否可从生产者主机路由?)。很难从这么多信息中说出更多信息。
  • @sandris 请现在看一下描述,如果有帮助的话。
  • 检查您是否可以从运行 java 应用程序的计算机远程登录:“telnet host.broker-1 9093”从 kafka Broker 远程登录到 zookeeper:“telnet zk1 2181”我将删除所有 dns 名称并尝试原始 IP 地址。例如: "props.put("metadata.broker.list", "0.0.0.0:9093")" 并删除属性: "host.name=host.broker.internal.name" "advertised.host.name=主机代理1"

标签: apache-kafka


【解决方案1】:

我在运行 Kafka 生产者时遇到了这些错误:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

找到了解决方案:

在我的Mac机器上,下载scala-2.10kafka_2.10-0.8.1后,在kafka_2.10-0.8.1目录下,当我启动zookeeper,kafka server,并创建一个测试主题时,一切都很好。然后我需要为测试主题启动一个生产者。但是有一个错误:

yhuangMac:kafka_2.10-0.8.1 yhuang$ ./bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

原因是在kafka libs目录下,kafka release zip文件只包含了slf4j-api的jar文件,他们漏掉了一个jar文件:slf4j-nop.jar,所以我们只好去http://www.slf4j.org,下载slf4j-1.7.7.zip,然后解压,将slf4j-api-1.7.7、slf4j-nop-1.7.7.jar复制到kafka的libs目录下。

再次重启kafka producer,现在没有报错了。

来源:SOLUTION

【讨论】:

  • 试过这个但没有帮助。
  • 这个答案与问题无关。
【解决方案2】:

您需要添加 SLF4j 日志记录实现。如果您使用 maven 作为构建工具,请尝试将以下内容添加到您的 pom.xml 中,看看它是否有效..

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>

【讨论】:

  • 这只是提示如何设置记录器,而不是如何解决实际问题。
【解决方案3】:

这是 Krish 提出的原始问题中异常的解决方案:“kafka.common.FailedToSendMessageException: 3 次尝试后发送消息失败。”

常见问题解答herehere 说应该正确设置您的主机名。我没有经历过这种情况。但是当kafka生产者给出这个错误信息时,我发现了另一种情况:当你的生产者中的分区键错误时。也就是说,如果你有一个带有一个分区的主题,那么生产者中的分区键可以为 null(消息发送到随机分区)或 0(kafka 中的分区从 0 开始编号)。如果您尝试使用 1 的分区键,则会在生产者中引发此异常。或者如果您在主题中有 3 个分区,并且您使用的分区键为 3(3 的键无效,因为有效的分区号为 0,1,2),则会引发此异常。当生产者的 send() 方法中的分区号与主题中的分区范围不匹配时,此错误是一致的。 我使用了 kafka 0.8.2 版。我使用的客户端 API 是包 kafka.javaapi.producer.Producer。

【讨论】:

    【解决方案4】:

    如果客户端无法同时访问 kafka 代理的主机名和 IP,就会发生这种情况。

    进入客户端 \etc\hosts 或 C:\Windows\System32\drivers\etc\hosts 并为我解决了这个问题。

    【讨论】:

    • 我在 OS X 上遇到了同样的问题,并为代理添加主机条目修复了它。
    【解决方案5】:

    我从 apache kafka 得到错误:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details
    

    我的设置:

    OS: Ubuntu 14.04
    sbt: sbt launcher version 0.13.5
    scala: Scala code runner version 2.9.2
    

    能够使用以下命令修复它:

    cd /home/el/apachekafka/kafka_2.10-0.8.1.1/libs
    wget http://www.slf4j.org/dist/slf4j-1.7.7.tar.gz
    tar -xvf slf4j-1.7.7.tar.gz
    cd /home/el/apachekafka/kafka_2.10-0.8.1.1/libs/slf4j-1.7.7
    cp slf4j-api-1.7.7.jar ..
    cp slf4j-nop-1.7.7.jar ..
    

    然后重新运行命令,生产者不会抛出任何错误。

    【讨论】:

    • 没用。没有帮助
    【解决方案6】:

    对于 HDP kafka 使用代理端口:6667

    对于独立 kafka,使用代理端口:9092

    错误是因为我们使用的端口号(HDP 使用 6667 但我们使用的是 9092)

    bin/kafka-console-producer.sh --broker-list broker-ip:9092 --topic test //不工作

    bin/kafka-console-producer.sh --broker-list broker-ip:6667 --topic test //工作

    链接:Kafka console producer Error in Hortonworks HDP 2.3 Sandbox

    【讨论】:

      【解决方案7】:

      不确定,但一种可能是主题不是在 Kafka 上创建的。

      检查 kafka 的 Web UI 并确保您使用的主题(即“page_visits”)在此处创建以发送数据。

      如果不是,使用 GUI 创建主题非常容易。

      【讨论】:

        【解决方案8】:

        我在默认端口设置为 6667 的 Hortonworks HDP 2.2 中遇到了这个错误。 如果您的 kafka 服务器在 HDP 沙箱上运行,则分辨率将设置为 metadata.broker.list as 10.0.2.15:6667 请按照此代码操作。

                    Properties props = new Properties();
                    props.put("metadata.broker.list", "10.0.2.15:6667");
                    props.put("serializer.class", "kafka.serializer.StringEncoder");
                    //props.put("producer.type","async");
                    props.put("request.required.acks", "1");
                    ProducerConfig config = new ProducerConfig(props);
        
                    Producer<String, String> producer = new Producer<String, String>(config);
                    try{        
                    producer.send(new KeyedMessage<String, String>("zerg.hydra", jsonPayload));
        
                    producer.close();
                }catch(Exception e){
                    e.printStackTrace();
                }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2011-12-28
          • 2022-01-18
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2011-02-13
          相关资源
          最近更新 更多