0.简介
a million tuples processed per second per node
单个节点美妙百万数量级的实时计算
scalable 可扩展性
fault-tolerant 容错性
生于Twitter,收购的BackType,并开源到apache
底层语言clojure,java混合体
api:javadoc
hadoop VS storm
hadoop: map reduce
storm:spout(产生数据源) bolt(处理)
storm不同的是没有结束进程,就算没有数据进来,也不会结束(扶梯和电梯的区别)
使用场景不同(实时流处理-离线批处理)
storm VS sparkStreaming
并不是一个真正意义的实时流处理 微小批处理 可以设置,属于spark生态圈
1. 核心概念
http://storm.apache.org/releases/1.2.2/Concepts.html
Streams 数据流
Spouts 产生数据流的东西(可能是多个)reliable(ack,fail), unreliable
Bolts 处理数据流的东西(可能是多个)filtering,functions,aggregations.joins,talking todatabases execute 多线程,异步
Tuple 数据流里的数据,nextTuple方法
Topologies 整个数据处理的生产线(类似于mapreduce)
2. storm编程
Idea & Maven 构建storm项目
ISpout接口
IComponent接口
IBolt接口
求和 + 词频统计案例
环境:jdk1.8 IDEA2018.2 Maven 3.5.4
【ISpout】:负责将数据发送到topology中处理
storm会跟踪每一个spout发出去的tuple的DAG(tuple中含一个messageID 任意类型)
storm在每一个线程里面执行ack.fail.nextTuple方法,这意味着使用ISpout的时候不用担心并发的问题,因为都是线程安全的。但是这些的前提是用户必须保证,nextTuple是非阻塞的,不然ack和fail会被阻塞掉void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
一个task在worker上面执行的时候的初始化步骤void close();
spout 在shutdowm的时候调用的方法,但是并不保证close会被调用void nextTuple();发送数据
storm 请求spout发送tuple的时候调用这个方法,该方法不能阻塞void ack(Object msgId);
tuple处理成功,storm返回给spout成功的消息
【IComponent】
给topology里面的所有组件提供公用的方法void declareOutputFields(OutputFieldsDeclarer declarer);
用于声明Spout/Bolt发送的tuple的名称
【IBolt】
一个ibolt代表一个组件(component),先接受拿出tuple并且处理它
能够filtering joining functions和aggregation生命周期:
在客户端上面创建,序列化,提交到主集群上面(Nimbus)(有点像yarn中的resourcemanager)。Nimbus启动一个worker去反序列化,启动处理tuple的工作。void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
准备好一个outputCollectorvoid execute(Tuple input);
tuple含有自己的元数据:来源地,值(getValue可以访问到)cleanup
资源清理操作,就算是写了也不一定被执行
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* 使用storm实现累计求和的操作
*/
public class LocalSumStormTopology {
/**
* Spout需要继承BaseRichSpout
* 数据源需要产生数据并发射
*/
public static class DataSourceSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
int number=0;
/**
* 只会被调用一次
* @param conf 配置参数 暂时不管
* @param context 上下文
* @param collector 数据的发射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector=collector;
}
/**
* 会产生数据,在生产上肯定是从消息队列中获取数据
* 应该设计成一个死循环,一直发送,因为是处理实时流数据
*/
@Override
public void nextTuple() {
this.collector.emit(new Values(++number));//Values是一个ArrayList数组,所有的构造方法的参数都能加入到数组中
System.out.println("Spout:"+number);
//不要一次性太多;防止数据产生太快
Utils.sleep(1000);
}
/**
* 声明输出字段
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number_"));//因为前面values里面只有一个东西,因袭这里对应上只给一个名称
}
}
/**
* 数据的累积求和Bolt:接受数据并处理
*/
public static class SumBolt extends BaseRichBolt{
int sum=0;
/**
* 初始化方法,只会被执行一次
* @param stormConf
* @param context
* @param collector 因此这一次的业务逻辑很简单,不需要继续往下面一个bolt发送
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* 也是一个死循环:获取spout发送给过来的数据
* @param input
*/
@Override
public void execute(Tuple input) {
//可以bolt中获取值,可以根据index获取,也可以根据field名称获取,建议使用名称获取
Integer value = input.getIntegerByField("number_");
sum+=value;
System.out.println("Bolt: sum=[ "+sum+"]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
/**
* 如果不是本地,而是真正在storm集群上面提交,需要用的不是LocalCluster而是StormSubmitter
*/
LocalCluster cluster=new LocalCluster();//创建一个模拟的本地storm集群,本地模式运行不需要搭建storm集群
//builder中可以设置spout和bolt的执行顺序,其中id都是自定义的
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("DataSourceSpout_",new DataSourceSpout());
builder.setBolt("SumBolt_",new SumBolt()).shuffleGrouping("DataSourceSpout_");//shuffle指定执行顺序
StormTopology topology = builder.createTopology();
/**
* 向集群提交一个topology,参数在源代码中并没有,
* 第一个是类名称
* 第二个是new Config
* 第三个是一StormTopology,使用TopologyBuilder创建
*/
cluster.submitTopology("LocalSumStormTopology",new Config(),topology);
}
}