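Storm has one very important property: "Guarantee no data loss", in other words, reliability.
Clearly, to deliver on this guarantee Storm has to track where every piece of data goes and what becomes of it. How does Storm manage that? Through the acker mechanism.
Here is a summary of the workflow the acker takes part in:
- When a Spout creates a new Tuple, it sends a message telling the acker to track it;
- After a Bolt succeeds or fails in processing a Tuple, it also sends a message to notify the acker;
- The acker finds the Spout that emitted the Tuple and calls back its ack or fail method.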
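
The three steps above can be sketched as a toy model in plain Java. This is only an illustration under stated assumptions, not Storm's implementation: `ToySpout`, `ToyAcker`, `track` and `report` are hypothetical names invented here, message IDs are plain strings, and the real acker follows whole tuple trees rather than single tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AckerSketch {

    /** Hypothetical stand-in for a spout: it only records the callbacks it receives. */
    static class ToySpout {
        final List<String> acked = new ArrayList<>();
        final List<String> failed = new ArrayList<>();

        void ack(String msgId) { acked.add(msgId); }
        void fail(String msgId) { failed.add(msgId); }
    }

    /** Hypothetical stand-in for the acker: remembers which spout emitted each tracked tuple. */
    static class ToyAcker {
        private final Map<String, ToySpout> pending = new HashMap<>();

        // Step 1: the spout announces a new tuple so the acker starts tracking it.
        void track(String msgId, ToySpout source) {
            pending.put(msgId, source);
        }

        // Steps 2 and 3: a bolt reports the outcome, and the acker calls back the source spout.
        void report(String msgId, boolean success) {
            ToySpout source = pending.remove(msgId);
            if (source == null) {
                return; // this tuple was never tracked, so there is nobody to notify
            }
            if (success) {
                source.ack(msgId);
            } else {
                source.fail(msgId);
            }
        }
    }

    public static void main(String[] args) {
        ToySpout spout = new ToySpout();
        ToyAcker acker = new ToyAcker();

        acker.track("t1", spout);   // spout emits t1 and t2
        acker.track("t2", spout);
        acker.report("t1", true);   // a bolt processed t1 successfully
        acker.report("t2", false);  // a bolt failed on t2
        acker.report("t3", true);   // t3 was never tracked

        System.out.println("acked=" + spout.acked + " failed=" + spout.failed);
    }
}
```

In this toy, a tuple that was never registered with the acker produces no callback at all, whatever the bolt reports.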
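We said earlier that the difference between RichBolt and BasicBolt is that the latter acks automatically. So is it enough to implement the Spout's ack or fail method in order to see this feedback?
Try adding the following code to RandomSpout (extends BaseRichSpout):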
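    // Imports assume Storm 1.0 or later (org.apache.storm);
    // releases before 1.0 use the backtype.storm prefix instead.
    import java.util.Map;
    import java.util.Random;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class RandomSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        private Random rand;

        private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector;
            this.rand = new Random();
        }

        @Override
        public void nextTuple() {
            // Pick a random sentence and emit it as a one-field tuple.
            String toSay = sentences[rand.nextInt(sentences.length)];
            this.collector.emit(new Values(toSay));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }

    }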