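【Question title】: Error producing to embedded Kafka
【Posted】: 2016-06-27 18:48:18
【Question】:

I am trying to embed a KafkaServer in my code. I used the sample code below to learn how to do this, but for some reason my producer cannot send messages to the embedded server (it times out after 60 seconds). I am using Kafka 0.8.2.2. Can anyone tell me what I am doing wrong?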

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.TopicMetadata; 
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import kafka.utils.Utils;
import org.apache.commons.collections.functors.ExceptionPredicate;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Properties;

public class KafkaLocalBroker {

    public static final String TEST_TOPIC = "test-topic";

    public KafkaConfig kafkaConfig;
    public KafkaServer kafkaServer;
    public TestingServer zookeeper;

    public KafkaLocalBroker() throws Exception {
        zookeeper = new TestingServer(true);
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper.getConnectString());
        props.put("broker.id", 0);
        kafkaConfig = new KafkaConfig(props);

        kafkaServer = new KafkaServer(kafkaConfig, new Time() {
            public long nanoseconds() {
                return System.nanoTime();
            }

            public long milliseconds() {
                return System.currentTimeMillis();
            }

            public void sleep(long ms) {
                try {
                    Thread.sleep(ms);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently dropping it
                    Thread.currentThread().interrupt();
                }
            }
        });
        kafkaServer.startup();
        System.out.println("embedded kafka is up");
    }

    public void stop(){
        kafkaServer.shutdown();
        System.out.println("embedded kafka stop");
    }

    /**
     * a main that tests the embedded kafka
     * @param args
     */
    public static void main(String[] args) {

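        KafkaLocalBroker kafkaLocalBroker = null;
        // Initialize the Kafka server and start it:
        try {
            kafkaLocalBroker = new KafkaLocalBroker();
        } catch (Exception e) {
            // Do not swallow startup failures; otherwise a broker that never started goes unnoticed
            e.printStackTrace();
        }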
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Send 10 messages to the local Kafka server:
        for (int i=0; i<10; i++){
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(TEST_TOPIC, "test-message" + i);
            producer.send(data, (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("Failed to write log message: " + exception.getMessage());
                } else {
                    System.out.println("Successful write to offset " + metadata.offset()
                            + " in partition " + metadata.partition()
                            + " on topic " + metadata.topic());
                }
            });
        }

        // Consume messages from Kafka:
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "clientId");
        long offset = 0L;
        // Exit criterion just for this test so we are not stuck in an endless loop:
        // 10 messages are produced above, so stop once all of them have been read
        while (offset < 10) {
            // Create a fetch request for TEST_TOPIC, partition 0, the current offset, and a fetch size of 100000 bytes
            FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(TEST_TOPIC, 0, offset, 100000).build();

            // Get the message set from the consumer and print the messages
            FetchResponse messages = consumer.fetch(fetchRequest);
            for (MessageAndOffset msg : messages.messageSet(TEST_TOPIC, 0)) {
                ByteBuffer payload = msg.message().payload();
                // Size the array by the bytes remaining in the buffer, not by its limit
                byte[] bytes = new byte[payload.remaining()];
                payload.get(bytes);
                try {
                    System.out.println(new String(bytes, "UTF-8"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // Advance past the message just consumed so it is not fetched again
                offset = msg.offset() + 1;
            }
        }

        producer.close();
        //close the consumer
        consumer.close();
        //stop the kafka broker:
        if(kafkaLocalBroker != null) {
            kafkaLocalBroker.stop();
        }
    }
}
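
One detail in the code above that may be worth checking is how the numeric settings are stored, for example `props.put("broker.id", 0)`. `java.util.Properties.getProperty` only returns values that were stored as `String`; anything else comes back as `null`. Whether `KafkaConfig` in 0.8.2.2 reads its settings through `getProperty` is an assumption here, not something confirmed in this thread, so treat the sketch below only as an illustration of the standard-library behavior. The class name `PropertiesValueDemo` is made up for the example.

```java
import java.util.Properties;

// Illustrates how java.util.Properties treats non-String values.
// PropertiesValueDemo is a hypothetical name used only for this sketch.
class PropertiesValueDemo {

    // Stores the value under "broker.id" with put() and returns what getProperty() sees.
    static String lookup(Object value) {
        Properties props = new Properties();
        props.put("broker.id", value);
        return props.getProperty("broker.id");
    }

    public static void main(String[] args) {
        // An Integer stored with put() is invisible to getProperty().
        System.out.println("Integer value: " + lookup(0));
        // A String stored with put() is returned as-is.
        System.out.println("String value:  " + lookup("0"));
    }
}
```

If the broker configuration does turn out to be read this way, storing the value as a string (`"0"`) would be the safer form.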

Edit: I have included the exception returned by the producer below:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
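
A metadata timeout like this means the producer never received topic metadata from a broker. One possible cause (among others) is that nothing is listening on the address given in `bootstrap.servers`, for instance because the embedded broker failed to start. A minimal sketch for ruling that out with a plain TCP probe follows; `BrokerProbe` and `isListening` are hypothetical names, and a successful connection only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether anything accepts TCP connections on host:port.
class BrokerProbe {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is the address the producer in the question is configured with.
        System.out.println("broker port open: " + isListening("localhost", 9092, 2000));
    }
}
```

Running the probe right after `kafkaServer.startup()` and before creating the producer would show whether the broker is reachable at all.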

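【Comments】:

  • How would you characterize the failure? Any specific error message? Are you sure the problem is with the producer and not the consumer?
  • producer.send hangs for 60 seconds and then throws the TimeoutException I included in the edit above.
  • Did you ever solve this?
  • I upgraded to version 0.10.0.0 and it works fine.

Tags: apache-kafka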


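【Solution 1】:

The properties used to create the Kafka producer are not valid for 0.8. Go through ProducerConfig and change the properties accordingly, or upgrade your Kafka version.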
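
For reference, here is a rough sketch of what a configuration for the older Scala-based producer in 0.8 could look like. The key names (`metadata.broker.list`, `serializer.class`, `request.required.acks`) are recalled from the 0.8 producer documentation and should be checked against the docs for the exact version in use; the class name `OldProducerProps` is made up for the example. The sketch only builds the `Properties` object, so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper that builds settings for the old (Scala-based) 0.8 producer.
class OldProducerProps {

    // brokerList is a comma-separated host:port list, e.g. "localhost:9092".
    static Properties build(String brokerList) {
        Properties props = new Properties();
        // Old-producer counterpart of bootstrap.servers.
        props.put("metadata.broker.list", brokerList);
        // Encoder class used for message values.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // -1 waits for all in-sync replicas, similar in intent to acks=all.
        props.put("request.required.acks", "-1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        // With Kafka 0.8 on the classpath these would be passed on as:
        //   new kafka.javaapi.producer.Producer<String, String>(new kafka.producer.ProducerConfig(props));
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```

These keys belong to the old producer classes only; as the comments below show, `org.apache.kafka.clients.producer.KafkaProducer` expects its own set, including `bootstrap.servers`.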

【Comments】:

  • Should I use 'org.apache.kafka.clients.producer.KafkaProducer' or 'org.apache.kafka.producer.Producer'?
  • When I use the configuration given in the linked page, I get this exception: Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
  • It seems the Kafka producer requires at least bootstrap.servers, key.serializer, and value.serializer to be configured.
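
The last comment can be turned into a small pre-flight check that runs before the producer is constructed. This is only a sketch: the list of required keys is taken from the comment above rather than from the Kafka source, and `ProducerConfigCheck` and `missingKeys` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the settings the thread says KafkaProducer needs.
class ProducerConfigCheck {

    // Keys named in the comment above; not an exhaustive list from the Kafka source.
    static final List<String> REQUIRED =
            Arrays.asList("bootstrap.servers", "key.serializer", "value.serializer");

    // Returns the required keys that have no String value in props.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Reports the two serializer keys that are still unset.
        System.out.println("missing: " + missingKeys(props));
    }
}
```

Calling `missingKeys(props)` before `new KafkaProducer<>(props)` gives a readable list of what is absent instead of failing on the first missing key.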