路由

  1. 绑定
  2. direct exchange
  3. 多重绑定
  4. 发出日志
  5. 整合

 

 

之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收者。

在本教程中,我们将添加一个功能 - 我们将可能只订阅一部分消息。例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的例子中,我们已经创建了绑定。您可能需要回想一下代码:

 

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是交换器和队列之间的关系。这可以简单地解读为:队列对来自这个交换器的消息感兴趣。

绑定可以使用一个额外的routingKey 参数。为了避免与basic_publish的其它参数混淆,我们将把它称为 binding key.。这就是我们如何用键创建绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的含义取决于交换类型。我们之前使用的fanout交换器,会忽略了它。

Direct exchange

我们上一个教程的日志系统向所有的消费者广播所有的消息。我们希望扩展这一功能,允许基于其严重性来过滤消息。例如,我们可能想要一个程序,它将日志消息写入磁盘,只接收关键错误,而不是在警告或信息日志消息上浪费磁盘空间。

由于上一个教程我们使用的是fanout exchange,这并没有给我们带来多大的灵活性——它只能够盲目地进行广播。

 我们将使用direct exchange direct exchange 背后的路由算法很简单——消息传递到binding key与消息的routing key完全匹配的队列。

为了说明这一点,请考虑以下设置

 第四篇:路由

 

 在这个设置中,我们可以看到绑定的两个队列的direct exchange 。The first queue is bound with binding key orange, and the second has two bindings, one with binding key blackand the other one with green.

 在这样的设置中,使用路由键orange发送到交换器的消息将被路由到队列Q1。使用路由键black和路由键green发送到交换器的消息将会进入Q2。所有其他消息将被丢弃。

多重绑定

 第四篇:路由

使用相同的binding key绑定多个队列是完全合法的。在我们的例子中,我们可以使用绑定键black添加XQ1之间的绑定在这种情况下,direct exchange就像fanout exchange一样,并将消息广播到所有匹配的队列。

发出日志

 我们将消息发送到direct exchange我们将提供日志严重性作为路由键这样接收程序将能够选择想要接收的严重程度。我们先关注发出日志。

 一如既往,我们需要先建立一个交换机

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

 接着,我们准备发出一个信息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());      #severity 严重性

 为了简化问题,我们假设“severity”可以是“info”、“warn”、“error”。

 

整合

第四篇:路由

 

EmitLogDirect.java

package com.rabbitmq.tutorials.route;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.103");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        int messageCount = 1;
        String routingKey = "";
        while(messageCount<=10) {
            String message = "Message "+ messageCount;
            if(0 == messageCount%4) {
                routingKey = "error";
                message = "error " + message;
            } else if(0 == messageCount%3) {
                routingKey = "info";
                message = "info " + message;
            } else if(0 == messageCount%2) {
                routingKey = "warn";
                message = "warn " + message;
            } else {
                routingKey = "";
                message = "丢弃 " + message;
            }
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
            messageCount +=1;
        }
        channel.close();
        connection.close();
    }
    //...
}

 

ReceiveLogsDirect.java

package com.rabbitmq.tutorials.route;

import com.rabbitmq.client.*;
import com.sun.deploy.util.ArrayUtil;
import com.sun.deploy.util.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

public class ReceiveLogsDirect {
  private static final String EXCHANGE_NAME = "direct_logs";
  private static final String[] ROUTE_KEY = {"error"};
  //private static final String[] ROUTE_KEY = {"info","warn","error"};

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.0.103");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");//声明交换器
    String queueName = channel.queueDeclare().getQueue(); //随机生成queueName。queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    for (String routeKey: ROUTE_KEY) {
      channel.queueBind(queueName, EXCHANGE_NAME, routeKey); //将交换器和队列绑定
    }

    System.out.println(" [*] Waiting for [" + StringUtils.join(Arrays.asList(ROUTE_KEY)," ") + "] messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

 

执行步骤:

  1. ROUTE_KEY={"error"}启动ReceiveLogsDirect.java

    第四篇:路由

  2. ROUTE_KEY={"info","warn","error"} 启动ReceiveLogsDirect.java

    第四篇:路由

  3. 启动EmitLogDirect.java

    第四篇:路由

    第四篇:路由

    第四篇:路由

  4. 查看交换机队列绑定关系
    [root@bogon ~]# rabbitmqctl list_bindings
    Listing bindings ...
        exchange    amq.gen-_bRNpcff_l5KIEJWC-jIiw    queue    amq.gen-_bRNpcff_l5KIEJWC-jIiw    []
        exchange    amq.gen-rHOVkTH0rqq8oOyk0Ca8UA    queue    amq.gen-rHOVkTH0rqq8oOyk0Ca8UA    []
        exchange    task_queue    queue    task_queue    []
    direct_logs    exchange    amq.gen-_bRNpcff_l5KIEJWC-jIiw    queue    error    []
    direct_logs    exchange    amq.gen-rHOVkTH0rqq8oOyk0Ca8UA    queue    error    []
    direct_logs    exchange    amq.gen-rHOVkTH0rqq8oOyk0Ca8UA    queue    info    []
    direct_logs    exchange    amq.gen-rHOVkTH0rqq8oOyk0Ca8UA    queue    warn    []
    ...done.

     

 

相关文章: