一、demo级别
//生产者 public class Producer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { //创建一个消息生产者,并设置一个消息生产者组 DefaultMQProducer producer = new DefaultMQProducer("zs_producer_group"); //指定NameServer地址 producer.setNamesrvAddr("127.0.0.1:9876"); //初始化Producer,在整个应用生命周期中只需要初始化一次 producer.start(); for (int i = 0; i < 10; i++) { //创建一个消息对象,指定其主题、标签和消息内容 Message msg = new Message("topic_example_java","TagA",("Hello Java demo RocketMQ" + i+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送消息并返回结果 //SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000078308DB164304A2C0000, offsetMsgId=AC10026300002A9F000000000000097E, messageQueue=MessageQueue [topic=topic_example_java, brokerName=DESKTOP-BIBQEM5, queueId=0], queueOffset=2] SendResult sendResult = producer.send(msg); System.out.printf("%s%n",sendResult); } //一旦生产者实例不再被使用,则将其关闭,包括清理资源、关闭网络连接等。 producer.shutdown(); } } //消费者 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; import java.util.List; public class Consumer { public static void main(String[] args) throws MQClientException { //创建一个消息消费者,并设置一个消息消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zs_consumer_group"); //指定NameServer地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //设置Consumer第一次启动时是从队列头部还是队列尾部开始消费的 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅指定Topic下的所有消息 consumer.subscribe("topic_example_java","*"); //注册消息监听器 consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context)->{ //默认list里只有一条消息,可以通过设置参数来批量接收消息 if(list != null){ for (int i = 0; i < list.size(); i++) { MessageExt ext = list.get(i); try { System.out.println(new String(ext.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //消费者对象在使用之前必须要调用start方法初始化 consumer.start(); System.out.println("消息消费者已启动"); } }