【问题标题】:How to write a samplest kafka spout with New Kafka Consumer API?如何使用 New Kafka Consumer API 编写 samplet kafka spout?
【发布时间】:2017-02-05 00:49:33
【问题描述】:

喜欢storm-kafka-client,用了storm-kafka-client但是不能很好用,写一个新的spout也不好用。 谁能帮我写一个示例kafka spout。

【问题讨论】:

    标签: apache-kafka apache-storm producer-consumer kafka-consumer-api


    【解决方案1】:

    定义拓扑.java

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    
    public class Topology{
      public static void main(String[] args){
         TopologyBuilder builder = new TopologyBuilder();
         String zkHosts = StringUtils.join("127.0.0.1", ',');
    
            BrokerHosts hosts = new ZkHosts(zkHosts);
            SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.forceFromStart = forceFromStart;
            builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
            //...
      }
    }
    

    基本上,您需要创建 SpoutConfig 才能创建 kafkaSpout。

    【讨论】:

    • 也许我让你感到困惑,但我需要的是 KafkaSpout.java,而不是 Topology。这是你的新东西。
    猜你喜欢
    • 2016-11-03
    • 2015-08-31
    • 2018-11-27
    • 2013-06-24
    • 2017-12-14
    • 1970-01-01
    • 2023-03-02
    • 2014-12-03
    • 2023-03-23
    相关资源
    最近更新 更多