【问题标题】:Storm HDFS Bolt not workingStorm HDFS 螺栓不工作
【发布时间】:2015-04-08 04:32:21
【问题描述】:

所以我刚刚开始使用storm并试图理解它。我正在尝试连接到 kafka 主题,读取数据并将其写入 HDFS 螺栓。 起初我在没有 shuffleGrouping("stormspout") 的情况下创建了它,我的 Storm UI 显示 spout 正在消耗来自主题的数据,但没有任何东西写入 bolt(除了它在 HDFS 上创建的空文件) .然后我添加了 shuffleGrouping("stormspout");现在螺栓似乎给出了错误。如果有人能提供帮助,我将不胜感激。

谢谢, 科尔曼

错误

2015-04-13 00:02:58 s.k.PartitionManager [INFO] 从以下位置读取分区信息:/storm/partition_0 --> null 2015-04-13 00:02:58 s.k.PartitionManager [INFO] 未找到分区信息,使用配置确定偏移量 2015-04-13 00:02:58 s.k.PartitionManager [INFO] 来自动物园管理员的上次提交偏移量:0 2015-04-13 00:02:58 s.k.PartitionManager [INFO] 提交偏移量 0 大于 9223372036854775807 后面,重置为 startOffsetTime=-2 2015-04-13 00:02:58 s.k.PartitionManager [INFO] 从偏移量 0 开始 Kafka 192.168.134.137:0 2015-04-13 00:02:58 s.k.ZkCoordinator [INFO] 任务 [1/1] 完成刷新 2015-04-13 00:02:58 b.s.d.task [INFO] 发射:stormspout 默认 [colmanblah] 2015-04-13 00:02:58 b.s.d.executor [INFO] 传输元组任务:2 TUPLE:来源:stormspout:3,流:默认,id:{462820364856350458=5573117062061876630},[colmanblah] 2015-04-13 00:02:58 b.s.d.task [INFO] 发射:stormspout __ack_init [462820364856350458 5573117062061876630 3] 2015-04-13 00:02:58 b.s.d.executor [INFO] 传输元组任务:1 TUPLE:来源:stormspout:3,流:__ack_init,id:{},[462820364856350458 5573117062061876630 3] 2015-04-13 00:02:58 b.s.d.executor [INFO] 为 1 个元组处理收到的消息:源:stormspout:3,流:__ack_init,id:{},[462820364856350458 5573117062061876630 3] 2015-04-13 00:02:58 b.s.d.executor [INFO] BOLT ack TASK: 1 TIME: TUPLE: source:stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] 2015-04-13 00:02:58 b.s.d.executor [INFO] 执行完成的 TUPLE 源:stormspout:3,流:__ack_init,id:{},[462820364856350458 5573117062061876630 3] 任务:1 个 DELTA: 2015-04-13 00:02:59 b.s.d.executor [INFO] 准备螺栓风暴:(2) 2015-04-13 00:02:59 b.s.d.executor [INFO] 处理收到的消息 FOR 2 TUPLE:来源:stormspout:3,流:默认,id:{462820364856350458=5573117062061876630},[colmanblah]

2015-04-13 00:02:59 b.s.util [错误] 异步循环死了!

            java.lang.RuntimeException: java.lang.NullPointerException
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
                    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
            Caused by: java.lang.NullPointerException: null
                    at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    ... 6 common frames omitted
            2015-04-08 04:26:39 b.s.d.executor [ERROR]
            java.lang.RuntimeException: java.lang.NullPointerException
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
                    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
            Caused by: java.lang.NullPointerException: null
                    at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]

