Work queues

Work queues

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mq</groupId>
    <artifactId>mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
    </dependencies>
</project>

连结工厂

public class ConnectionUtils {

    //定义链接工厂
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);//amqp协议 端口 类似与mysql的3306
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/test");
        factory.setUsername("user");
        factory.setPassword("123456");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
        }

生产者

private static String  QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取链接
        Connection connection= ConnectionUtils.getConnection();
        Channel channel= connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for(int i=0;i<50;i++) {
            String msg="hello  张国强"+i;
            channel.basicPublish("", QUEUE_NAME, false, null, msg.getBytes());
            System.out.println("send "+msg+"over");
            Thread.sleep(10000);
        }
        channel.close();
        connection.close();
    }

消费者1:

private static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=ConnectionUtils.getConnection();
        Channel channel=connection.createChannel();
        // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义一个消费者
        final Consumer consumer=new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
            String msg=new String(body,"UTF-8");
            System.out.println(" [1] Received '" + msg + "'");
            try {
                doWork(msg);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally {
                System.out.println(" [x] Done");
            }
            }
        };
        boolean autoAck = true; //消息的确认模式自动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(2000);
        }

消费者2:

private static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection=ConnectionUtils.getConnection();
        Channel channel=connection.createChannel();
        // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义一个消费者
        final Consumer consumer=new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
            String msg=new String(body,"UTF-8");
            System.out.println(" [1] Received '" + msg + "'");
            try {
                doWork(msg);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally {
                System.out.println(" [x] Done");
            }
            }
        };
        boolean autoAck = true; //消息的确认模式自动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(1000);
        }

       

相关文章:

  • 2022-12-23
  • 2021-07-03
  • 2022-12-23
  • 2021-12-01
  • 2021-12-22
  • 2021-08-16
  • 2021-10-02
  • 2022-01-20
猜你喜欢
  • 2021-06-19
  • 2021-06-17
  • 2022-12-23
  • 2021-08-08
  • 2021-12-26
  • 2022-01-11
  • 2022-12-23
相关资源
相似解决方案