双Master方式:

服务器环境

序号 IP 角色 模式
1 192.168.32.135 nameServer1,brokerServer1  Master1
2 192.168.32.136 nameServer2,brokerServer2  Master2

 

 

 

 

Helloworld代码示例:

/**
 * Producer,发送消息
 * 
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "TagA",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
//                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                try {
                    for(MessageExt msg : msgs){
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(),"utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags);
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}
View Code

相关文章:

  • 2021-11-05
  • 2022-02-19
  • 2022-12-23
  • 2021-09-17
  • 2021-08-21
  • 2021-07-10
  • 2021-06-04
  • 2022-12-23
猜你喜欢
  • 2021-04-19
  • 2022-01-15
  • 2021-11-16
  • 2021-12-29
  • 2021-09-13
  • 2022-02-07
  • 2021-10-03
相关资源
相似解决方案