0.简介

a million tuples processed per second per node
单个节点美妙百万数量级的实时计算
scalable 可扩展性
fault-tolerant 容错性

生于Twitter,收购的BackType,并开源到apache
底层语言clojure,java混合体
api:javadoc

hadoop VS storm
hadoop: map reduce
storm:spout(产生数据源) bolt(处理)
storm不同的是没有结束进程,就算没有数据进来,也不会结束(扶梯和电梯的区别)
使用场景不同(实时流处理-离线批处理)

storm VS sparkStreaming
【2019春招准备:106.storm】
并不是一个真正意义的实时流处理 微小批处理 可以设置,属于spark生态圈

1. 核心概念

http://storm.apache.org/releases/1.2.2/Concepts.html

Streams 数据流
Spouts 产生数据流的东西(可能是多个)reliable(ack,fail), unreliable
Bolts 处理数据流的东西(可能是多个)filtering,functions,aggregations.joins,talking todatabases execute 多线程,异步
Tuple 数据流里的数据,nextTuple方法
Topologies 整个数据处理的生产线(类似于mapreduce)
【2019春招准备:106.storm】

2. storm编程

Idea & Maven 构建storm项目
ISpout接口
IComponent接口
IBolt接口
求和 + 词频统计案例

环境:jdk1.8 IDEA2018.2 Maven 3.5.4

【ISpout】:负责将数据发送到topology中处理
storm会跟踪每一个spout发出去的tuple的DAG(tuple中含一个messageID 任意类型)
storm在每一个线程里面执行ack.fail.nextTuple方法,这意味着使用ISpout的时候不用担心并发的问题,因为都是线程安全的。但是这些的前提是用户必须保证,nextTuple是非阻塞的,不然ack和fail会被阻塞掉

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
一个task在worker上面执行的时候的初始化步骤

void close();
spout 在shutdowm的时候调用的方法,但是并不保证close会被调用

void nextTuple();发送数据
storm 请求spout发送tuple的时候调用这个方法,该方法不能阻塞

void ack(Object msgId);
tuple处理成功,storm返回给spout成功的消息

【IComponent】
给topology里面的所有组件提供公用的方法

void declareOutputFields(OutputFieldsDeclarer declarer);
用于声明Spout/Bolt发送的tuple的名称

【IBolt】
一个ibolt代表一个组件(component),先接受拿出tuple并且处理它
能够filtering joining functions和aggregation

生命周期:
在客户端上面创建,序列化,提交到主集群上面(Nimbus)(有点像yarn中的resourcemanager)。Nimbus启动一个worker去反序列化,启动处理tuple的工作。

void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
准备好一个outputCollector

void execute(Tuple input);
tuple含有自己的元数据:来源地,值(getValue可以访问到)

cleanup
资源清理操作,就算是写了也不一定被执行
【2019春招准备:106.storm】

【2019春招准备:106.storm】

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 使用storm实现累计求和的操作
 */

public class LocalSumStormTopology {


    /**
     * Spout需要继承BaseRichSpout
     * 数据源需要产生数据并发射
     */
    public static class DataSourceSpout extends BaseRichSpout{
        private SpoutOutputCollector collector;
        int number=0;
        /**
         * 只会被调用一次
         * @param conf 配置参数 暂时不管
         * @param context 上下文
         * @param collector 数据的发射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector=collector;
        }

        /**
         * 会产生数据,在生产上肯定是从消息队列中获取数据
         * 应该设计成一个死循环,一直发送,因为是处理实时流数据
         */
        @Override
        public void nextTuple() {
            this.collector.emit(new Values(++number));//Values是一个ArrayList数组,所有的构造方法的参数都能加入到数组中
            System.out.println("Spout:"+number);

            //不要一次性太多;防止数据产生太快
            Utils.sleep(1000);
        }


        /**
         * 声明输出字段
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number_"));//因为前面values里面只有一个东西,因袭这里对应上只给一个名称
        }
    }


    /**
     * 数据的累积求和Bolt:接受数据并处理
     */
    public static class SumBolt extends BaseRichBolt{
        int sum=0;
        /**
         * 初始化方法,只会被执行一次
         * @param stormConf
         * @param context
         * @param collector 因此这一次的业务逻辑很简单,不需要继续往下面一个bolt发送
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        /**
         * 也是一个死循环:获取spout发送给过来的数据
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            //可以bolt中获取值,可以根据index获取,也可以根据field名称获取,建议使用名称获取
            Integer value = input.getIntegerByField("number_");
            sum+=value;
            System.out.println("Bolt: sum=[ "+sum+"]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {
        /**
         * 如果不是本地,而是真正在storm集群上面提交,需要用的不是LocalCluster而是StormSubmitter
         */
        LocalCluster cluster=new LocalCluster();//创建一个模拟的本地storm集群,本地模式运行不需要搭建storm集群


        //builder中可以设置spout和bolt的执行顺序,其中id都是自定义的
        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("DataSourceSpout_",new DataSourceSpout());
        builder.setBolt("SumBolt_",new SumBolt()).shuffleGrouping("DataSourceSpout_");//shuffle指定执行顺序
        StormTopology topology = builder.createTopology();
        /**
         * 向集群提交一个topology,参数在源代码中并没有,
         * 第一个是类名称
         * 第二个是new Config
         * 第三个是一StormTopology,使用TopologyBuilder创建
         */
        cluster.submitTopology("LocalSumStormTopology",new Config(),topology);
    }
}

【2019春招准备:106.storm】

相关文章: