【问题标题】:Not able to see kafka messages produced from Java api in the consumer console无法在消费者控制台中看到从 Java api 生成的 kafka 消息
【发布时间】:2017-05-11 15:18:14
【问题描述】:

我试图生成一些消息并放入主题中,然后从控制台消费者那里获取相同的内容。

使用的代码:

import  java.util.Date;
import  java.util.Properties;
import  kafka.javaapi.producer.Producer;
import  kafka.producer.KeyedMessage;
import  kafka.producer.ProducerConfig;


public  class   SimpleProducer  {
    private static  Producer<String,String> producer;
    public  SimpleProducer()    {
            Properties  props   =   new Properties();

            //  Set the broker  list    for requesting  metadata    to  find    the lead    broker
            props.put("metadata.broker.list","172.22.96.56:9092,172.22.96.56:9093,172.22.96.56:9094");

            //This  specifies   the serializer  class   for keys    
            props.put("serializer.class",   "kafka.serializer.StringEncoder");

            //  1   means   the producer    receives    an  acknowledgment  once    the lead replica    
            //  has received    the data.   This    option  provides    better  durability  as  the     
            //  client  waits   until   the server  acknowledges    the request as successful.
            props.put("request.required.acks",  "1");

            ProducerConfig  config  =   new ProducerConfig(props);
            producer    =   new Producer<String,    String>(config);
    }
    public  static  void    main(String[]   args)   {
            int argsCount   =   args.length;
            if  (argsCount  ==  0   ||  argsCount   ==  1)
                    throw   new IllegalArgumentException(
                            "Please provide topic   name    and Message count   as  arguments");

            String  topic   =   (String)    args[0];
            String  count   =   (String)    args[1];
            int messageCount    =   Integer.parseInt(count);
            System.out.println("Topic   Name    -   "   +   topic);
            System.out.println("Message Count   -   "   +   messageCount);
            SimpleProducer  simpleProducer  =   new SimpleProducer();
            simpleProducer.publishMessage(topic,    messageCount);
    }
    private void    publishMessage(String   topic,  int messageCount)   {
            for (int    mCount  =   0;  mCount  <   messageCount;   mCount++)   {
                    String  runtime =   new Date().toString();
                    String  msg =   "Message    Publishing  Time    -   "   +   runtime;
                    System.out.println(msg);
                    //  Creates a   KeyedMessage    instance
                    KeyedMessage<String,    String> data    =   
                            new KeyedMessage<String,    String>(topic,  msg);

                    //  Publish the message
                    producer.send(data);
            }
            //  Close   producer    connection  with    broker.
            producer.close();
    }

}

输出:

主题名称 - 测试 消息数 - 10 log4j:WARN 找不到记录器的附加程序 (kafka.utils.VerifiableProperties)。 log4j:WARN 请正确初始化 log4j 系统。 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - Tue Feb 16 02:00:56 IST 2016

从命令行我提供主题名称为“kafkatopic”和消息计数“10”。该程序运行良好,没有异常,但是当我尝试从控制台查看消息时,它们没有出现。主题已创建。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

您能帮忙解决一下问题吗!

【问题讨论】:

  • 你能添加你正在执行的命令行来启动你的生产者吗?
  • 好的,从您粘贴的堆栈跟踪中,我们可以看到您提供的第一个参数为“test”Topic Name - test,只需将其更改为“kafkatopic”,它应该可以正常工作:)跨度>
  • 哎呀!!我的错 。感谢您指出 Lousi :)

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

我想指出两点:

1) 您没有在此处指定 --zookeeper - 您应该使用 --bootstrap-server 参数。

2) 您应该看到server.properties 文件中关于listenersadvertised.listener 的内容。您应该正确地将它们指向经纪人。

我希望这会有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-09-24
    • 2018-07-01
    • 1970-01-01
    • 2020-07-14
    • 1970-01-01
    • 2016-07-19
    • 2023-01-21
    • 2020-08-13
    相关资源
    最近更新 更多