【问题标题】:JmsListener SQS unbale to consume messagesJmsListener SQS 解包消费消息
【发布时间】:2020-10-02 05:44:21
【问题描述】:

我有一个配置,其中一次有 10 条消息并行传入 SQS 队列。 要使用它,我正在使用 JmsListener。

让我告诉你我的配置:

  public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
        AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
  }

  @Bean("jmsListenerContainerFactory")
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(sqsConnectionFactory());
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setConcurrency("3-10");
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
  }

要使用这个:

  @JmsListener(destination = "queue.fifo",
      containerFactory = "jmsListenerContainerFactory")
  public void receiveCustomerStakeholderKyc(@Payload final Message<?> message) throws Exception {

}

当我使用它时。有些消息甚至没有出现在代码中。 JMS 不消费这些消息,这些消息被转移到 dead_queue。

Queues:
1. queue.fifo

Name:   queue.fifo  
Default Visibility Timeout: 30 seconds
Message Retention Period:   4 days
Maximum Message Size:   256 KB
Created:    2019-09-16 12:50:43 GMT+05:30   
Receive Message Wait Time:  0 seconds
Last Updated:   2020-06-12 16:35:29 GMT+05:30   
Messages Available (Visible):   0
Delivery Delay: 0 seconds   
Messages in Flight (Not Visible):   0
Queue Type: FIFO    
Messages Delayed:   0
Content-Based Deduplication:    Enabled     

2. queue_dead.fifo
Default Visibility Timeout: 30 seconds  
Message Retention Period:   4 days  
Maximum Message Size:   256 KB
Created:    2019-09-16 12:51:08 GMT+05:30   
Receive Message Wait Time:  0 seconds
Last Updated:   2020-06-12 16:47:17 GMT+05:30   
Messages Available (Visible):   5
Delivery Delay: 0 seconds   
Messages in Flight (Not Visible):   0
Queue Type: FIFO    Messages Delayed:   0
Content-Based Deduplication:    Disabled

有什么我想念的

  1. 当我查看控制台时,它显示此时已收到消息,但在我的日志中未收到消息。

有没有办法启用 SQS 日志?

【问题讨论】:

  • 因为它要去 DLQ 这意味着一些处理失败,启用日志并查看。

标签: spring amazon-web-services jms message-queue amazon-sqs


【解决方案1】:

我必须说我犯了一个愚蠢的错误,但这可能发生在任何人身上。

错误原因

我有 AWSDev 和 QA 帐户。为了省钱,我们合并了两个账户,但我们将 SQS 放在不同的账户中。

访问SQS队列的方式

我正在尝试像这样创建 SQS 连接工厂:

 public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
        AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
  }

这样,JMS 尝试根据 AWS 访问和密钥解析 SQS 队列 URL。现在我们已经合并了 dev 和 QA 帐户,甚至我的应用程序的 QA 实例也在创建一个 dev SQS URL。

解决方案

我通过使用 AWS 账户 ID 而不是 AWS 访问和密钥动态解析 SQS 连接来解决这个问题。

这里是解决的代码:

传递 ownerAccountId

  public static class CustomDynamicDestinationResolver implements DestinationResolver {

        private String ownerAccountId;

        public CustomDynamicDestinationResolver(String ownerAccountId) {
            this.ownerAccountId = ownerAccountId;
        }

        @Override
        public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
            Assert.notNull(session, "Session must not be null");
            Assert.notNull(destinationName, "Destination name must not be null");
            if (pubSubDomain) {
                return resolveTopic(session, destinationName);
            } else {
                return resolveQueue(session, destinationName);
            }
        }

        protected Topic resolveTopic(Session session, String topicName) throws JMSException {
            return session.createTopic(topicName);
        }

        protected Queue resolveQueue(Session session, String queueName) throws JMSException {
            Queue queue;
            //LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", ownerAccountId, queueName);
            if (ownerAccountId != null && session instanceof SQSSession) {
                queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);
            } else {
                queue = session.createQueue(queueName);
            }
            return queue;
        }
  }

【讨论】:

    猜你喜欢
    • 2019-06-10
    • 1970-01-01
    • 2021-10-04
    • 2019-05-25
    • 2020-08-30
    • 2023-02-17
    • 1970-01-01
    • 2019-07-05
    • 1970-01-01
    相关资源
    最近更新 更多