【发布时间】:2015-08-31 16:27:03
【问题描述】:
我开始使用最新的 Kafka 文档http://kafka.apache.org/documentation.html。但是当我尝试使用新的消费者 API 时遇到了一些问题。我已经通过以下步骤完成了这项工作:
1.添加新的依赖项
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
2。添加配置
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
3.使用 KafkaConsumer API
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");
但是,当我尝试从代理轮询消息时,我得到的只是 null:
Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
process(records);
else
System.err.println("null");
然后我检查了源代码后知道消费者出了什么问题:
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
更糟糕的是,我找不到有关 0.8.2 API 的任何其他有用信息,因为有关 Kafka 的所有用法都与最新版本不兼容。有人可以帮助我吗?非常感谢。
【问题讨论】:
-
新的 KafkaConsumer API 将仅在 0.8.3 cwiki.apache.org/confluence/display/KAFKA/Future+release+plan 中可用。显然,主干中有一些实现,尽管我对状态一无所知。目前我正在使用旧的消费者实现。
-
谢谢@habsq。那么在所有老的api中,使用Kafka 0.8.x时哪个版本是最好的选择呢?
-
我使用的是 0.8.1.1,不知道“最佳”选择。
-
知道了。还是谢谢你。
标签: apache-kafka kafka-consumer-api