1.nameserver是无状态的,控制着broker,producer,consumer集群的同步
2.broker是处理消息中转,负责储存和过滤消息
3.rocketmq有两种消息消费模型,广播模式,一个消费者组中的每一个消费者都消费;集群模式,一个消费者组中的每一个消费者平均消费消息
4.顺序消费,生产者向一个topic的一个队列,顺序发送消息,切生产者只能单线程发送,消费者在消费时使用顺序消费模式
rocketmq是什么
1.一种队列模型的消息中间件,高性能,高可靠,高实时,分布式
2.producer,consumer,broker都可以分布式
3.默认情况下,prducer会向一个topic下的一些队列轮流发送消息
api
生产者
//顺序发送
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
producer.setNamesrvAddr(ROCKET_SERVER);
Message message = new Message("test", "tags", "1", info.getBytes());
producer.start();
for (int i = 0; i < 10; i++) {
SendResult result = producer.send(message,new SelectMessageQueueByHash(),i);
}
producer.shutdown();
//随机发送
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
producer.setNamesrvAddr(ROCKET_SERVER);
Message message = new Message("test", "tags", "1", info.getBytes());
producer.start();
for (int i = 0; i < 10; i++) {
SendResult result = producer.send(message);
}
producer.shutdown();
消费者
//普通消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(ROCKET_SERVER);
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isNotEmpty(list)) {
Iterator it = list.iterator();
while (it.hasNext()) {
MessageExt msg = (MessageExt) it.next();
System.out.println(msg.toString());
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//顺序消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(ROCKET_SERVER);
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isNotEmpty(list)) {
Iterator it = list.iterator();
while (it.hasNext()) {
MessageExt msg = (MessageExt) it.next();
System.out.println(msg.toString());
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
启动广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);