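1. Concepts related to parallelism:

Number of worker processes

Number of executor threads    // "parallelism" refers to the number of executors

Number of tasks

How the three relate: each machine runs one supervisor, and a supervisor can run several worker processes (4 slots by default). A worker process runs one or more executor threads, and an executor thread runs one or more tasks. A task is not a thread of its own; it is an instance of a spout or bolt that runs inside an executor's thread. A topology can run on one or more machines in the cluster, in one or more worker processes.

For a given topology the number of tasks is fixed, while the number of executors can change; that is, number of executors <= number of tasks. By default the number of executors equals the number of tasks.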

Defaults:

       A supervisor node starts at most 4 worker processes

       Each topology occupies one worker process

       Each spout or bolt gets one executor

       Each executor runs one task
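The relationship between the parallelism hint, executors, and tasks can be sketched as a small plain-Java model. `ComponentParallelism` and its methods are hypothetical names used only for illustration, not part of the Storm API; capping executors at the task count is an assumption that mirrors the rule above that executors <= tasks.

```java
/** Hypothetical model of one spout or bolt; not part of the Storm API. */
final class ComponentParallelism {
    private final int parallelismHint; // executors requested for the component
    private final Integer numTasks;    // null when no task count was set explicitly

    ComponentParallelism(int parallelismHint, Integer numTasks) {
        if (parallelismHint < 1) {
            throw new IllegalArgumentException("parallelism hint must be >= 1");
        }
        this.parallelismHint = parallelismHint;
        this.numTasks = numTasks;
    }

    /** A component declared without any settings: one executor, one task. */
    static ComponentParallelism withDefaults() {
        return new ComponentParallelism(1, null);
    }

    /** Without an explicit task count, each executor runs exactly one task. */
    int tasks() {
        return numTasks != null ? numTasks : parallelismHint;
    }

    /** Assumption: executors never exceed tasks, so the hint is capped. */
    int executors() {
        return Math.min(parallelismHint, tasks());
    }

    public static void main(String[] args) {
        ComponentParallelism spout = new ComponentParallelism(2, null);
        ComponentParallelism bolt = new ComponentParallelism(2, 4);
        System.out.println("spout: " + spout.executors() + " executors, " + spout.tasks() + " tasks");
        System.out.println("bolt:  " + bolt.executors() + " executors, " + bolt.tasks() + " tasks");
    }
}
```

In this model the task count is the only number that stays fixed; the executor count is whatever was requested, limited by the tasks available to run.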

2. The Storm UI:

Total slots: 4      // 4 worker slots are configured in the configuration file

Each acker occupies one executor.

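3. Settings in detail:

① Setting the number of workers: 2 workers are configured

// Submit the topology to the Storm cluster.
// builder is a TopologyBuilder with one spout and one bolt,
// each left at the default parallelism of 1.
String topoName = ClusterSumStormWorkersTopology.class.getSimpleName();
try {
    Config config = new Config();
    config.setNumWorkers(2);   // request 2 worker processes
    config.setNumAckers(0);    // disable ackers so they do not add executors
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}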

Before running: [Storm UI screenshot]

After running: [Storm UI screenshots]

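Because 2 workers were configured, there are 2 worker processes and Used slots is 2.

Because the topology has 2 tasks (one for the spout and one for the bolt, each running in its own executor by default), there are 2 executors.

② Setting the number of executors: 2 workers, 2 spout executors, and 2 bolt executors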

public static void main(String[] args) {
    // TopologyBuilder assembles a Topology from spouts and bolts.
    // Every Storm job is submitted as a Topology,
    // which defines the order in which the spouts and bolts run.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("DataSourceSpout", new DataSourceSpout(), 2);   // 2 spout executors
    builder.setBolt("SumBolt", new SumBolt(), 2).shuffleGrouping("DataSourceSpout");   // 2 bolt executors

    // Submit the topology to the Storm cluster.
    String topoName = ClusterSumStormExecutorsTopology.class.getSimpleName();
    try {
        Config config = new Config();
        config.setNumWorkers(2);
        config.setNumAckers(0);
        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Before running: [Storm UI screenshot]

After running: [Storm UI screenshots]

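Because the code sets 2 workers, 2 slots are used here.

Because the code sets 2 spout executors and 2 bolt executors, there are 4 executors; and because each executor runs one task by default, there are 4 tasks.

③ Setting the number of tasks: 2 workers (slots), 2 spout executors, and 2 bolt executors, with 4 tasks configured for the bolt.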

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout(), 2);
builder.setBolt("SumBolt", new SumBolt(), 2)
        .setNumTasks(4)   // 4 tasks shared by the bolt's 2 executors
        .shuffleGrouping("DataSourceSpout");

// Submit the topology to the Storm cluster.
String topoName = ClusterSumStormTasksTopology.class.getSimpleName();
try {
    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(0);
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

After running: [Storm UI screenshots]

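2 workers correspond to 2 used slots.

The spout has 2 executors, and because each executor runs one task by default, it has 2 tasks.

The bolt has 2 executors and is explicitly given 4 tasks, so it has 4 tasks.

In total there are 4 executors and 6 tasks.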
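How the bolt's 4 tasks end up on its 2 executors can be illustrated with a short sketch. It assumes tasks are spread as evenly as possible over a component's executors; `TaskDistribution` and `tasksPerExecutor` are hypothetical names, and the real assignment is done inside Storm itself.

```java
import java.util.Arrays;

/** Hypothetical helper; models an even spread of tasks over executors. */
final class TaskDistribution {

    /** Returns how many tasks each executor runs, assuming an even split. */
    static int[] tasksPerExecutor(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numTasks < numExecutors) {
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        int[] counts = new int[numExecutors];
        int base = numTasks / numExecutors;   // every executor gets at least this many
        int extra = numTasks % numExecutors;  // the first 'extra' executors get one more
        for (int i = 0; i < numExecutors; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Spout: 2 tasks on 2 executors; bolt: 4 tasks on 2 executors.
        System.out.println("spout: " + Arrays.toString(tasksPerExecutor(2, 2))); // [1, 1]
        System.out.println("bolt:  " + Arrays.toString(tasksPerExecutor(4, 2))); // [2, 2]
    }
}
```

Under that assumption each bolt executor runs 2 tasks, which is why the executor count stays at 4 while the task count rises to 6.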

④ Setting the number of ackers: 2 workers, 4 executors, and 6 tasks as before, with the acker count left at its default

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout(), 2);
builder.setBolt("SumBolt", new SumBolt(), 2)
        .setNumTasks(4)
        .shuffleGrouping("DataSourceSpout");

// Submit the topology to the Storm cluster.
String topoName = ClusterSumStormAckerTopology.class.getSimpleName();
try {
    Config config = new Config();
    config.setNumWorkers(2);
    // setNumAckers is not called, so the default number of ackers applies.
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

After running: [Storm UI screenshots]

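2 workers correspond to 2 used slots.

With the acker count left unset, the 2 workers start 2 ackers, one per worker. The 2 ackers occupy 2 executors, so there are 4 + 2 = 6 executors. Those 2 acker executors run 2 tasks, so there are 6 + 2 = 8 tasks.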
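These totals can be checked with a few lines of arithmetic. The sketch assumes, as observed in this run, that an unset acker count yields one acker executor per worker and that each acker executor runs one task; `AckerCount` and its methods are hypothetical names, not Storm API.

```java
/** Hypothetical helper that adds acker executors and tasks to a topology's totals. */
final class AckerCount {

    /** Explicit setting if given (0 disables ackers); otherwise one acker per worker. */
    static int ackers(int numWorkers, Integer configuredAckers) {
        return configuredAckers != null ? configuredAckers : numWorkers;
    }

    /** Each acker occupies one executor. */
    static int totalExecutors(int componentExecutors, int ackers) {
        return componentExecutors + ackers;
    }

    /** Assumption: each acker executor runs exactly one task. */
    static int totalTasks(int componentTasks, int ackers) {
        return componentTasks + ackers;
    }

    public static void main(String[] args) {
        int workers = 2;
        int componentExecutors = 4; // 2 spout + 2 bolt executors
        int componentTasks = 6;     // 2 spout + 4 bolt tasks

        int defaultAckers = ackers(workers, null); // acker count left unset
        System.out.println("default ackers: " + totalExecutors(componentExecutors, defaultAckers)
                + " executors, " + totalTasks(componentTasks, defaultAckers) + " tasks");

        int noAckers = ackers(workers, 0);         // equivalent of setNumAckers(0)
        System.out.println("no ackers:      " + totalExecutors(componentExecutors, noAckers)
                + " executors, " + totalTasks(componentTasks, noAckers) + " tasks");
    }
}
```

The second case reproduces the earlier runs, where `setNumAckers(0)` kept the totals at 4 executors and 6 tasks.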

4. Parallelism case study and dynamic adjustment

Parallelism case study:

http://storm.apache.org/releases/current/Understanding-the-parallelism-of-a-Storm-topology.html


Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );
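For this topology the number of executors each worker has to host can be worked out as follows. The sketch ignores acker executors and assumes executors are spread evenly over the workers; `WorkerLoad` is a hypothetical helper, not a Storm class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that sums parallelism hints and splits them over workers. */
final class WorkerLoad {

    /** Sum of the parallelism hints of all components. */
    static int totalExecutors(Map<String, Integer> parallelismHints) {
        int total = 0;
        for (int hint : parallelismHints.values()) {
            total += hint;
        }
        return total;
    }

    /** Largest number of executors a single worker hosts under an even spread. */
    static int maxExecutorsPerWorker(int totalExecutors, int numWorkers) {
        return (totalExecutors + numWorkers - 1) / numWorkers; // ceiling division
    }

    public static void main(String[] args) {
        Map<String, Integer> hints = new LinkedHashMap<>();
        hints.put("blue-spout", 2);
        hints.put("green-bolt", 2);
        hints.put("yellow-bolt", 6);

        int total = totalExecutors(hints);                // 2 + 2 + 6 = 10
        int perWorker = maxExecutorsPerWorker(total, 2);  // 10 / 2 = 5
        System.out.println(total + " executors, " + perWorker + " per worker");
    }
}
```

The green bolt's 4 tasks do not change this figure: they share its 2 executors, so only the task count grows.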

Dynamic adjustment:

http://storm.apache.org/releases/1.1.2/Command-line-client.html

You have two options to rebalance a topology:

  1. Use the Storm web UI to rebalance the topology.
  2. Use the CLI tool storm rebalance as described below.

Here is an example of using the CLI tool:

## Reconfigure the topology "mytopology" to use 5 worker processes,
## the spout "blue-spout" to use 3 executors and
## the bolt "yellow-bolt" to use 10 executors.

Syntax: storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
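When rebalancing is scripted, the command line can be assembled from the desired settings. The helper below is a hypothetical convenience, not part of Storm; it only builds a string that follows the `storm rebalance` syntax quoted above and does not run anything.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that formats a "storm rebalance" command line. */
final class RebalanceCommand {

    /** Builds: storm rebalance <topology> -n <workers> [-e component=parallelism]* */
    static String build(String topology, int numWorkers, Map<String, Integer> executors) {
        StringBuilder cmd = new StringBuilder("storm rebalance ")
                .append(topology)
                .append(" -n ").append(numWorkers);
        // One -e option per component whose executor count should change.
        for (Map.Entry<String, Integer> entry : executors.entrySet()) {
            cmd.append(" -e ").append(entry.getKey()).append('=').append(entry.getValue());
        }
        return cmd.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the components in insertion order.
        Map<String, Integer> executors = new LinkedHashMap<>();
        executors.put("blue-spout", 3);
        executors.put("yellow-bolt", 10);
        System.out.println(build("mytopology", 5, executors));
    }
}
```

Run as written, this prints the same command as the CLI example above.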
