rocketmq
1.nameserver是无状态的,控制着broker,producer,consumer集群的同步

2.broker是处理消息中转,负责储存和过滤消息

3.rocketmq有两种消息消费模型,广播模式,一个消费者组中的每一个消费者都消费;集群模式,一个消费者组中的每一个消费者平均消费消息

4.顺序消费,生产者向一个topic的一个队列,顺序发送消息,切生产者只能单线程发送,消费者在消费时使用顺序消费模式


rocketmq是什么

1.一种队列模型的消息中间件,高性能,高可靠,高实时,分布式

2.producer,consumer,broker都可以分布式

3.默认情况下,prducer会向一个topic下的一些队列轮流发送消息


api

生产者

//顺序发送
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
        producer.setNamesrvAddr(ROCKET_SERVER);
        Message message = new Message("test", "tags", "1", info.getBytes());
        producer.start();
        for (int i = 0; i < 10; i++) {
            SendResult result = producer.send(message,new    SelectMessageQueueByHash(),i);
        }
        producer.shutdown();

 

//随机发送
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
        producer.setNamesrvAddr(ROCKET_SERVER);
        Message message = new Message("test", "tags", "1", info.getBytes());
        producer.start();
        for (int i = 0; i < 10; i++) {
            SendResult result = producer.send(message);
        }
        producer.shutdown();



消费者

//普通消费
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setNamesrvAddr(ROCKET_SERVER);
        consumer.subscribe("test", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (CollectionUtils.isNotEmpty(list)) {
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        MessageExt msg = (MessageExt) it.next();
                        System.out.println(msg.toString());
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

 

//顺序消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setNamesrvAddr(ROCKET_SERVER);
        consumer.subscribe("test", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                if (CollectionUtils.isNotEmpty(list)) {
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        MessageExt msg = (MessageExt) it.next();
                        System.out.println(msg.toString());
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();


启动广播模式

consumer.setMessageModel(MessageModel.BROADCASTING);

 

相关文章: