目录
关于storm的并发机制
其中task是spout或者bolt的实例;
并发机制出现在线程级。每个任务在同一个JVM的不同线程中执行;
数据流分组
数据流分组定义了一个数据流中的tuple如何分发给拓扑中的不同bolt的task.
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
.shuffleGrouping(SENTENCE_SPOUT_ID);
并发版本的单词计数拓扑中,分隔语句的bolt指派了4个task,数据流分组中决定了指定的一个tuple会分发到哪个task上。
Storm的七种内置的数据流分组方式:
Shuffle grouping(随机分组):这种方式会随机分发 tuple 给 bolt 的各个 task,每个bolt 实例接收到的相同数量的 tuple。
Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“word”字段进行分组,所有具有相同“word”字段值的 tuple 会路由到同一个 bolt 的 task 中。
All grouping(全复制分组):将所有的 tuple 复制后分发给所有 bolt task。每个订阅数据流的 task 都会接收到 tuple 的拷贝。
Globle grouping(全局分组):这种分组方式将所有的 tuples 路由到唯一一个 task 上。Storm 按照最小的 task ID 来选取接收数据的 task。注意,当使用全局分组方式时,设置 bolt 的 task 并发度是没有意义的,因为所有 tuple 都转发到同一个 task 上了。使用全局分组的时候需要注意,因为所有的 tuple 都转发到一个 JVM 实例上,可能会引起 Storm 集群中某个 JVM 或者服务器出现性能瓶颈或崩溃。
None grouping(不分组):在功能上和随机分组相同,是为将来预留的。
Direct grouping(指向型分组):数据源会调用 emitDirect() 方法来判断一个 tuple 应该由哪个 Storm 组件来接收。只能在声明了是指向型的数据流上使用。
Local or shuffle grouping(本地或随机分组):和随机分组类似,但是,会将 tuple 分发给同一个 worker 内的 bolt task(如果 worker 内有接收数据的 bolt task)。其他情况下,采用随机分组的方式。取决于 topology 的并发度,本地或随机分组可以减少网络传输,从而提高 topology 性能。
拓扑结构,数据可能跑到不同的worker上,不在一个主机上的,看分组形式。
还可以写自定义分组:
分组的具体实现
package backtype.storm.grouping;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.List;
public interface CustomStreamGrouping extends Serializable {
void prepare(WorkerTopologyContext var1, GlobalStreamId var2, List<Integer> var3);
List<Integer> chooseTasks(int var1, List<Object> var2);
}
prepare方法在运行时候调用,用来初始化分组信息,分组的具体实现会使用这些信息决定如何向接受task分发tuple。
WorkerTopologyContext 这个对象提供了拓扑的上下文信息,
GlobalStreamId 提供了待分组数据流的属性。
第三个变量var3,是targetTasks,是分组所有待选task的标识符列表。
通常会将
targetTasks的引用存在变量里面作为chooseTasks的参数。
chooseTasks()返回一个tuple发射目标task的标识符列表。
这两个参数是发送tuple的组件id和tuple的值。
在并发的情况时候,计算的准确度取决于是否按照tuple的内容进行适当的分组。
以下是自定义分组:
public class CustomGrouping implements CustomStreamGrouping {
//接受目标任务的id集合
private List<Integer> targetTasks ;
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks ;
}
public List<Integer> chooseTasks(int taskId, List<Object> values) {
List<Integer> subTaskIds = new ArrayList<Integer>();
for(int i = 0 ; i <= targetTasks.size() / 2 ; i ++){
subTaskIds.add(targetTasks.get(i));
}
return subTaskIds;
}
}
注意开发时候要在不同的并发度的配置下面,测试拓扑。
有保障机制的数据处理,如果发生执行失败会怎么样?
举个例子:考虑一个负责将数据持久化到数据库的bolt,怎么处理数据库更新失败的情况?
spout的可靠性
在storm中,消息的可靠性处理机制需要从spout开始的。
一个提供了可靠的处理机制的spout需要记录它发出去的tuple,
当下游的bolt处理tuple或者
子tuple失败时spout能够重新发射。
子tuple可以理解为bolt处理spout的原始tuple后,作为结果发射出去的tuple。
从另外一个视角来看,可以将spout发射的数据流看作一个tuple树的主干。
上述是tuple的树。
在有保障机制的处理过程中,bolt每收到一个tuple,都需要向上游确认应答ack 或者报错。
对于主干tuple中的一个tuple,如果tuple树上的每个bolt进行了确认应答,
spout会调用ack方法来表明这条信息已经完全处理了。
如果任何一个bolt处理tuple报错,或者处理超时,spout会调用fail方法。
为了实现可靠的消息处理,首先要给每个发出的tuple都带上唯一ID,将ID作为参数传给SpoutOutputCollector的emit()方法;
给tuple指定ID告诉Storm系统,无论执行成果还是失败,spout都要接受tuple树上所有节点的返回通知,如果处理成功,spout的ack方法将会对编号是ID的消息应答确认,如果执行失败或者超时,会调用fail()方法;
bolt的可靠性
bolt
要实现可靠消息处理机制包含两个步骤:
当发射衍生的tuple时,需要锚定读入的tuple(意思是建立读入tuple和衍生tuple的对应关系,这样下游bolt就可以通过应答确认 报错 或者超时来加入到tuple树结构中)
当处理消息成功或者失败时分别确认应答或者报错
public List<Integer> emit(Tuple anchor, List<Object> tuple) {
return this.emit("default", anchor, tuple);
}
public List<Integer> emit(List<Object> tuple) {
return this.emit("default", tuple);
}
在书中提到的应该是这两种emit,第一个把读入的和发射的新tuple进行锚定。
下游需要进行应答和报错。
如果非锚定的tuple不会对数据流的可靠性起作用。如果下游处理失败,原始的tuple不会重新发送。
那么什么样的是可靠的单词计数呢?
看git v4
到这边,storm基本入门知识补全,先告一段落了!
上面基本上是印象笔记做的笔记,直接拿到csdn做个记录~
感谢《Storm分布式实时计算模式》