代码:

    TopologyBuilder builder = new TopologyBuilder();    
    Config config = new Config();
    //config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 7000);
    config.setNumWorkers(1);
    config.setDebug(true);  
    //LocalCluster cluster = new LocalCluster();


    //zookeeper
    BrokerHosts brokerHosts = new ZkHosts("192.168.134.137:2181", "/brokers");      

    //spout
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "myTopic", "/kafkastorm", "KafkaSpout");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.forceFromStart = true;
    builder.setSpout("stormspout", new KafkaSpout(spoutConfig),4);

    //bolt
    SyncPolicy syncPolicy = new CountSyncPolicy(10); //Synchronize data buffer with the filesystem every 10 tuples
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); // Rotate data files when they reach five MB
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/stormstuff"); // Use default, Storm-generated file names
    builder.setBolt("stormbolt", new HdfsBolt()
                                 .withFsUrl("hdfs://192.168.134.137:8020")//54310
                                 .withSyncPolicy(syncPolicy)
                                 .withRotationPolicy(rotationPolicy)
                                 .withFileNameFormat(fileNameFormat),2
                    ).shuffleGrouping("stormspout");        


    //cluster.submitTopology("ColmansStormTopology", config, builder.createTopology());     

    try {
        StormSubmitter.submitTopologyWithProgressBar("ColmansStormTopology", config, builder.createTopology());

    } catch (AlreadyAliveException e) {
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        e.printStackTrace();
    }

POM.XML 依赖项

              <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency> 
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-hdfs</artifactId>
                <version>0.9.3</version>
            </dependency>
                    <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_2.10</artifactId>
                        <version>0.8.1.1</version>
                        <exclusions>
                            <exclusion>
                                <groupId>log4j</groupId>
                                <artifactId>log4j</artifactId>
                            </exclusion>
                            <exclusion>
                                <groupId>org.slf4j</groupId>
                                <artifactId>slf4j-simple</artifactId>
                            </exclusion>
                        </exclusions>
                    </dependency>
              </dependencies>  

【问题讨论】:

  • 在将shuffleGrouping 添加到喷口之前,未连接螺栓。连接螺栓后,实际开始处理数据。你需要分享你的 bolt & spout 代码并添加一些 traces 到 bolt
  • 是的,我认为这就是发生的事情,这就是我的全部代码?我应该有别的东西吗?再次,我刚刚开始研究风暴,直到现在我一直在使用水槽。干杯,科尔曼
  • 你可以在本地模式下运行storm,这样你就可以用你的IDE调试它
  • 是的......我不能真正做到这一点,因为我目前正在 Windows 上开发并在 hortonworks 沙箱上运行应用程序,同时我们正在等待我们的集群到达......跨度>
  • ok ,显然这是bolt中的错误,如果你不能在本地模式下运行它,在你的代码中添加跟踪,看看它失败的地方,如果你需要这里的帮助,分享你的和我们一起编码

标签: hadoop hdfs apache-storm hortonworks-data-platform


【解决方案1】:

首先尝试从 execute 方法发出值,如果您是从不同的工作线程发出的,那么让所有工作线程在 LinkedBlockingQueue 中提供数据,并且只有一个工作线程将允许从LinkedBlockingQueue。

其次,尝试将 Config.setMaxSpoutPending 设置为某个值并再次尝试运行代码,并检查场景是否持续尝试减小该值。

参考 - Config.TOPOLOGY_MAX_SPOUT_PENDING:设置一次可以在单个 spout 任务上挂起的 spout 元组的最大数量(挂起意味着元组尚未被确认或失败)。强烈建议您设置此配置以防止队列爆炸。

【讨论】:

  • 我设置了 max_spout_pending 并没有产生任何影响,我没有发出任何东西,因为这一切都是在框架内完成的。您是在谈论覆盖 KafkaSpout 执行方法吗?我真的觉得这是某种程度上的版本不匹配。也许某些 jar 文件引入了不正确的依赖项,或者我的 pom,xml 错误?我在上面的pom文件中添加了依赖,你看看可以吗?
  • 我已经在错误之前包含了错误块,有人可以帮忙吗?!
【解决方案2】:

我最终通过风暴源代码弄清楚了这一点。

我没有设置

RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

并像这样包含它

    builder.setBolt("stormbolt", new HdfsBolt()
                                      .withFsUrl("hdfs://192.168.134.137:8020")//54310
                                 .withSyncPolicy(syncPolicy)
                                 .withRecordFormat(format)
                                 .withRotationPolicy(rotationPolicy)
                                 .withFileNameFormat(fileNameFormat),1
                    ).shuffleGrouping("stormspout");    

在 HDFSBolt.Java 类中,它会尝试使用它,如果未设置,它基本上会崩溃。这就是 NPE 的来源。

希望这对其他人有所帮助,请确保您已设置本课程所需的所有位。更有用的错误消息,例如“RecordFormat not set”会很好....

【讨论】:

    猜你喜欢
    • 2014-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多