分布式实时数据处理框架——Storm

1. Storm简介与核心概念

1.1 Storm 简介

全称为 Apache Storm,是一个分布式实时大数据处理系统。它是一个流数据框架,具有最高的获取率。它比较简单,可以并行地对实时数据执行各种操作。它通过Apache ZooKeeper 集群管理分布式环境和集群状态。Apache Storm 继续成为实时数据分析的领导者。Storm 易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。

Storm区别于Hadoop,但又被业界称为实时版的Hadoop,源于越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流。

1.2 Storm 核心概念(组件)

  • Nimbus:Storm 集群主节点(master),负责资源分配和任务调度。我们提交任务和截止任务都是在 Nimbus 上操作的。一个 Storm 集群只有一个 Nimbus 节点;
  • Supervisor:Storm 集群工作节点(slave),负责接收Nimbus分配的任务,管理所有Worker,一个Supervisor节点中包含多个Worker进程;
  • Worker:工作进程,每个工作进程中都有多个 Task;
  • Task:任务,每个 Spout 和 Bolt 都是一个任务,每个任务都是一个线程;
  • Topology:计算拓扑,是对实时计算应用逻辑的封装,它的作用与 MapReduce 的任务(Job)很相似,区别在于 MapReduce 的一个 Job 在得到结果之后总会结束,而拓扑会一直在集群中运行,直到你手动去终止它。拓扑还可以理解成由一系列通过数据流(Stream Grouping)相互关联的 Spout 和 Bolt 组成的的拓扑结构;
  • Stream:消息流,是拓扑中数据流的来源,关键抽象概念,是没有边界的 Tuple 序列;
  • Spout:消息流的源头,Topology 的消息生产者,一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中,这些数据源可以来自数据库、文件、消息队列;
  • Bolt:消息处理单元,可以完成过滤、聚合、联结、查询数据库等几乎所有的数据处理需求;
  • Stream grouping:消息分发策略,一共 8 种,定义每个 Bolt 接受何种输入;
  • Reliability:可靠性,Storm 保证每个 Tuple 都会被处理。

大数据014——Storm 简介及入门案例

1.3 Storm 架构原理

1.3.1 Storm 集群架构

大数据014——Storm 简介及入门案例

Zookeeper 集群负责Nimbus 节点和 Supervior 节点之间的通信,监控各个节点之间的状态,它在 Storm 集群中逻辑上是独立的,但在实际部署的时候,一般会将 zk节点部署在 Nimbus 节点或 Supervisor 节点上。

1.3.2 Storm 数据处理流程

Storm 处理数据的特点:适合管理实时数据,数据源源不断,不断处理:

大数据014——Storm 简介及入门案例

  • Spout:消息流的源头,Topology 的消息生产者;
  • Bolt:消息处理单元,可以过滤、聚合、查询数据库;
  • tuple:就是一个值列表,是 storm 主要数据结构,是 storm 中使用的最基本单元、数据模型和元组。

1.3.3 Storm 拓扑图(spouts 和 bolts 组成的图)

storm 中是没有数据存储结构的,本身是不存储数据,需要我们自己设计数据落地接口,指明数据存储到哪一部分中:

大数据014——Storm 简介及入门案例

  • 第一种简单类型:由一个Spout获取数据,然后交给一个Bolt进行处理;
  • 第二种稍复杂类型:由一个Spout获取数据,然后交给一个Bolt进行处理一部分,然后在交给下一个Bolt进行处理其他部分。
  • 第三种复杂类型一个Spout可以同时发送数据到多个Bolt,而一个Bolt也可以接受多个Spout或多个Bolt,最终形成多个数据流。但是这种数据流必须是有方向的,有起点和终点,不然会造成死循环,数据永远也处理不完。就是Spout发给Bolt1,Bolt1发给Bolt2,Bolt2又发给了Bolt1,最终形成了一个环状。

1.4 Storm 的主要应用

1.4.1 数据流处理( stream processing)

Storm 可用来实时处理新数据和更新数据库,兼具容错性和可扩展性,即 Storm 可以用来处理源源不断流进来的消息,处理之后将结果写入某个存储中。如:

  • 条件过滤:这是Storm最基本的处理方式,对符合条件的数据进行实时过滤,将符合条件的数据保存下来,这种实时查询的业务需求再实际应用中很常见。
  • 中间计算:我们需要改变数据中某一个字段(例如是数值),我们需要利用一个中间值经过计算(值比较、求和、求平均等等)后改变该值,然后将数据重新输出。
  • 求TopN:相信大家对TopN类的业务需求也比较熟悉,在规定时间窗口内,统计数据出现的TopN,该类处理在购物及电商业务需求中,比较常见。

1.4.2 持续计算( continuous computation)

