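[This article walks through a word-count example to deepen understanding of Storm's basic concepts, its basic development workflow, and how to submit and run a topology.]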

 

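  The word-count topology, WordCountTopology, does one simple job: it continuously reads in sentences, counts every word, and keeps printing the updated count of each word in the terminal. The topology's data flow is as follows: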

  [Figure: WordCountTopology data flow (SentenceSpout -> SplitSentenceBolt -> WordCountBolt -> ReportBolt)]

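  • Sentence spout: continuously reads data from the data source and turns it into individual sentences. Output tuple format: {"sentence":"hello world"}
  • Sentence-splitting bolt: splits each sentence into individual words. Output tuple format: {"word":"hello"}  {"word":"world"}
  • Word-count bolt: keeps the number of occurrences of each word. Every time it receives a tuple from upstream, it increments that word's count by 1 and sends the word together with its current count downstream. Output tuple format: a word and its count, e.g. ("hello", 1)  ("world", 3)
  • Report bolt: maintains a table of counts for all words. Every time it receives a tuple from upstream, it updates the counts in the table and prints the results in the terminal.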
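To make the data flow above concrete, here is a minimal plain-Java sketch that runs the same four stages in a single process, without Storm. The class and method names (`WordCountSimulation`, `split`, `countWords`) are hypothetical. In the real topology, WordCountBolt and ReportBolt are separate components that each keep their own table; this sketch folds counting and reporting into one map for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process walk-through of the word-count data flow.
// No Storm APIs are used; each step stands in for one topology component.
public class WordCountSimulation {

    // Stand-in for the splitting bolt: one sentence in, a list of words out.
    // trim() plus "\\s+" avoids empty tokens from leading or repeated spaces.
    static List<String> split(String sentence) {
        String trimmed = sentence.trim();
        if (trimmed.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(trimmed.split("\\s+")));
    }

    // Stand-in for the counting and report bolts: a running count per word,
    // kept in first-seen order so the printed table is stable.
    static Map<String, Long> countWords(List<String> sentences) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String sentence : sentences) {          // spout: emit sentences
            for (String word : split(sentence)) {    // split bolt: emit words
                counts.merge(word, 1L, Long::sum);   // count bolt: add 1
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello world", "hello storm");
        // report bolt: print the whole table
        System.out.println(countWords(input)); // {hello=2, world=1, storm=1}
    }
}
```

Splitting on `\\s+` after `trim()` is just one reasonable choice; a plain `split(" ")` would produce an empty "word" for any sentence that starts with a space.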

  Development steps:

  1. Environment

  • OS: Mac OS X 10.10.3
  • JDK: jdk1.8.0_40
  • IDE: IntelliJ IDEA 15.0.3
  • Maven: apache-maven-3.0.3

  2. Project setup

  • Create a new Maven project in IDEA: storm-learning
  • Edit pom.xml to add the Storm core dependency, plus the slf4j dependency for log output:
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
    </dependency>
</dependencies>

  3. Developing the Spout and Bolt components

  • SentenceSpout
  • SplitSentenceBolt
  • WordCountBolt
  • ReportBolt

SentenceSpout.java

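import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;

    // For simplicity, a static array simulates a continuous stream of incoming data
    private static final String[] sentences = {
            "The logic for a realtime application is packaged into a Storm topology",
            "A Storm topology is analogous to a MapReduce job",
            "One key difference is that a MapReduce job eventually finishes whereas a topology runs forever",
            "A topology is a graph of spouts and bolts that are connected with stream groupings"
    };

    // Index of the next sentence to emit
    private int index = 0;

    // Initialization: called once when the spout task starts; keep the collector for emitting tuples
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    // Core logic: Storm calls nextTuple() repeatedly; emit one sentence per call and wrap around at the end
    @Override
    public void nextTuple() {
        spoutOutputCollector.emit(new Values(sentences[index]));
        ++index;
        if (index >= sentences.length) {
            index = 0;
        }
    }

    // Declare the output field name; downstream bolts must read the tuple by this same name
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentences"));
    }
}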
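Because Storm calls `nextTuple()` over and over, the index wrap-around means the fixed array is replayed indefinitely, which is how this spout fakes an endless stream. The Storm-free sketch below isolates just that index logic; `CyclingSentenceSource` and its `next()` method are hypothetical names, and `next()` returns the sentence instead of emitting it through a collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Storm-free stand-in for SentenceSpout: next() mirrors
// nextTuple()'s index handling but returns the sentence instead of emitting it.
public class CyclingSentenceSource {
    private final String[] sentences;
    private int index = 0;

    CyclingSentenceSource(String... sentences) {
        this.sentences = sentences;
    }

    // Same arithmetic as nextTuple(): read, advance, wrap to 0 at the end.
    String next() {
        String sentence = sentences[index];
        ++index;
        if (index >= sentences.length) {
            index = 0;
        }
        return sentence;
    }

    public static void main(String[] args) {
        CyclingSentenceSource source = new CyclingSentenceSource("a", "b", "c");
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            emitted.add(source.next());
        }
        System.out.println(emitted); // [a, b, c, a, b]
    }
}
```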
