1. 消费端集群消费(负载均衡)
示例代码:
/** * Producer,发送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("message_producer"); producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest",// topic "Tag1",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } /** * Consumer,订阅消息 */ public class Consumer1 { public Consumer1() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); System.out.println("======暂停====="); Thread.sleep(60000); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer1 consumer1 = new Consumer1(); System.out.println("Consumer1 Started."); } } /** * Consumer,订阅消息 */ public class Consumer2 { public Consumer2() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer2 consumer2 = new Consumer2(); System.out.println("Consumer2 Started."); } }