一、demo级别

//生产者
public class Producer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        //创建一个消息生产者,并设置一个消息生产者组
        DefaultMQProducer producer = new DefaultMQProducer("zs_producer_group");
        //指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //初始化Producer,在整个应用生命周期中只需要初始化一次
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息对象,指定其主题、标签和消息内容
            Message msg = new Message("topic_example_java","TagA",("Hello Java demo RocketMQ" + i+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息并返回结果
            //SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000078308DB164304A2C0000, offsetMsgId=AC10026300002A9F000000000000097E, messageQueue=MessageQueue [topic=topic_example_java, brokerName=DESKTOP-BIBQEM5, queueId=0], queueOffset=2]
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n",sendResult);
        }
        //一旦生产者实例不再被使用,则将其关闭,包括清理资源、关闭网络连接等。
        producer.shutdown();
    }

}

//消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        //创建一个消息消费者,并设置一个消息消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zs_consumer_group");
        //指定NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //设置Consumer第一次启动时是从队列头部还是队列尾部开始消费的
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅指定Topic下的所有消息
        consumer.subscribe("topic_example_java","*");
        //注册消息监听器
        consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context)->{
            //默认list里只有一条消息,可以通过设置参数来批量接收消息
            if(list != null){
                for (int i = 0; i < list.size(); i++) {
                    MessageExt ext = list.get(i);
                    try {
                        System.out.println(new String(ext.getBody(),"UTF-8"));
                    } catch (UnsupportedEncodingException e) {

                    }
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //消费者对象在使用之前必须要调用start方法初始化
        consumer.start();
        System.out.println("消息消费者已启动");
    }
}
View Code

相关文章:

  • 2021-09-28
  • 2022-12-23
  • 2021-12-02
  • 2021-12-13
猜你喜欢
  • 2021-10-13
  • 2022-12-23
  • 2021-12-28
  • 2021-12-02
  • 2022-02-17
  • 2021-07-09
相关资源
相似解决方案