【问题标题】:Error reading field 'topic_metadata' in Kafka在 Kafka 中读取字段“topic_metadata”时出错
【发布时间】:2016-09-26 12:38:33
【问题描述】:

我正在尝试在我的 server.properties 文件中使用 auto.create.topics.enable=true 连接到我的代理。但是,当我尝试使用 Java 客户端生产者连接到代理时,我得到以下error

1197 [kafka-producer-network-thread | producer-1] 错误 org.apache.kafka.clients.producer.internals.Sender - 未捕获的错误 kafka生产者I/O线程: org.apache.kafka.common.protocol.types.SchemaException:读取错误 字段“topic_metadata”:读取大小为 619631 的数组时出错,只有 37 可用字节 org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73) 在 org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380) 在 org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449) 在 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269) 在 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) 在 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) 在 java.lang.Thread.run(Unknown Source)

以下是我的客户生产者代码。

public static void main(String[] argv){
         Properties props = new Properties();
         props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 0);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("block.on.buffer.full",true);
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try{ for(int i = 0; i < 10; i++)
        { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
             System.out.println("Tried sending:"+i);}
        }
        catch (Exception e){
            e.printStackTrace();
        }
         producer.close();
}

有人可以帮我解决这个问题吗?

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    看起来我在客户端设置了错误的属性,而且我的 server.properties 文件具有不适合我正在使用的客户端的属性。所以我决定使用 maven 将 java 客户端更改为 0.9.0 版本。

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
    </dependency>
    

    我的 server.properties 文件如下。

    broker.id=0
    port=9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=9000
    delete.topic.enable=true
    advertised.host.name=<aws public Ip>
    advertised.port=9092
    

    我的生产者代码看起来像

        import java.util.Properties;
        import java.util.concurrent.ExecutionException;
    
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;
        public class HelloKafkaProducer 
         {
    
    
           public static void main(String args[]) throws InterruptedException,      ExecutionException {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
    
            boolean sync = false;
            String topic="loader1";
            String key = "mykey";
            for(int i=0;i<1000;i++)
            {
            String value = "myvaluehasbeensent"+i+i;
            ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value);
            if (sync) {
                producer.send(producerRecord).get();
            } else {
                producer.send(producerRecord);
            }
            }
            producer.close();
        }
     }
    

    【讨论】:

      【解决方案2】:

      我通过编辑解决了这个问题

      /etc/hosts file
      

      检查您的主机文件,如果 Zookeeper 或其他代理的 ip 不在此文件中。

      【讨论】:

        【解决方案3】:

        我也遇到过类似的问题。这里的问题是,当 pom 文件中的 kafka 客户端版本不匹配时,kafka 服务器的版本不同。 我使用的是 kafka 客户端 0.10.0.0_1,但 kafka 服务器仍在 0.9.0.0 中。所以我将 kafka 服务器版本升级到 10,问题得到了解决。

        <dependency>
                    <groupId>org.apache.servicemix.bundles</groupId>
                    <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
                    <version>0.10.0.0_1</version>
                </dependency>            
        

        【讨论】:

          【解决方案4】:

          确保您使用正确的版本。假设您使用以下 maven 依赖项:

          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
            <version>${flink.version}</version>
          </dependency>
          

          所以工件等于:flink-connector-kafka-0.8_2.10

          现在检查您是否使用正确的 Kafka 版本:

          cd /KAFKA_HOME/libs
          

          现在找到 kafka_YOUR-VERSION-sources.jar。

          就我而言,我有 kafka_2.10-0.8.2.1-sources.jar。所以它工作正常! :) 如果您使用不同的版本,只需更改 maven 依赖项或下载正确的 kafka 版本。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2016-10-02
            • 1970-01-01
            • 2016-09-30
            • 1970-01-01
            • 1970-01-01
            • 2021-10-02
            • 2020-06-23
            • 2017-09-18
            相关资源
            最近更新 更多