一、RocketMQ 4.X 生产者常见核心配置

  • compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
  • retryTimesWhenSendFailed : 失败重发次数
  • maxMessageSize : 最大消息配置,默认128k
  • topicQueueNums : 主题下面的队列数量,默认是4
  • autoCreateTopicEnable : 是否自动创建主题 Topic, 开发建议为 true,生产要为 false
  • defaultTopicQueueNums : 自动创建服务器不存在的 Topic,默认创建的队列数
  • autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
  • brokerClusterName : 集群名称
  • brokerId : 0表示Master主节点,大于0表示从节点
  • brokerIP1 : Broker 服务地址
  • brokerRole : Broker 角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
  • deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点
  • flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH (异步刷盘),另外是 SYNC_FLUSH (同步刷盘)
  • listenPort : Broker 监听的端口号
  • mapedFileSizeCommitLog : 单个 conmmitlog 文件大小,默认是 1GB
  • mapedFileSizeConsumeQueue:ConsumeQueue 每个文件默认存30W条,可以根据项目调整
  • storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
  • storePathCommitLog:commitlog 存储目录默认为 ${storePathRootDir}/commitlog
  • storePathIndex: 消息索引存储路径
  • syncFlushTimeout : 同步刷盘超时时间
  • diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错

二、Broker 消息投递状态有四种

  • FLUSH_DISK_TIMEOUT :没有在规定时间内完成刷盘 (刷盘策略需要为 SYNC_FLUSH 才会出这个错误)
  • FLUSH_SLAVE_TIMEOUT :主从模式下,Broker 是 SYNC_MASTER,没有在规定时间内完成主从同步
  • SLAVE_NOT_AVAILABLE : 主从模式下,Broker 是 SYNC_MASTER,没有找到被配置成 Slave 的 Broker
  • SEND_OK :发送成功,没有发生上面的三种问题

三、RocketMQ 消息生产和消费异常重试和阈值设定

生产者 Producer 重试异步和 SendOneWay下配置无效

  • 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,如果网络情况比较差,或者跨集群则建改多几次

消费端重试

  • 原因:消息处理异常、broker 端到 consumer 端各种问题,如网络原因闪断,消费处理失败,ACK 返回失败等等问题。
  • 注意:
    • 重试间隔时间配置 ,默认每条消息最多重试 16 次
    • messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h   超过重试次数人工补偿
    • 消费端去重
    • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
    • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

 RocketMQ 生产者核心配置和核心知识

PayController 类代码如下:

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        Message message = new Message(JmsConfig.TOPIC,"taga" ,"6688" , ("hello xdclass rocketmq = "+text).getBytes() );

        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);

        return new HashMap<>();
    }

}
PayController 类

相关文章:

  • 2021-12-27
  • 2021-11-19
  • 2021-12-31
  • 2021-11-15
  • 2021-08-07
  • 2021-06-03
  • 2021-12-03
猜你喜欢
  • 2022-12-23
  • 2021-04-20
  • 2021-07-03
  • 2021-04-12
  • 2022-12-23
  • 2021-04-24
  • 2021-11-14
相关资源
相似解决方案