【发布时间】:2017-05-11 15:18:14
【问题描述】:
我试图生成一些消息并放入主题中,然后从控制台消费者那里获取相同的内容。
使用的代码:
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class SimpleProducer {
private static Producer<String,String> producer;
public SimpleProducer() {
Properties props = new Properties();
// Set the broker list for requesting metadata to find the lead broker
props.put("metadata.broker.list","172.22.96.56:9092,172.22.96.56:9093,172.22.96.56:9094");
//This specifies the serializer class for keys
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 1 means the producer receives an acknowledgment once the lead replica
// has received the data. This option provides better durability as the
// client waits until the server acknowledges the request as successful.
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}
public static void main(String[] args) {
int argsCount = args.length;
if (argsCount == 0 || argsCount == 1)
throw new IllegalArgumentException(
"Please provide topic name and Message count as arguments");
String topic = (String) args[0];
String count = (String) args[1];
int messageCount = Integer.parseInt(count);
System.out.println("Topic Name - " + topic);
System.out.println("Message Count - " + messageCount);
SimpleProducer simpleProducer = new SimpleProducer();
simpleProducer.publishMessage(topic, messageCount);
}
private void publishMessage(String topic, int messageCount) {
for (int mCount = 0; mCount < messageCount; mCount++) {
String runtime = new Date().toString();
String msg = "Message Publishing Time - " + runtime;
System.out.println(msg);
// Creates a KeyedMessage instance
KeyedMessage<String, String> data =
new KeyedMessage<String, String>(topic, msg);
// Publish the message
producer.send(data);
}
// Close producer connection with broker.
producer.close();
}
}
输出:
主题名称 - 测试 消息数 - 10 log4j:WARN 找不到记录器的附加程序 (kafka.utils.VerifiableProperties)。 log4j:WARN 请正确初始化 log4j 系统。 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - 2016 年 2 月 16 日星期二 02:00:56 IST 消息发布时间 - Tue Feb 16 02:00:56 IST 2016
从命令行我提供主题名称为“kafkatopic”和消息计数“10”。该程序运行良好,没有异常,但是当我尝试从控制台查看消息时,它们没有出现。主题已创建。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
您能帮忙解决一下问题吗!
【问题讨论】:
-
你能添加你正在执行的命令行来启动你的生产者吗?
-
好的,从您粘贴的堆栈跟踪中,我们可以看到您提供的第一个参数为“test”
Topic Name - test,只需将其更改为“kafkatopic”,它应该可以正常工作:)跨度> -
哎呀!!我的错 。感谢您指出 Lousi :)
标签: apache-kafka kafka-consumer-api kafka-producer-api