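Number of executor threads // "parallelism" here means the number of executors
How the three relate: each machine runs one supervisor, and a supervisor can run several worker processes (4 by default). A worker process runs one or more executor threads, and an executor thread runs one or more tasks. Spouts and bolts execute as tasks. A topology can run on one or more machines in the cluster, across one or more worker processes.
For a given topology the number of tasks is fixed, while the number of executor threads can change; that is, number of executors <= number of tasks. By default the number of executors equals the number of tasks.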
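The executor/task relationship can be sketched in plain Java. This is a minimal model, not Storm code: the class `TaskAssignmentSketch` and its `assign` method are hypothetical, and the sketch assumes tasks are spread as evenly as possible over executors.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Storm) that models how tasks map onto executors. */
public class TaskAssignmentSketch {

    /**
     * Splits task ids 0..numTasks-1 into numExecutors groups whose sizes differ by at most one.
     * Assumption: tasks are spread as evenly as possible across executors.
     */
    public static List<List<Integer>> assign(int numTasks, int numExecutors) {
        // The text states executors <= tasks, so reject anything else.
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        List<List<Integer>> executors = new ArrayList<>();
        int base = numTasks / numExecutors;   // every executor gets at least this many tasks
        int extra = numTasks % numExecutors;  // the first `extra` executors get one more
        int nextTask = 0;
        for (int e = 0; e < numExecutors; e++) {
            int size = base + (e < extra ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                tasks.add(nextTask++);
            }
            executors.add(tasks);
        }
        return executors;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 2)); // default case: one task per executor
        System.out.println(assign(4, 2)); // 4 tasks on 2 executors: two tasks each
    }
}
```

With the default of one task per executor, each group holds a single task; with 4 tasks on 2 executors, each executor thread runs 2 tasks.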
Total slots: 4 // 4 worker slots are configured in the configuration file
// Submit the topology to the Storm cluster
String topoName = ClusterSumStormWorkersTopology.class.getSimpleName();
try {
    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(0);
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}
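Because 2 workers were configured, there are 2 worker processes and Used Slots is 2.
Because the topology has one spout and one bolt, each running as a single task, there are 2 tasks and therefore 2 executors.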
② Setting the number of executors: 2 workers, 2 spout executors, 2 bolt executors
public static void main(String[] args) {
    // TopologyBuilder builds a Topology from the Spout and Bolt
    // Every job in Storm is submitted as a Topology
    // The Topology must specify the order in which the Spout and Bolt execute
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("DataSourceSpout", new DataSourceSpout(), 2);
    builder.setBolt("SumBolt", new SumBolt(), 2).shuffleGrouping("DataSourceSpout");
    // Submit the topology to the Storm cluster
    String topoName = ClusterSumStormExecutorsTopology.class.getSimpleName();
    try {
        Config config = new Config();
        config.setNumWorkers(2);
        config.setNumAckers(0);
        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
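Because the code configures 2 workers, 2 slots are used here.
Because the code configures 2 spout executors and 2 bolt executors, there are 4 executors; and because each executor starts one task by default, there are 4 tasks.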
③ Setting the number of tasks: 2 workers (slots), 2 spout executors, 2 bolt executors, with 4 tasks set for the 2 bolt executors.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout(), 2);
builder.setBolt("SumBolt", new SumBolt(), 2)
        .setNumTasks(4)
        .shuffleGrouping("DataSourceSpout");
// Submit the topology to the Storm cluster
String topoName = ClusterSumStormTasksTopology.class.getSimpleName();
try {
    Config config = new Config();
    config.setNumWorkers(2);
    config.setNumAckers(0);
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}
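The spout has 2 executors, and because each executor starts one task by default, the spout has 2 tasks.
The bolt has 2 executors and is explicitly given 4 tasks, so the bolt has 4 tasks. In total there are 4 executors and 6 tasks.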
④ Setting the number of ackers: 2 workers, 4 executors, and 4 bolt tasks as before, but without calling setNumAckers(0)
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout(), 2);
builder.setBolt("SumBolt", new SumBolt(), 2)
        .setNumTasks(4)
        .shuffleGrouping("DataSourceSpout");
// Submit the topology to the Storm cluster
String topoName = ClusterSumStormAckerTopology.class.getSimpleName();
try {
    Config config = new Config();
    config.setNumWorkers(2);
    StormSubmitter.submitTopology(topoName, config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}
The 2 workers start 2 ackers, and each acker runs in its own executor, so there are 4 + 2 = 6 executors. Those 2 acker executors each run 1 task, so there are 6 + 2 = 8 tasks.
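The counting used in the examples above can be written out as a small calculation. This is only a sketch in plain Java, not Storm code: `ParallelismCountSketch` and its methods are hypothetical names, and it assumes, as the text describes, that one acker is started per worker when `setNumAckers` is not called.

```java
/** Hypothetical helper (not part of Storm) that reproduces the executor/task arithmetic. */
public class ParallelismCountSketch {

    /**
     * Assumption taken from the text: when no acker count is configured (null here),
     * one acker is started per worker.
     */
    public static int effectiveAckers(Integer configuredAckers, int numWorkers) {
        return configuredAckers != null ? configuredAckers : numWorkers;
    }

    /** Each acker runs in its own executor, in addition to the spout and bolt executors. */
    public static int totalExecutors(int spoutExecutors, int boltExecutors, int ackers) {
        return spoutExecutors + boltExecutors + ackers;
    }

    /** Each acker executor runs one task, in addition to the spout and bolt tasks. */
    public static int totalTasks(int spoutTasks, int boltTasks, int ackers) {
        return spoutTasks + boltTasks + ackers;
    }

    public static void main(String[] args) {
        // Ackers disabled with setNumAckers(0): 2 spout executors, 2 bolt executors, 4 bolt tasks.
        int noAckers = effectiveAckers(0, 2);
        System.out.println(totalExecutors(2, 2, noAckers) + " executors, "
                + totalTasks(2, 4, noAckers) + " tasks");

        // Same topology without setNumAckers: one acker per worker is assumed.
        int defaultAckers = effectiveAckers(null, 2);
        System.out.println(totalExecutors(2, 2, defaultAckers) + " executors, "
                + totalTasks(2, 4, defaultAckers) + " tasks");
    }
}
```

The first line printed is `4 executors, 6 tasks` and the second is `6 executors, 8 tasks`, matching the counts given for the topologies with and without ackers.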
http://storm.apache.org/releases/current/Understanding-the-parallelism-of-a-Storm-topology.html
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
        .setNumTasks(4)
        .shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
        .shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
);
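For this topology the parallelism hints add up to 2 + 2 + 6 = 10 executors, and the tasks to 2 + 4 + 6 = 12. The following plain-Java sketch shows how those executors might be divided between the 2 workers. `WorkerSpreadSketch` is a hypothetical name, acker executors are ignored, and an even spread across workers is an assumption rather than something the snippet itself guarantees.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of Storm): spreads executors evenly over workers. */
public class WorkerSpreadSketch {

    /** Returns how many executors each worker runs, assuming an even spread. */
    public static int[] executorsPerWorker(int totalExecutors, int numWorkers) {
        if (numWorkers < 1) {
            throw new IllegalArgumentException("need at least one worker");
        }
        int[] perWorker = new int[numWorkers];
        for (int e = 0; e < totalExecutors; e++) {
            perWorker[e % numWorkers]++; // hand out executors round-robin
        }
        return perWorker;
    }

    public static void main(String[] args) {
        int totalExecutors = 2 + 2 + 6; // blue-spout + green-bolt + yellow-bolt
        int totalTasks = 2 + 4 + 6;     // green-bolt sets 4 tasks explicitly
        System.out.println(totalExecutors + " executors, " + totalTasks + " tasks");
        System.out.println(Arrays.toString(executorsPerWorker(totalExecutors, 2)));
    }
}
```

Under these assumptions each of the two worker processes runs 5 executor threads.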
http://storm.apache.org/releases/1.1.2/Command-line-client.html
You have two options to rebalance a topology:
- Use the Storm web UI to rebalance the topology.
- Use the CLI tool storm rebalance as described below.
Here is an example of using the CLI tool:
Syntax: storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*
## Reconfigure the topology "mytopology" to use 5 worker processes,
## the spout "blue-spout" to use 3 executors and
## the bolt "yellow-bolt" to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
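If the rebalance command has to be assembled from a program, the syntax above can be turned into an argument list. The sketch below is plain Java and only builds the command; `RebalanceCommandSketch` and `build` are hypothetical names, and passing `null` to omit an optional flag is a convention of this sketch, not of the Storm CLI.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Storm) that assembles a `storm rebalance` command line. */
public class RebalanceCommandSketch {

    /**
     * Builds: storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers]
     *         [-e component=parallelism]*
     * A null waitSecs or numWorkers means the flag is left out.
     */
    public static List<String> build(String topology, Integer waitSecs, Integer numWorkers,
                                     Map<String, Integer> parallelism) {
        List<String> cmd = new ArrayList<>();
        cmd.add("storm");
        cmd.add("rebalance");
        cmd.add(topology);
        if (waitSecs != null) {
            cmd.add("-w");
            cmd.add(String.valueOf(waitSecs));
        }
        if (numWorkers != null) {
            cmd.add("-n");
            cmd.add(String.valueOf(numWorkers));
        }
        // One -e flag per component whose executor count should change.
        for (Map.Entry<String, Integer> entry : parallelism.entrySet()) {
            cmd.add("-e");
            cmd.add(entry.getKey() + "=" + entry.getValue());
        }
        return cmd;
    }

    public static void main(String[] args) {
        Map<String, Integer> parallelism = new LinkedHashMap<>(); // keeps insertion order
        parallelism.put("blue-spout", 3);
        parallelism.put("yellow-bolt", 10);
        System.out.println(String.join(" ", build("mytopology", null, 5, parallelism)));
    }
}
```

The resulting list could be handed to `java.lang.ProcessBuilder` on a machine where the `storm` client is installed; the sketch itself does not run the command.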