【Question title】: KafkaSpout working example [closed]
【Posted】: 2015-07-27 23:09:17
【Question】:

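I recently got familiar with Apache Kafka and have a working producer-consumer example.

My next step is to integrate Kafka with a Storm spout and bolts, but I am having a hard time getting the available examples (most of them are old) to work locally.

I did get the following example working: storm-book/examples-ch02-getting_started, which reads data from a local text file.

The same repo has a kafka-spout example under storm-book/examples-ch04-spouts, but I could not get it to work.

I also tried the cep.kafka example, but got the following error: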

5034 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
5047 [Thread-11-words] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
        at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na]
        at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na]
        at storm.kafka.ZkState.<init>(ZkState.java:62) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05]
5049 [Thread-11-words] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
        at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na]
        at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na]
        at storm.kafka.ZkState.<init>(ZkState.java:62) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05]
5088 [Thread-11-words] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05]
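
The `NoSuchMethodError` above says that the `ZooKeeper` class loaded at runtime has no constructor taking `(String, int, Watcher, boolean)`, which is the one curator-client 2.4.0 calls. That usually suggests mismatched library versions on the classpath. Below is a minimal sketch, using only reflection from the JDK, to check whether a given constructor is present; the class name `ConstructorCheck` and the helper `hasConstructor` are made up for illustration and are not part of Storm, Kafka, or Curator.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;

public class ConstructorCheck {

    /**
     * Returns true if the class can be loaded and declares a constructor whose
     * parameter types, by Class.getName(), match paramTypeNames exactly.
     * Hypothetical helper for illustration only.
     */
    static boolean hasConstructor(String className, String... paramTypeNames) {
        try {
            // Load without running static initializers.
            Class<?> cls = Class.forName(className, false,
                    ConstructorCheck.class.getClassLoader());
            for (Constructor<?> c : cls.getDeclaredConstructors()) {
                String[] names = Arrays.stream(c.getParameterTypes())
                        .map(Class::getName)
                        .toArray(String[]::new);
                if (Arrays.equals(names, paramTypeNames)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class (or one of the types it refers to) is not on the classpath.
            return false;
        }
    }

    public static void main(String[] args) {
        // Signature taken from the stack trace:
        // <init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
        boolean present = hasConstructor("org.apache.zookeeper.ZooKeeper",
                "java.lang.String", "int",
                "org.apache.zookeeper.Watcher", "boolean");
        System.out.println(
                "ZooKeeper(String, int, Watcher, boolean) present: " + present);
    }
}
```

Run it with the same classpath as the topology. If it reports `false`, the ZooKeeper jar that gets loaded is probably not the one Curator was built against; in a Maven build, `mvn dependency:tree` shows which ZooKeeper version is being pulled in.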

【Discussion】:

    Tags: java apache-kafka apache-storm


    【Answer 1】:

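    I ran into the same problem while learning how to create and run a Kafka spout. I found this GitHub repo very useful, and with it I was able to get my KafkaSpout to emit tuples to the rest of the bolts.

    Here is a high-level example of how I created the topology for this.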

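    import java.util.Arrays;

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;
    // JsonScheme and RollingCountBolt are not in storm-core or storm-kafka;
    // import them from wherever they live in your project.

    public class TestTopology {

        public static void main(String[] args) {
            String zkIp = "192.168.59.103";
            String nimbusHost = "192.168.59.103";
            String zookeeperHost = zkIp + ":2181";

            // The spout discovers the Kafka brokers through ZooKeeper.
            ZkHosts zkHosts = new ZkHosts(zookeeperHost);

            // Arguments: hosts, topic, ZooKeeper root for offsets, consumer id.
            SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "myKafkaTopic", "", "storm");

            // Emit each Kafka message as a single field named "events".
            kafkaConfig.scheme = new SchemeAsMultiScheme(new JsonScheme() {
                @Override
                public Fields getOutputFields() {
                    return new Fields("events");
                }
            });

            KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("eventsEmitter", kafkaSpout, 8);

            // The grouping must name the spout id and output field declared above
            // (the original referenced "requestsEmitter" / "request", which are never defined).
            builder.setBolt("eventsProcessor", new RollingCountBolt(2, 1), 8)
                    .fieldsGrouping("eventsEmitter", new Fields("events"));

            // More bolts go here

            Config config = new Config();
            config.setMaxTaskParallelism(5);
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
            config.put(Config.NIMBUS_HOST, nimbusHost);
            config.put(Config.NIMBUS_THRIFT_PORT, 6627);
            config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));

            try {
                StormSubmitter.submitTopology("my-topology", config, builder.createTopology());
            } catch (Exception e) {
                throw new IllegalStateException("Couldn't initialize the topology", e);
            }
        }
    }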
    

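    Hope this helps,

    Jose Luis

    【Comments】:

    • Hi Jose, thanks a lot for the link to the repo. I was able to run the example from there successfully.
    • When I use LocalCluster, the data is consumed by the Kafka spout and the bolts. But when I use StormSubmitter, the data from Kafka is not consumed. I am pushing data to Kafka with the Kafka console producer and using Storm version 1.0.3.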