【发布时间】:2017-02-05 00:49:33
【问题描述】:
喜欢storm-kafka-client,用了storm-kafka-client但是不能很好用,写一个新的spout也不好用。 谁能帮我写一个示例kafka spout。
【问题讨论】:
标签: apache-kafka apache-storm producer-consumer kafka-consumer-api
喜欢storm-kafka-client,用了storm-kafka-client但是不能很好用,写一个新的spout也不好用。 谁能帮我写一个示例kafka spout。
【问题讨论】:
标签: apache-kafka apache-storm producer-consumer kafka-consumer-api
定义拓扑.java
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class Topology{
public static void main(String[] args){
TopologyBuilder builder = new TopologyBuilder();
String zkHosts = StringUtils.join("127.0.0.1", ',');
BrokerHosts hosts = new ZkHosts(zkHosts);
SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = forceFromStart;
builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
//...
}
}
基本上,您需要创建 SpoutConfig 才能创建 kafkaSpout。
【讨论】: