【问题标题】:Cannot send messages between several RabbitMQ exchanges无法在多个 RabbitMQ 交换之间发送消息
【发布时间】:2018-06-17 10:47:09
【问题描述】:

我想在多个 RabbitMQ 交换之间路由多条消息。这是我要使用的路由表:

// | exchange | type | routing key | queue |
// |-----------------------------------------------------------------|
// | processing | topic | processing.event.transaction | processing.transaction.queue |
// | database | topic | database.event.transaction | database.transaction.queue |
// | database | topic | database.event.api_attempts | database.api_attempts.queue |
// | database | topic | database.event.event_logs | database.event_logs.queue |

我有 3 个模块要配置为以这种方式发送消息:

REST API Module -> Gateway module
REST API Module -> Database Module

REST API 模块配置

String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";   
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";  
String EXCHANGE_PROCESSING = "processing";
String EXCHANGE_DATABASE = "database";  
String ROUTING_KEY_PROCESSING = "processing.event.transaction";
String ROUTING_KEY_DATABASE = "database.event.transaction"; 
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";

channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_PROCESSING, BuiltinExchangeType.TOPIC);
channel.exchangeDeclare(EXCHANGE_DATABASE, BuiltinExchangeType.TOPIC);

channel.queueDeclare(QUEUE_PROCESSING_TRANSACTION, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_TRANSACTION, false, false, false, null);            
channel.queueDeclare(QUEUE_DATABASE_API_ATTEMPT, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_EVENT_LOGS, false, false, false, null);

channel.queueBind(QUEUE_PROCESSING_TRANSACTION, EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING);
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);

将 Java 对象发送到其他模块:

TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));
channel.basicPublish(EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING, null, SerializationUtils.serialize(obj));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_DATABASE, null, SerializationUtils.serialize(obj));

ApiAttemptsBean obj = new ApiAttemptsBean();
obj.setId(Long.valueOf(2332));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS, null, SerializationUtils.serialize(obj));

EventLogsBean obj = new EventLogsBean();
obj.setId(Long.valueOf(111222));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS, null, SerializationUtils.serialize(obj));

模块网关配置:

String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";   
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";  
String EXCHANGE_DATABASE = "database";  
String ROUTING_KEY_DATABASE = "database.event.transaction"; 
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";

channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_PROCESSING, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_PROCESSING_TRANSACTION, false, false, false, null);
channel.queueBind(QUEUE_PROCESSING_TRANSACTION, EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING);

Map<String, Consumer<byte[]>> queueToConsumer = new HashMap<>();
        queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction);
        queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt);
        queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs);

        queueToConsumer.forEach((queueName, consumer) -> {
            try {
                channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                            byte[] body) throws IOException {
                        consumer.accept(body);
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

private void process_transaction(byte[] object) {
        TransactionsBean obj = (TransactionsBean) SerializationUtils.deserialize(object);
        System.out.println("!!!! Received id " + obj.getId() + " in gateway");
    }

    private void process_api_attempt(byte[] object) {
        ApiAttemptsBean obj = (ApiAttemptsBean) SerializationUtils.deserialize(object);
        System.out.println("!!!! Received id " + obj.getId() + " in gateway");
    }

    private void process_event_logs(byte[] object) {
        EventLogsBean obj = (EventLogsBean) SerializationUtils.deserialize(object);
        System.out.println("!!!! Received id " + obj.getId() + " in gateway");
    }

模块数据库:

String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";
String EXCHANGE_DATABASE = "database";  
String ROUTING_KEY_PROCESSING = "processing.event.transaction";
String ROUTING_KEY_DATABASE = "database.event.transaction"; 
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";

channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_DATABASE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_DATABASE_TRANSACTION, false, false, false, null);            
channel.queueDeclare(QUEUE_DATABASE_API_ATTEMPT, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_EVENT_LOGS, false, false, false, null);
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);

Map<String, Consumer<byte[]>> queueToConsumer = new HashMap<>();
queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction);
queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt);
queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs);

queueToConsumer.forEach((queueName, consumer) -> {
            try {
                channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                            byte[] body) throws IOException {
                        consumer.accept(body);
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

private void process_transaction(byte[] object) {       
    TransactionsBean obj = (TransactionsBean) SerializationUtils.deserialize(object);
    System.out.println("!!!! Received id " + obj.getId() + " in database");
}

private void process_api_attempt(byte[] object) {
    ApiAttemptsBean obj = (ApiAttemptsBean) SerializationUtils.deserialize(object);
    System.out.println("!!!! Received id " + obj.getId() + " in database");
}

private void process_event_logs(byte[] object) {
    EventLogsBean obj = (EventLogsBean) SerializationUtils.deserialize(object);
    System.out.println("!!!! Received id " + obj.getId() + " in database");
}

但是消息传递不正确:

11:33:00,783 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-17-thread-6) Consumer org.database.context.ContextServer$1@6fee6ab4 (amq.ctag-arvcrYNc61cslclCTAnpDQ) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1): java.lang.ClassCastException: deployment.db.war//plugin.factories.TransactionsBean cannot be cast to deployment.database.war//org.plugin.factories.EventLogsBean

看起来消息没有正确路由可能是因为我的路由表不正确。

你能给我一些指导我如何解决这个问题吗?

编辑: 错误堆栈:

22:19:26,584 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-19-thread-6) Consumer org.database.context.ContextServer$1@49ee659f (amq.ctag-vjArBDtmtruIgeCMLipHGQ) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1): java.lang.ClassCastException: deployment.db.war//org.plugin.database.bean.TransactionsBean cannot be cast to deployment.db.war//org.plugin.database.bean.ApiAttemptsBean
    at deployment.db.war//org.database.context.ContextServer.process_api_attempt(ContextServer.java:79)
    at deployment.db.war//org.database.context.ContextServer$1.handleDelivery(ContextServer.java:64)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

22:19:26,586 INFO  [javax.enterprise.resource.webcontainer.jsf.config] (ServerService Thread Pool -- 111) Initializing Mojarra 2.2.13.SP5  for context '/rest_api'
22:19:26,619 INFO  [stdout] (pool-21-thread-6) !!!! Received id 2332 in gateway
22:19:26,667 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-19-thread-6) Consumer org.database.context.ContextServer$1@29ba98e4 (amq.ctag-RMVncG2xQn3KBJ561F9HNQ) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1): java.lang.ClassCastException: deployment.db.war//org.plugin.database.bean.TransactionsBean cannot be cast to deployment.db.war//org.plugin.database.bean.EventLogsBean
    at deployment.db.war//org.database.context.ContextServer.process_event_logs(ContextServer.java:84)
    at deployment.db.war//org.database.context.ContextServer$1.handleDelivery(ContextServer.java:64)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

22:19:26,669 INFO  [stdout] (pool-21-thread-6) !!!! Received id 111222 in gateway

【问题讨论】:

  • 在该消息之前你有没有得到任何输出(我看到你一路上有很多 printlns)。另外,您是否调试过它并知道它发生在哪一行?
  • 是的,我只得到网关模块输出。我想我的路由配置不正确。
  • 也许你没有在网关配置中添加处理事件事务:Map&lt;String, Consumer&lt;byte[]&gt;&gt; queueToConsumer = new HashMap&lt;&gt;(); queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction); queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt); queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs); 并且它选择了第一个可用的?

标签: java rabbitmq


【解决方案1】:

Module Database 中,您使用相同的routing_key 设置所有队列绑定:ROUTING_KEY_DATABASE

channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);

应该是:

channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);

这只是一个复制粘贴错误(-:

编辑
代码现在看起来不错,所以需要检查几件事:

-1-
在运行新代码之前是否清除了队列(所以没有旧消息)

-2-
尝试在以下位置添加断点或至少 println:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        consumer.accept(body);
                    }

尝试从envelope 获取一些关于消息路由密钥和原始交换的值。

编辑 2

我又检查了一遍,肉眼看来一切都很好。我唯一能给您的就是更好地调试您的代码以了解问题所在。

-1-
您是否尝试登录 rabbitMQ UI 来监控消息?

-2-
尝试添加更好的日志,例如在 println 中添加函数名称:

System.out.println("!!!! Received id " + obj.getId() + " in database");

更改为

System.out.println("process_transaction: Received id " + obj.getId() + " in database");

并在所有函数中执行此操作,以通过消息知道您的确切位置。

-3-
在信封中,您可以envelope.getExchange() 也可以envelope. getRoutingKey() 来查看消息的确切调用方式。

【讨论】:

  • 我更新了代码,但这解决了一些问题。请查看附加的堆栈跟踪。
  • 一般问题是无法将 TransactionsBean 强制转换为 ApiAttemptsBean 并且我找不到消息未正确路由的原因。
  • @PeterPenzov - 我添加了一些步骤,可以帮助您验证这种奇怪行为的根源。
  • 例如我在handleDelivery中添加了System.out.println("?????????????? Exchange " + envelope.getExchange() + " Exchange " + envelope.getExchange());。结果: ?????????????? Exchange 数据库 Exchange 数据库
  • @PeterPenzov - 当您使用代码时,这似乎是一个应该很容易解决的问题,但从远处很难做到这一点。我添加了更多调试技巧。我希望他们会帮助你。不能帮助你更多。祝你好运(:
猜你喜欢
  • 1970-01-01
  • 2016-08-13
  • 1970-01-01
  • 1970-01-01
  • 2016-08-23
  • 1970-01-01
  • 1970-01-01
  • 2019-10-10
  • 1970-01-01
相关资源
最近更新 更多