【问题标题】:Error - Rabbit Template publish Confirm - reply-code=403, reply-text=ACCESS_REFUSED - cannot publish to internal exchange(错误 - RabbitTemplate 发布确认 - reply-code=403,reply-text=ACCESS_REFUSED - 无法发布到内部交换机)
【发布时间】:2019-08-20 06:44:13
【问题描述】:

我的用例是:

  1. 订阅 Q1 并按指定大小批量读取消息。
  2. 传递读取的消息集合进行处理。
  3. 将收集到的消息发布到 Q2 并在成功确认 q2 发布后向 Q1 确认消息。

代码

/**
 * Listener on the subscriber queue (Q1). Incoming messages are buffered together with their
 * delivery tags until a full chunk has been collected; the chunk is then meant to be scrubbed
 * and published (that hand-off is still a placeholder below).
 *
 * NOTE(review): the two buffers are plain {@link ArrayList}s; this presumably relies on the
 * listener container delivering on a single consumer thread - confirm before raising concurrency.
 */
@Component
public class EPPQ2Subscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(EPPQ2Subscriber.class);

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Autowired
    AppConfig appConfig;

    /** Messages collected for the chunk currently being assembled. */
    List<Message> messageList = new ArrayList<>();

    /** Delivery tags of the buffered messages, index-aligned with {@link #messageList}. */
    List<Long> diliveryTag = new ArrayList<>();

    /**
     * Receives one message from the subscriber queue and adds it to the current chunk.
     *
     * <p>Every message is buffered first; once the buffer holds
     * {@code appConfig.getSubscriberChunkSize()} entries the chunk is ready for processing.
     * (Previously the check was {@code size() <= chunkSize} with the add inside the branch, so
     * the buffer grew to chunkSize + 1 and the next message fell into the empty else-branch
     * without ever being buffered.)
     *
     * @param message - Domain object of message encapsulated
     * @param channel - rabbitmq client channel the message was delivered on
     * @param messageId - value of the "id" header. TODO Delete it later.
     * @param messageProperties - amqp message properties; the delivery tag is read from here
     */
    @RabbitListener(id="messageListener",queues = "#{rabbitMqConfig.getSubscriberQueueName()}",containerFactory="queueListenerContainer")
    public void receiveMessage(Message message, Channel channel, @Header("id") String messageId,
            MessageProperties messageProperties) {

        LOGGER.info("Result:{}:{}", message.getClass(), message);

        // Always keep the message and its tag together so the two lists stay index-aligned.
        messageList.add(message);
        diliveryTag.add(messageProperties.getDeliveryTag());

        if (messageList.size() >= appConfig.getSubscriberChunkSize()) {
            // call the service here to decrypt, read pan, call danger to scrub, encrypt pan and re-pack them in message again.
            // after this step messageList should have scrubbed and encrypted message objects ready to publish.

            // Here is call for publish and ack messages..
            // TODO: once the chunk has been handed off, both buffers must be reset so that the
            // next chunk starts empty.
        }
    }
}

@Component
@Configuration
public class TopicConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConfiguration.class);

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Autowired EPPQ2Publisher eppQ2Publisher;

    /**
     * Caching connection factory 
     * @return CachingConnectionFactory
     */
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =  new CachingConnectionFactory(rabbitMqConfig.getPublisherHosts(), rabbitMqConfig.getPublisherPort());
        connectionFactory.setUsername(rabbitMqConfig.getPublisherUsername());
        connectionFactory.setPassword(rabbitMqConfig.getPublisherPassword());

        return connectionFactory;
    }

    /**
     * Bean RabbitTemplate
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());

rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
            RetryTemplate retryTemplate = new RetryTemplate();
            ExponentialBackOffPolicy backOffPolicy = new 
            ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(500);
            backOffPolicy.setMultiplier(10.0);
            backOffPolicy.setMaxInterval(10000);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            rabbitTemplate.setRetryTemplate(retryTemplate);
    /*      rabbitTemplate.setExchange(rabbitMqConfig.getPublisherTopic());
            rabbitTemplate.setRoutingKey(rabbitMqConfig.getRoutingKey());*/
            rabbitTemplate.setConfirmCallback((correlation, ack, reason) ->    
            if(correlation != null ) {
             LOGGER.info("Received " + (ack ? " ack " : " nack ") +            
             "for correlation: " + correlation);
                    if(ack) {
                        // this is confirmation received..
                        // here is code to ack Q1. correlation.getId and ack
                        eppQ2Publisher.ackMessage(new 
                       Long(correlation.getId().toString()));
                    } else {
                // no confirmation received and no need to do any                        
                 thing for retry..
                    }
                }

            });
            rabbitTemplate.setReturnCallback((message, replyCode, 
            replyText, exchange, routingKey) -> 
           {
            LOGGER.error("Returned: " + message + "\nreplyCode: " +                                               

    replyCode+ "\nreplyText: " + replyText + 
"\nexchange/rk: " + exchange + "/" + routingKey);
                });
                return rabbitTemplate;
            }

        /**
         * Bean Jackson2JsonMessageConverter
         * @return Jackson2JsonMessageConverter
         */


    @Bean
            public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
                return new Jackson2JsonMessageConverter();
            }

        }

/**
 * Contract for publishing scrubbed messages to the publisher queue (Q2) and acknowledging the
 * corresponding deliveries on the subscriber queue (Q1).
 */
public interface EPPQ2Publisher {

    /**
     * Publishes a single message.
     *
     * @param msg         message domain object to publish
     * @param deliveryTag delivery tag associated with the message
     */
    void sendMessage(Message msg, Long deliveryTag);

    /**
     * Publishes a list of messages.
     *
     * @param msgList         messages to publish
     * @param channel         amqp client channel
     * @param deliveryTagList delivery tags associated with the messages
     */
    void sendMessages(List<Message> msgList, Channel channel, List<Long> deliveryTagList);

    /**
     * Acknowledges the delivery identified by the given tag.
     *
     * @param deliveryTag delivery tag to acknowledge
     */
    void ackMessage(Long deliveryTag);
}


@Component
public class EPPQ2PublisherImpl implements EPPQ2Publisher{
    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private Channel channel;

    /**
     * Method sendMessage for sending individual scrubbed and encrypted message to publisher queue (Q2).
     * @param msg - message domain object 
     * @param deliveryTag - is message delivery tag.        
     */
    @Override
    public void sendMessage(Message msg,Long deliveryTag) {
        rabbitTemplate.convertAndSend(rabbitMqConfig.getPublisherTopic(), rabbitMqConfig.getRoutingKey(), msg,new CorrelationData(deliveryTag.toString()));
    }

    /**
     * sendMessages for sending list of scrubbed and encrypted messages to publisher queue (Q2)
     * @param msgList - is list of scrubbed and encrypted messages
     * @param channel - is ampq client channel 
     * @param deliveryTagList - is list of incoming message delivery tags. 
     */
    @Override
    public void sendMessages(List<Message> msgList, Channel channel, List<Long>deliveryTagList) {
        if(this.channel == null) {
            this.channel = channel;
        }
        for (int i = 0 ; i < msgList.size(); i ++) {
            sendMessage(msgList.get(i),deliveryTagList.get(i));
        }
    }

    /**
     * Method ackMessage for sending acknowledgement to subscriber Q1
     * @param  deliveryTag - is deliveryTag for each individual message.
     */
    @Override
    public void ackMessage(Long deliveryTag)  { 
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

org.springframework.amqp.rabbit.connection.CachingConnectionFactory 从 AMQChannel 创建缓存的 Rabbit Channel(amqp://dftp_subscriber@10.15.190.18:5672/hydra.services,2)

我期望这里使用的用户是 dftp_publisher,但我猜我的主题配置(TopicConfiguration)没有被正确注入。

错误日志:

org.springframework.amqp.rabbit.core.RabbitTemplate[0;39m:在 RabbitMQ 通道上执行回调 RabbitTemplate$$Lambda$285/33478758:缓存的兔子通道:AMQChannel(amqp://dftp_subscriber@10.15.190.18:5672/ hydra.services,2), conn: Proxy@1dc339f Shared Rabbit Connection: SimpleConnection@2bd7c8 [delegate=amqp://dftp_subscriber@10.15.190.18:5672/hydra.services, localPort= 55553] org.springframework.amqp.rabbit.core.RabbitTemplate[0;39m:发布消息(正文:'{"HEADER":{"RETRY_COUNT":0,"PUBLISH_EVENT_TYPE":"AUTH"},"PAYLOAD":{"MTI ":"100","MTI_REQUEST":"100","PAN":"6011000000000000","PROCCODE":"00","PROCCODE_REQUEST":"00","FROM_ACCOUNT":"00","TO_ACCOUNT": "00","TRANSACTION_AMOUNT":"000000000100","TRANSMISSION_MMDDHHMMSS":"0518202930","STAN":"000001","LOCALTIME_HHMMSS":"010054","LOCALDATE_YYMMDD":"180522","EXPIRATION_DATE_YYMM":"2302 ","MERCHANT_TYPE":"5311","ACQUIRING_COUNTRY_CODE":"840","POS_ENTRY_MODE":"02","POS_PIN_ENTRY_CAPABILITIES":"0","FUNCTION_CODE":"100","ACQUIRING_ID_CODE":"000000", "FORWARDING_ID_CODE":"000000","RETRIEVAL_REFERENCE_NUMBER":"1410N644D597","MERCHANT_NUMBER":"601100000000596","CARD_ACCEPTOR_NAME":"Discover Acq Simulator","CARD_ACCEPTOR_CITY":"Riverwoods","CARD_ACCEPTOR_STATE":"IL," "CARD_ACCEPTOR_COUNTRY":"840","CARD_ACCEPTOR_COUNTRY_3NUMERIC":"840","NRID":"123456789012345","TRANSACTION_CURRENCY_CODE":"840","POS_TERMINAL_ATTENDANCE_INDICA TOR":"0","POS_PARTIAL_APPROVAL_INDICATOR":"0","POS_TERMINAL_LOCATION_INDICATOR":"0","POS_TRANSACTION_STATUS_INDICATOR":"0","POS_ECOMMERCE_TRAN_INDICATOR":"0","POS_TYPE_OF_TERMINAL_DEVICE":"0","POS_CARD_PRESENCE_INDICATOR" :"0","POS_CARD_CAPTURE_CAPABILITIES_INDICATOR":"0","POS_TRANSACTION_SECURITY_INDICATOR":"0","POS_CARD_DATA_TERMINAL_INPUT_CAPABILITY_INDICATOR":"C","POS_CARDHOLDER_PRESENCE_INDICATOR":"0","DFS_POS_DATA":"0000000000C00","GEODATA_STRE 2500 LAKE COOK ROAD ","GEODATA_POSTAL_CODE":"600150000","GEODATA_COUNTY_CODE":"840","GEODATA_STORE_NUMBER":"10001","GEODATA_MALL_NAME":"DISCOVER FINANCIAL 
SR","ISS_REFERENCE_ID":"72967956","ISS_PROCESSOR_REFERENCE_ID ":"123459875","VERSION_INDICATOR":"03141"}}' MessageProperties [headers={TypeId=com.discover.dftp.scrubber.domain.Message}, contentType=application/json, contentEncoding =UTF-8,contentLength=1642,deliveryMode=PERSISTENT,priority=0,deliveryTag=0]) 在交换 [hydra.hash2Syphon.exc],routingKey = [100] org.springframework.amqp.rabbit.connection.CachingConnectionFactory$DefaultChannelCloseLogger[0;39m:通道关闭:通道错误;协议方法:#method(reply-code=403,reply-text=ACCESS_REFUSED - 无法发布到 vhost 'hydra.services' 中的内部交换 'hydra.hash2Syphon.exc',class-id=60,method-id=40)


编辑 2.

@Component
@Configuration
public class ListenerContainerFactory {

    static final Logger logger = LoggerFactory.getLogger(ListenerContainerFactory.class);

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Autowired
    EPPQ2Subscriber receiver;

    @Autowired
    EPPQ2ChanelAwareSubscriber receiverChanel;


     public ListenerContainerFactory(ConfigurableApplicationContext ctx) {
        printContainerStartMsg();
    }
    private void printContainerStartMsg() {
        logger.info("----------- Scrubber Container Starts   --------------");
    }

    @Bean
    public SimpleRabbitListenerContainerFactory queueListenerContainer(AbstractConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) { 
        connectionFactory.setAddresses(rabbitMqConfig.getSubscriberHosts());
        connectionFactory.setVirtualHost("hydra.services");
        connectionFactory.setPort(rabbitMqConfig.getSubscriberPort());
        connectionFactory.setUsername(rabbitMqConfig.getSubscriberUsername());
        connectionFactory.setPassword(rabbitMqConfig.getSubscriberPassword());
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(errorHandler());
        return factory;
    }

     @Bean
     MessageListenerAdapter listenerAdapter(EPPQ2Subscriber receiver) {
         return new MessageListenerAdapter(receiver, "receiveMessage");
     }

     /*@Bean
     MessageListenerAdapter listenerAdapterWithChanel(EPPQ2ChanelAwareSubscriber receiverChanel) {
         return new MessageListenerAdapter(receiverChanel);
     }*/

     @Bean
        public ErrorHandler errorHandler() {
            return new ConditionalRejectingErrorHandler(fatalExceptionStrategy());
        }

     @Bean
     public  ScrubberFatalExceptionStrategy fatalExceptionStrategy() {
         return new ScrubberFatalExceptionStrategy();
     }
}

和最新的主题配置。

/**
 * Publisher-side (Q2) configuration: queue, topic exchange and binding declarations, the
 * publisher connection factory, and the {@link RabbitTemplate} with retry and confirm/return
 * callbacks.
 */
@Component
@Configuration
public class TopicConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConfiguration.class);

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Autowired
    EPPQ2Publisher eppQ2Publisher;

    /**
     * Bean Queue - the publisher queue, declared non-durable.
     * @return Queue
     */
    @Bean
    Queue queue() {
        return new Queue(rabbitMqConfig.getPublisherQueueName(), false);
    }

    /**
     * Bean TopicExchange - the publisher exchange.
     * @return TopicExchange
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(rabbitMqConfig.getPublisherTopic());
    }

    /**
     * Bean Binding - binds the publisher queue to the exchange with the configured routing key.
     * @param queue - Queue
     * @param exchange - TopicExchange
     * @return Binding
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(rabbitMqConfig.getRoutingKey());
    }

    /**
     * Caching connection factory built from the publisher host, port and credentials.
     *
     * <p>Publisher confirms and returns are switched on here; without them the confirm and
     * return callbacks registered on the template are never invoked.
     * NOTE(review): on Spring AMQP 2.2+ {@code setPublisherConfirmType(ConfirmType.CORRELATED)}
     * replaces {@code setPublisherConfirms(true)} - adjust to the version in use.
     *
     * @return CachingConnectionFactory
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqConfig.getPublisherHosts(),
                rabbitMqConfig.getPublisherPort());
        connectionFactory.setUsername(rabbitMqConfig.getPublisherUsername());
        connectionFactory.setPassword(rabbitMqConfig.getPublisherPassword());
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Bean RabbitTemplate used for publishing to Q2.
     *
     * <p>Configures JSON conversion, an exponential back-off retry (500 ms initial interval,
     * multiplier 10, capped at 10 s), a confirm callback that acknowledges the originating Q1
     * delivery, and a return callback that logs unroutable messages.
     *
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());

        RetryTemplate retryTemplate = new RetryTemplate();
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(500);
        backOffPolicy.setMultiplier(10.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        rabbitTemplate.setRetryTemplate(retryTemplate);

        // Returns are only delivered for messages published with the mandatory flag.
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
            if (correlation != null) {
                LOGGER.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
                if (ack) {
                    // The correlation id carries the Q1 delivery tag, so a broker ack for Q2
                    // is turned into an ack for the Q1 delivery.
                    eppQ2Publisher.ackMessage(Long.valueOf(correlation.getId()));
                }
                // On nack nothing is done here: no confirmation received.
            }
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            LOGGER.error("Returned: " + message + "\nreplyCode: " + replyCode
                    + "\nreplyText: " + replyText + "\nexchange/rk: " + exchange + "/" + routingKey);
        });
        return rabbitTemplate;
    }

    /**
     * Bean Jackson2JsonMessageConverter
     * @return Jackson2JsonMessageConverter
     */
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

【问题讨论】:

    标签: spring-boot spring-amqp


    【解决方案1】:

    不太清楚你在问什么。如果您的意思是订阅者用户没有向该交换机写入的权限,那么说明您的 bean 装配(wiring)有误。

    您没有显示订阅者配置。

    订阅者的连接工厂 bean 是否也叫 connectionFactory?如果是这样,两者之中只有一个会生效(另一个会被覆盖)。

    他们需要不同的bean命名。

    【讨论】:

    • Garry 基本上我这里有 2 个问题
    • Garry,基本上我有 2 个问题。我已经添加了我的订阅者配置。 1. 不知为何,我的 CachingConnectionFactory 没有被注入到 RabbitTemplate 中,因为错误日志中显示的仍然是我的订阅者用户名。 2. 订阅者和发布者用户对队列和交换机都拥有完全权限,但我仍然得到:通道关闭:通道错误;协议方法:#method(reply-code=403,reply-text=ACCESS_REFUSED - 用户 'dftp_subscriber' 拒绝访问 vhost 'hydra.services' 中的交换 'sample.exc',class-id=40 , method-id=10)
    • 正如我所说,如果您有冲突的 bean 名称,则可能会发生第一个,但您没有显示订阅者配置,所以我无法判断。我不知道兔子如何/为什么会认为您的交换是“内部的”。您应该在 rabbitmq-users Google 组中询问; RabbitMQ 工程师更密切地监控;他们只是时不时地看这里。
    • 请参阅编辑 2 ListenerContainerFactory 是我的订阅者配置
    • 我通过创建新的交换机和绑定修复了第二个问题。但是我的第一个问题仍然没有解决,即注入问题:主题配置(TopicConfiguration)中定义的连接工厂和 RabbitTemplate 没有被使用,被注入的是默认的 RabbitTemplate。因为消息已成功发布到主题,但我的 RabbitTemplate 的回调方法没有被调用。
    【解决方案2】:

    请检查您所用用户的权限。另外,如果在启动消费者/生产者时没有指定交换机名称,它将回退到默认交换机。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-11-12
      • 1970-01-01
      • 1970-01-01
      • 2013-12-06
      • 1970-01-01
      • 1970-01-01
      • 2018-08-14
      • 1970-01-01
      相关资源
      最近更新 更多