Storm 可进行连续查询并把结果即时反馈给客户端,如把 Twitter 上的热门话题发送到浏览器中 ,Storm能保证计算可以永久运行,直到用户结束计算进程为止。如:

  • 推荐系统:有时候在实时处理时会从mysql及hadoop中获取数据库中的信息,例如在电影推荐系统中,传入数据为:用户当前点播电影信息,从数据库中获取的是该用户之前的一些点播电影信息统计,例如点播最多的电影类型、最近点播的电影类型,及其社交关系中点播信息,结合本次点击及从数据库中获取的信息,生成推荐数据,推荐给该用户。并且该次点击记录将会更新其数据库中的参考信息,这样就是实现了简单的智能推荐。
  • 批处理:所谓批处理就是数据积攒到一定触发条件,就批量输出,所谓的触发条件类似事件窗口到了,统计数量够了即检测到某种数据传入等等。
  • 热度统计:热度统计实现依赖于Storm提供的TimeCacheMap数据结构,现在可能推荐用RotatingMap,该结构能够在内存中保存近期活跃的对象。我们可以使用它来实现例如论坛中热帖排行计算等。

1.4.3 分布式远程程序调用( distributed RPC)

轻松地并行化CPU密集型操作。如:

  • 分布式RPC:Storm有对RPC进行专门的设计,分布式RPC用于对Storm上大量的函数进行并行计算,最后将结果返回给客户端。

2. Storm 入门案例

通过Strom API 编写入门案例,在非集群的本地环境下运行体验 Storm 实时处理数据的底层实现,有助于更好的理解 Storm 实时数据处理的开发流程。

2.1 数字累加操作

业务 对1,2,3,4....这种递增数字进行累加求和
topology spout 负责产生从1开始的递增数据,每次加1
bolt 负责对spout发送出来的数据进行累加求和
  1. eclipse 开发工具借助 Maven 环境新建项目;
  2. 添加 Storm 依赖:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>
  1. 编写代码:
public class LocalTopology {
	
	/**
	 * 自定义的spout需要继承BaseRichSpout
	 */
	public static class MySpout extends BaseRichSpout{
		
		private Map conf;
		private TopologyContext context;
		private SpoutOutputCollector collector;
		/**
		 * 初始化方法,在spout组件初始化的时候只执行一次;执行初始化操作,如:如果需要实现对mysql的操		  * 作,需要使用连接池,那么连接池初始化的代码就需要放在open方法里面。
		 * 		Map conf:其实是storm的配置类,这里面可以保存一个配置信息在storm中进行传递。
		 * 		TopologyContext context:topology的上下文对象
		 *		SpoutOutputCollector collector:发射器,负责发射数据
		 */
		@Override
		public void open(Map conf, TopologyContext context,
				SpoutOutputCollector collector) {
			this.conf = conf;
			this.context = context;
			this.collector = collector;
		}

		/**
		 * 这个方法会被框架一直调用,死循环的调用
		 * spout负责在nextTuple中向外发射数据
		 * 这个方法每执行一次,都会向外发射一条数据。
		 */
		int num=0;
		@Override
		public void nextTuple() {
			num++;//这样就可以产生递增的数字
			this.collector.emit(new Values(num));
			System.out.println("spout:"+num);
			Utils.sleep(1000);//线程休眠1000sm,防止耗尽资源
		}

		/**
		 * 声明输出字段
		 * 这个方法也是在开始执行一次
		 */
		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			//注意:Fields中封装的字段和values中封装的数据是一一对应的
			declarer.declare(new Fields("num"));
		}
		
	}
	
	/**
	 * 自定义的bolt需要继承baserichbolt
	 */
	public static class SumBolt extends BaseRichBolt{
		
		private Map stormConf;
		private TopologyContext context;
		private OutputCollector collector;
		/**
		 * 是一个初始化方法,也是只会执行一次
		 */
		@Override
		public void prepare(Map stormConf, TopologyContext context,
				OutputCollector collector) {
			this.stormConf = stormConf;
			this.context = context;
			this.collector = collector;
		}

		/**
		 * 这个方法也会执行很多次,只要上一个组件发射一条数据,那么这个bolt就收到这个条数据,然后调用		   * execute方法去处理数据
		 */
		int sum = 0;
		@Override
		public void execute(Tuple input) {
			Integer num = input.getIntegerByField("num");
			//因为tuple其实就是一个list,list有角标,所以也可以通过脚本获取数据
			//Integer num = input.getInteger(0);
			sum+=num;
			System.out.println("和为:"+sum);
			//注意:这个bolt已经是最后一个bolt了,所以就不需要向外面发射数据了,也就不用调用emit方法了。
		}
		
		/**
		 * 注意:如果这个组件没有调用emit向外发射数据,那么这个方法就不需要实现。
		 * 如果这个组件向外发射了数据,那么在这个方法内部必须要声明输出字段。
		 */
		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			
		}
		
	}
	
	public static void main(String[] args) {
		//把spout和bolt组装成一个topology去执行
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout("spoutid", new MySpout());
		topologyBuilder.setBolt("boltid", new SumBolt()).shuffleGrouping("spoutid");		
		//创建本地集群,即模拟集群
		LocalCluster localCluster = new LocalCluster();
		//拓扑的名称
		String simpleName = LocalTopology.class.getSimpleName();
		//配置类
		Config config = new Config();
		localCluster.submitTopology(simpleName, config, topologyBuilder.createTopology());
	}
	
}
  1. 结果:
spout:1
和为:1
spout:2
和为:3
spout:3
和为:6
spout:4
和为:10
.....

2.2 实时累计成交订单金额

业务 模拟解析天猫用户订单数据,对用户下单总金额进行实时汇总
topology tmalldataspout 负责读取文件中的一行行内容
parsebolt 负责对订单信息进行解析
sumbolt 负责对订单金额进行汇总
  1. 项目根目录下传入tmalldata.txt文件:
100010	会飞的鱼_123	13210982233	2016/12/1 10:11	89.9	请发顺丰快递
100011	别来我回忆里微笑	18712317876	2016/12/1 12:21	199	无
100012	真心不如红钞票	18623450989	2016/12/1 12:43	39.9	无
100013	qingfeng7109	18901292615	2016/12/1 13:11	167	无
100014	梦醒de泪痕	13910002697	2016/12/1 13:49	132	请开具纸质发票
100015	wuah1314	17911112836	2016/12/1 14:07	248	无
100016	黑白色	15611595966	2016/12/1 14:18	178	无
100017	仰望天猪	13311295111	2016/12/1 14:29	86.9	无
100018	天涯浪子	18511512315	2016/12/1 15:20	78.2	无
100019	dingdang	17210963299	2016/12/1 15:39	156	无
100020	堕落的永恒	15610615388	2016/12/1 15:54	124	无
  1. 编写代码:
public class LocalTmallTopology {
	
	public static class TmallDataSpout extends BaseRichSpout{
		
		private Map conf; 
		private TopologyContext context;
		private SpoutOutputCollector collector;
		private BufferedReader bufferedReader;
		@Override
		public void open(Map conf, TopologyContext context,
				SpoutOutputCollector collector) {
			this.conf = conf;
			this.context  =context;
			this.collector = collector;
			try {
                //读取项目根目录下的tmalldata.txt文件
				bufferedReader = new BufferedReader(new FileReader("tmalldata.txt"));
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}
		}
		String line = null;
		@Override
		public void nextTuple() {
			try {
				line = bufferedReader.readLine();//每次读取一行
				if(line!=null){
					this.collector.emit(new Values(line));//若该行为非空数据,提交这一行数据
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("line"));
		}
	}
	
	/**
	 * 解析数据的bolt
	 */
	public static class ParseBolt extends BaseRichBolt{
		
		private Map stormConf; 
		private TopologyContext context;
		private OutputCollector collector;
		@Override
		public void prepare(Map stormConf, TopologyContext context,
				OutputCollector collector) {
			this.stormConf = stormConf;
			this.context = context;
			this.collector = collector;
		}

		@Override
		public void execute(Tuple input) {
			String line = input.getStringByField("line");
            //按制表符切分每一行数据
			String[] splits = line.split("\t");
			//可能会存储数据异常的情况,在这对数据进行过滤
			if(splits.length==6){
				//把订单id和订单金额发送出去
				this.collector.emit(new Values(splits[0],splits[4]));
			}
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			declarer.declare(new Fields("id","actualFee"));
		}
	}
	
	/**
	 * 汇总订单总金额
	 */
	public static class SumBolt extends BaseRichBolt{

		private Map stormConf; 
		private TopologyContext context;
		private OutputCollector collector;
		@Override
		public void prepare(Map stormConf, TopologyContext context,
				OutputCollector collector) {
			this.stormConf = stormConf;
			this.context = context;
			this.collector = collector;
		}
		
		double sum = 0;
		@Override
		public void execute(Tuple input) {
			String price = input.getStringByField("actualFee");
			sum+=Double.parseDouble(price);
			System.out.println("订单总金额:"+sum);
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			
		}
	}	
	
	public static void main(String[] args) {
		//注意:同一个topology内部,组件id不能相同,并且id不能以__开头,因为这个是系统保留id
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout("spoutid", new TmallDataSpout());
		topologyBuilder.setBolt("boltid1", new ParseBolt()).shuffleGrouping("spoutid");
		topologyBuilder.setBolt("boltid2", new SumBolt()).shuffleGrouping("boltid1");
		
		LocalCluster localCluster = new LocalCluster();
		String simpleName = LocalTmallTopology.class.getSimpleName();
		Config config = new Config();
		localCluster.submitTopology(simpleName, config, topologyBuilder.createTopology());
	}	

}
  1. 结果:
订单总金额:89.9
订单总金额:288.9
订单总金额:328.79999999999995
订单总金额:495.79999999999995
订单总金额:627.8
订单总金额:875.8
订单总金额:1053.8
订单总金额:1140.7
订单总金额:1218.9
订单总金额:1374.9
订单总金额:1498.9

相关文章:

  • 2021-11-05
  • 2021-07-12
  • 2021-08-24
  • 2021-08-17
  • 2022-01-19
猜你喜欢
  • 2021-05-26
  • 2021-05-02
  • 2021-11-18
  • 2021-05-29
  • 2021-10-15
  • 2021-12-17
  • 2021-06-11
相关资源
相似解决方案