【发布时间】:2017-07-07 09:16:09
【问题描述】:
我正在使用 kafka 0.10.1.1 和storm 1.0.2。在 kafka 集成的storm文档中,我可以看到偏移量仍然使用zookeeper维护,因为我们正在使用zookeeper服务器初始化kafka spout。 我如何使用kafka服务器引导spout。有没有这方面的例子。 Storm 文档中的示例
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
这个使用 zookeeper 的选项工作正常并且正在使用消息。但我无法在 kafkamanager ui 中将消费者组或风暴节点视为消费者。
尝试过的替代方法是这样的。
KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
String[] topics = new String[1];
topics[0] = topicName;
KafkaSpoutStreams kafkaSpoutStreams =
new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();
KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();
KafkaSpoutConfig<String, String> spoutConf =
new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();
return spoutConf;
}
但是这个解决方案在从 kafka 读取几条消息后显示 CommitFailedException。
【问题讨论】: