一、集群提交任务流程分析

  1.集群提交操作

    参考:https://www.jianshu.com/p/6783f1ec2da0

  2.任务分配与启动流程

    参考:https://www.cnblogs.com/heitaok/p/5531535.html

 二、相关目录树

  1.组件本地目录树

  大数据入门第十六天——流式计算之storm详解(三)集群相关进阶

  2.storm zk目录树

  大数据入门第十六天——流式计算之storm详解(三)集群相关进阶

 三、集群通信

  Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架。

  Worker进程内部通信:不同worker的thread通信使用LMAX Disruptor来完成。

    不同topologey之间的通信,Storm不负责,需要自己想办法实现,例如使用kafka等;

   通信图解:

    大数据入门第十六天——流式计算之storm详解(三)集群相关进阶

  相关博文参考:http://blog.csdn.net/bbaiggey/article/details/55510010?locationNum=10&fps=1

四、消息容错机制ack-fail

  1.概述

  l 在storm中,可靠的信息处理机制是从spout开始的。

  l 一个提供了可靠的处理机制的spout需要记录他发射出去的tuple,当下游bolt处理tuple或者子tuple失败时spout能够重新发射。

  l Storm通过调用Spout的nextTuple()发送一个tuple。为实现可靠的消息处理,首先要给每个发出的tuple带上唯一的ID,并且将ID作为参数传递给  SoputOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), msgId); messageid就是用来标示唯一的tuple的,而rootid是随机生成的

给每个tuple指定ID告诉Storm系统,无论处理成功还是失败,spout都要接收tuple树上所有节点返回的通知。

  如果处理成功,spout的ack()方法将会对编号是 msgId的消息应答确认;如果处理失败或者超时,会调用fail()方法。

  2.基本实现

  Storm 系统中有一组叫做"acker"的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。

  acker任务保存了spout id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为"ack val", 它是树中所有消息的随机id的异或计算结果。

  ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。 每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了

要实现ack机制:
1,spout发射tuple的时候指定messageId
2,spout要重写BaseRichSpout的fail和ack方法
3,spout对发射的tuple进行缓存(否则spout的fail方法收到acker发来的messsageId,
  spout也无法获取到发送失败的数据进行重发),看看系统提供的接口,
  只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,
  用户如何取得原来的msg貌似需要自己cache,然后用这个msgId去查询,太坑爹了
3,spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple可以选择重发。 4,设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);

  3.代码示例

package ackfail;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
 * Created by maoxiangyi on 2016/4/25.
 */
public class MyAckFailTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("mySpout", new MySpout(), 1);
        topologyBuilder.setBolt("mybolt1", new MyBolt1(), 1).shuffleGrouping("mySpout");

        Config conf = new Config();
        String name = MyAckFailTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, topologyBuilder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, topologyBuilder.createTopology());
            Thread.sleep(60 * 60 * 1000);
            cluster.shutdown();
        }
    }
}
MyAckFailTopology

相关文章:

  • 2022-12-23
  • 2021-10-11
  • 2021-05-26
  • 2022-01-11
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-10-19
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-05-29
  • 2022-01-08
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案