1. 消费端集群消费(负载均衡)

 示例代码:

/**
 * Producer: sends 100 test messages to the {@code TopicTest} topic with tag {@code Tag1}.
 *
 * <p>Each message body is the text {@code "Hello RocketMQ " + i}, encoded explicitly as UTF-8
 * so that it matches the UTF-8 decoding performed by the consumers in this example.
 */
public class Producer {
    /**
     * Starts the producer, sends the messages one by one and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     * @throws InterruptedException if the back-off sleep after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        try {
            for (int i = 0; i < 100; i++) {
                try {
                    // Encode with an explicit charset instead of the platform default.
                    byte[] body = ("Hello RocketMQ " + i)
                            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    Message msg = new Message("TopicTest", // topic
                            "Tag1", // tag
                            body);
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    // Best-effort demo: report the failure, back off for a second, try the next one.
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Always release the producer, even if the back-off sleep above is interrupted.
            producer.shutdown();
        }
    }
}

/**
 * Consumer: subscribes to {@code TopicTest} as a member of the {@code message_consumer} group.
 *
 * <p>The listener prints every received message and then sleeps for 60 seconds per message.
 * NOTE(review): the long pause presumably simulates a slow consumer so that the distribution
 * of messages across the group's consumers can be observed; confirm with the author.
 */
public class Consumer1 {

    /**
     * Creates, configures and starts the push consumer.
     *
     * <p>A start-up failure is only printed, not rethrown, so the caller cannot tell from the
     * constructor whether the consumer is actually running.
     */
    public Consumer1() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /** Listener that prints each message and then pauses for one minute. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints topic, body (decoded as UTF-8) and tags of every message in the batch.
         *
         * @param msgs the batch of messages delivered to this listener
         * @param context the consume context (unused)
         * @return {@code CONSUME_SUCCESS} when the whole batch was processed,
         *         {@code RECONSUME_LATER} if processing was interrupted or failed
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);

                    System.out.println("======暂停=====");
                    Thread.sleep(60000);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so the owning thread can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    /**
     * Entry point: constructs the consumer, which starts it as a side effect.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        new Consumer1();
        // Printed even if start-up failed, because the constructor swallows MQClientException.
        System.out.println("Consumer1 Started.");
    }
}

/**
 * Consumer: subscribes to {@code TopicTest} as a member of the {@code message_consumer} group
 * and prints every message it receives.
 *
 * <p>NOTE(review): unlike {@code Consumer1}, which joins the same group, this class does not call
 * {@code setConsumeFromWhere}, so whatever the client default is applies here. Confirm whether
 * the two consumers are meant to be configured identically.
 */
public class Consumer2 {

    /**
     * Creates, configures and starts the push consumer.
     *
     * <p>A start-up failure is only printed, not rethrown, so the caller cannot tell from the
     * constructor whether the consumer is actually running.
     */
    public Consumer2() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /** Listener that prints each received message. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints topic, body (decoded as UTF-8) and tags of every message in the batch.
         *
         * @param msgs the batch of messages delivered to this listener
         * @param context the consume context (unused)
         * @return always {@code CONSUME_SUCCESS} when the method returns normally
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                String topic = msg.getTopic();
                // Decoding with a Charset constant cannot fail with a checked exception, so the
                // former UnsupportedEncodingException handler is no longer needed.
                String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                String tags = msg.getTags();
                System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    /**
     * Entry point: constructs the consumer, which starts it as a side effect.
     *
     * @param args unused
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        new Consumer2();
        // Printed even if start-up failed, because the constructor swallows MQClientException.
        System.out.println("Consumer2 Started.");
    }
}
View Code

相关文章:

  • 2021-12-31
  • 2021-07-10
  • 2021-11-25
  • 2021-05-10
  • 2021-05-28
  • 2022-12-23
  • 2021-04-04
猜你喜欢
  • 2021-05-25
  • 2022-12-23
  • 2021-06-11
  • 2021-08-22
  • 2021-11-26
  • 2021-07-13
  • 2021-06-05
相关资源
相似解决方案