【发布时间】:2016-05-10 20:14:10
【问题描述】:
我正在编写一个概念验证应用程序来使用来自 Apache Kafka 0.9.0.0 的消息,看看我是否可以使用它来代替常见的 JMS 消息代理,因为 Kafka 提供了一些好处。这是我的基本代码,使用新的消费者 API:
public class Main implements Runnable {
public static final long DEFAULT_POLL_TIME = 300;
public static final String DEFAULT_GROUP_ID = "ltmjTest";
volatile boolean keepRunning = true;
private KafkaConsumer<String, Object> consumer;
private String servers;
private String groupId = DEFAULT_GROUP_ID;
private long pollTime = DEFAULT_POLL_TIME;
private String[] topics;
public Main() {
}
//getters and setters...
public void createConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put("enable.auto.commit", "true");
configs.put("auto.commit.interval.ms", "1000");
configs.put("session.timeout.ms", "30000");
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(configs);
consumer.subscribe(asList(topics));
}
public static void main(String[] args) {
Main main = new Main();
if (args != null && args.length > 0) {
for (String arg : args) {
String[] realArg = arg.trim().split("=", 2);
String argKey = realArg[0].toLowerCase();
String argValue = realArg[1];
switch (argKey) {
case "polltime":
main.setPollTime(Long.parseLong(argValue));
break;
case "groupid":
main.setGroupId(argValue);
break;
case "servers":
main.setServers(argValue);
break;
case "topics":
main.setTopics(argValue.split(","));
break;
}
}
main.createConsumer();
new Thread(main).start();
try (Scanner scanner = new Scanner(System.in)) {
while(true) {
String line = scanner.nextLine();
if (line.equals("stop")) {
main.setKeepRunning(false);
break;
}
}
}
}
}
我已经使用默认设置启动了一个 kafka 服务器,并使用 shell 工具 kafka-console-producer.sh 启动了一个 kafka 生产者向我的主题写入消息。然后我使用此代码与两个消费者连接,发送正确的服务器进行连接和订阅主题,其他所有内容都使用默认值,这意味着两个消费者具有相同的组 ID。我注意到只有我的一个消费者使用了所有数据。我读过默认行为应该是消费者必须由服务器平衡,来自official tutorial:
如果所有消费者实例都具有相同的消费者组,那么这就像传统的队列平衡消费者负载一样。
如何修复消费者的行为,使其表现得像默认值?还是我遗漏了什么?
【问题讨论】:
标签: java apache-kafka kafka-consumer-api