本文参考至官网:https://www.rabbitmq.com/getstarted.html
生产者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAM = "Hello";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置主机名(IP地址) 这里没有这是端口号,是因为默认采用的默认端口号5672
factory.setHost("localhost");
try {
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAM, false, false, false, null);
String message = "Hello World";
//发布消息
channel.basicPublish("", QUEUE_NAM, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'" );
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者:
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receive {
private final static String QUEUE_NAME = "Hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建默认消费者
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 将获取的消息打印至控制台
System.out.println(new String(body));
}
};
//监听消息
channel.basicConsume(QUEUE_NAME, consumer);
System.out.println("[*] Waiting for messages, To exit press Ctrl + c");
} catch (Exception e) {
e.printStackTrace();
}
}
}
先执行生产者, 在界面看一看到新建的队列 "Hello" 其中已经又一条消息准备好了!
执行消费者,可以看到队列 "Hello" 中的消息已被消费!
消费者控制台输出:
Hello World 程序Over!!!