Storm集群架构中各模块进行外部通信,拓扑程序中各个Task进行内部通信。

内部通信

  Bolt跨服务器发送Tuple的时候,需要借助socketServer保持网络连接状态。每个executor都有一个都有一个缓存队列:executor—>nextTuple/execute;每个worker都有输入和输出的管理器,其中管理器要维持socket连接。
    输出:Map<ip+port,socket object>
    输入:ServerSocket(ip,port)—accept接受—对数据进行分发
Storm内部通信以及ack-fail机制
对于worker进程,为了管理流入和传出的消息,每个worker进程有一个独立的接受线程对配置的TCP端口supervisor.ports进行监听。worker接受线程通过网络接收数据,并根据Tuple中包含的taskId,匹配到对应的executor。然后根据executor找到对应的incoming-queue,将数据发送到incoming-queue队列中。业务逻辑执行现成消费incoming-queue的数据,通过调用Bolt的execute(xxxx)方法,将Tuple作为参数传输到自定义的方法中。业务逻辑执行完毕之后,将计算的中间数据发送给outgoing-queue队列,当outgoing-queue中的Tuple达到一定的阈值,executor的发送线程将批量获取outgoing-queue中的tuple并发送到worker的transfer-queue中。Worker发送线程消费transfer-queue中的数据,计算Tuple的目的地,连接不同的ip+port,将数据通过网络传输到另一个Worker。
  Disruptor是一个有界队列,应用于“生产者-消费者”模型。其是一种线程之间信息无锁的交换方式(使用CAS(Compare And Swap/Set)操作)。因为没有锁机制,所以Disruptor没有竞争速度非常快。所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。在每个对象中都能跟踪***(Ring Buffer,claim Strategy,生产者和消费者),再加上cache line padding,意味着没有伪共享和非预期的竞争。
  Disruptor可以看成一个事件监听或消息机制,在队列中一边生产者放入消息,另一边消费者并行取出处理。其底层是单个数据结构:一个ring buffer。每个生产者和消费者都有一个次序计算器,以显示当前缓冲工作方式。每个生产者消费者能够操作自己的次序计数器的读取对方的计数器,生产者能够读取消费者的计数器确保其在没有锁的情况下是可写的。其中,环形缓冲区Ring Buffer负责对通过Disruptor进行交换的数据(事件)进行存储和更新。Sequence通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。
Storm内部通信以及ack-fail机制
  在进行网络传输时,Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来开发网络应用程序,使其更加容易使用并具有很强的扩展性。Netty提供了api从网络处理代码中解耦业务逻辑,其完全基于NIO实现,因此整个Netty是异步的。

消息容错

  在Storm中,可靠的信息处理机制是从Spout开始的。一个提供了可靠的处理机制的Spout需要记录发射出去的Tuple,当下游Bolt处理Tuple或者子Tuple失败时Spout能够重新发射。Storm通过调用Spout的nextTuple()发送一个Tuple。为实现可靠的消息处理,首先要给每个发出的Tuple带上唯一的ID,并且将ID作为参数传递给SpoutOutputCollector的emit()方法:collector.emit(new Values(“value1”,“value2”),msgid);给每个Tuple指定ID告诉Storm系统,无论处理成功还是失败,Spout都要接受Tuple树上所有节点返回的通知。如果处理成功,Spout的ack()方法将会对编号是msgid的消息应答确认;如果处理失败或者超时,会调用fail()方法。
  Storm系统中有一组叫作“acker”的特殊的任务,负责跟踪DAG(有向无环图)中的每个消息。acker任务保存了Spout id到一对值得映射。第一个值就是Spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个Spout任务。第二个值是一个64bit的数字,即“ack val”,是树中所有消息的随机id的 异或计算结果。
            <Taskid,<Rootid,ackValue>>
ack val表示了整棵树的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和应答的时候都会有相同的消息id发送过来做异或。每当acker发现一棵树的ack val值为0的时候,就知道这棵树已经被完全处理了。
  有三种方法可以去掉消息的可靠性:
1、将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立刻被调用。
2、Spout发送一个消息时,不指定此消息的messageID。当需要关闭特定消息可靠性的时候,可以使用此方法。
3、在emit方法中不指定输入消息,因此这些子孙消息没有被锚定在任何Tuple Tree中,因此失败不会引起任何Spout重新发送消息。

相关文章: