1,启动流程

RocketMQ原理解析-producer(转)

Producer如何感知要发送消息的broker(brokerAddrTable中的值是怎么获得的)?

  • producer本地集合中没有,会根据指定topic到namesrv获取TopicPublishInfo,并放入本地集合。
  • 定时从namesrv更新topic路由信息。

Producer与broker间的心跳

Producer定时发送心跳,将producer信息(其实就是procduer的group)定时发送到broker(brokerAddrTable集合中列出的)。

master接收消息,slave拷贝master

Producer发送消息,只发送到broker_master机器,通过broker的主从复制机制拷贝到broker_slave。

2,如何发送消息

producer轮询某topic下的所有queue的方式 实现发送方的负载均衡。

RocketMQ原理解析-producer(转)

Topic下的所有队列如何理解?

Broker Topic queue 注册namesrv队列(Topic_A)
broker1 Topic_A queue0 , queue1 broker1_queue0 ,broker1_queue1
broker2 Topic_A queue0, queue2, queue3 broker2_queue0,broker2_queue1,broker2_queue2
broker3 Topic_A queue0 broker3_queue0

Producer如何实现轮询队列?

Producer从namesrv获得topic_A路由信息TopicPublishInfo。

//Topic_A的所有的队列
private List<MessageQueue> messageQueueList
//自增整型
private volatile ThreadLocalIndex sendWhichQueue
/**
  *选择一个发送队列
  *lastBrokerName不为空,代表上次选择的queue失败,本次避开同一个queue
  */
public MessageQueue selectOneMessageQueue(final String lastBrokerName){
  //计算队列下标
  //sendWhichQueue.getAndIncrement()每次调用+1
  Math.abs(sendWhichQueue.getAndIncrement()) % this.messageQueueList.size()
}

Producer发消息系统重试?

//消息发送失败重试次数默认为2
private int retryTimesWhenSendFailed = 2;
//发送消息超时时间
private int sendMsgTimeout = 3000;

发送失败,换个队列继续发送所需条件:

1. 重试次数不到retryTimesWhenSendFailed (默认2次)
2. 发送此条消息花费时间还没有到sendMsgTimeout (默认3000毫秒)

3,如何发送顺序消息

4,发送分布式事物消息

5,消息在broker落地之普通消息

6,消息在broker落地之事物消息

相关文章: