一、RocketMQ 4.X 生产者常见核心配置
- compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
- retryTimesWhenSendFailed : 失败重发次数
- maxMessageSize : 最大消息配置,默认128k
- topicQueueNums : 主题下面的队列数量,默认是4
- autoCreateTopicEnable : 是否自动创建主题 Topic, 开发建议为 true,生产要为 false
- defaultTopicQueueNums : 自动创建服务器不存在的 Topic,默认创建的队列数
- autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
- brokerClusterName : 集群名称
- brokerId : 0表示Master主节点,大于0表示从节点
- brokerIP1 : Broker 服务地址
- brokerRole : Broker 角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
- deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点
- flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH (异步刷盘),另外是 SYNC_FLUSH (同步刷盘)
- listenPort : Broker 监听的端口号
- mapedFileSizeCommitLog : 单个 conmmitlog 文件大小,默认是 1GB
- mapedFileSizeConsumeQueue:ConsumeQueue 每个文件默认存30W条,可以根据项目调整
- storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
- storePathCommitLog:commitlog 存储目录默认为 ${storePathRootDir}/commitlog
- storePathIndex: 消息索引存储路径
- syncFlushTimeout : 同步刷盘超时时间
- diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错
二、Broker 消息投递状态有四种
- FLUSH_DISK_TIMEOUT :没有在规定时间内完成刷盘 (刷盘策略需要为 SYNC_FLUSH 才会出这个错误)
- FLUSH_SLAVE_TIMEOUT :主从模式下,Broker 是 SYNC_MASTER,没有在规定时间内完成主从同步
- SLAVE_NOT_AVAILABLE : 主从模式下,Broker 是 SYNC_MASTER,没有找到被配置成 Slave 的 Broker
- SEND_OK :发送成功,没有发生上面的三种问题
三、RocketMQ 消息生产和消费异常重试和阈值设定
生产者 Producer 重试(异步和 SendOneWay下配置无效)
- 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,如果网络情况比较差,或者跨集群则建改多几次
消费端重试
- 原因:消息处理异常、broker 端到 consumer 端各种问题,如网络原因闪断,消费处理失败,ACK 返回失败等等问题。
- 注意:
- 重试间隔时间配置 ,默认每条消息最多重试 16 次
- messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 超过重试次数人工补偿
- 消费端去重
- 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
- 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
PayController 类代码如下:
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"6688" , ("hello xdclass rocketmq = "+text).getBytes() ); SendResult sendResult = payProducer.getProducer().send(message); System.out.println(sendResult); return new HashMap<>(); } }