【问题标题】:Read messages from different AWS account using @SqsListener使用 @SqsListener 从不同的 AWS 账户读取消息
【发布时间】:2021-11-10 12:11:10
【问题描述】:

我有一个由第三方供应商提供的 SQS 标准队列,该供应商已授予我们的 IAM 用户从那里读取消息的访问权限。因此队列的 AWS 账户 ID 与我的用户不同。

我正在尝试使用 spring 的 @SqsListener 注释来使用这些消息,但是我无法指定应该从中使用的 accountId。

我的客户端 bean 配置如下所示:

@Bean
fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))                
.withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(awsProperties.url, awsProperties.region))                
.build() 

我看不到在凭据中指定帐户 ID 的方法,而且我也找不到任何可用于定义帐户 ID 的属性。

我尝试将上面显示的awsProperties.url 设置为https://sqs.us-east-1.amazonaws.com/<accountId> 之类的东西,但这似乎不起作用。它仍在尝试在我自己的帐户 ID 中查找队列并抛出未找到队列的错误。

任何想法如何解决这个问题并强制 Spring AWS bean 从特定的 AwsAccount 消费?

【问题讨论】:

    标签: spring-boot amazon-sqs spring-cloud-aws


    【解决方案1】:

    您有一个用户可以访问另一个帐户中的队列。这意味着您可以在您的帐户中与该用户一起运行代码,并且可以访问另一个帐户上的队列。

    初始化 sqsclient 将始终使用它正在运行的帐户 您不必对此进行调整。

    @Bean
    fun amazonSQSAsyncClient(): AmazonSQSAsync = AmazonSQSAsyncClientBuilder.standard()
    .withCredentials(AWSStaticCredentialsProvider(BasicAWSCredentials(awsProperties.accessKey, awsProperties.secretKey)))                        
    .build() 
    

    您需要确保代码可以访问队列。

    在代码中,您应该像这样设置队列 URL: https://sqs.<region>.amazonaws.com/<account>/<queuename>

    ,我很快尝试从另一个帐户访问队列。如果队列的权限设置正确,您有两种可能。第一个是使用队列 URL 而不是名称(我检查过,它有效)。第二个是创建您自己的 DestinationResolver 并将其提供给 SimpleMessageListenerContainer。我用 Spring Boot 创建了一个小应用程序,它运行良好。我把下面的代码贴给你了。

    在下一个功能版本中,我会想出一个更好的方法来支持这个用例。

    package demo;
    
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
    import com.amazonaws.services.sqs.model.GetQueueUrlResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.aws.core.env.ResourceIdResolver;
    import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
    import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.core.DestinationResolutionException;
    import org.springframework.messaging.core.DestinationResolver;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.util.Assert;
    
    @SpringBootApplication
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    
        @Bean
        public MessageListener messageListener() {
            return new MessageListener();
        }
    
        @Bean
        public SimpleMessageListenerContainerFactory simpleMessageListenerFactory(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
            SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
            factory.setDestinationResolver(new DynamicAccountAwareQueueUrlDestinationResolver(amazonSqs, resourceIdResolver));
    
            return factory;
        }
    
        public static class DynamicAccountAwareQueueUrlDestinationResolver implements DestinationResolver<String> {
    
            public static final String ACCOUNT_QUEUE_SEPARATOR = ":";
            private final AmazonSQS amazonSqs;
            private final DynamicQueueUrlDestinationResolver dynamicQueueUrlDestinationResolverDelegate;
    
            public DynamicAccountAwareQueueUrlDestinationResolver(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
                Assert.notNull(amazonSqs, "amazonSqs must not be null");
    
                this.amazonSqs = amazonSqs;
                this.dynamicQueueUrlDestinationResolverDelegate = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver);
            }
    
            @Override
            public String resolveDestination(String queue) throws DestinationResolutionException {
                if (queue.contains(ACCOUNT_QUEUE_SEPARATOR)) {
                    String account = queue.substring(0, queue.indexOf(ACCOUNT_QUEUE_SEPARATOR));
                    String queueName = queue.substring(queue.indexOf(ACCOUNT_QUEUE_SEPARATOR) + 1);
                    GetQueueUrlResult queueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest()
                            .withQueueName(queueName)
                            .withQueueOwnerAWSAccountId(account));
                    return queueUrlResult.getQueueUrl();
                } else {
                    return this.dynamicQueueUrlDestinationResolverDelegate.resolveDestination(queue);
                }
            }
        }
    
        public static class MessageListener {
    
            private static Logger LOG = LoggerFactory.getLogger(MessageListener.class);
    
            @MessageMapping("633332177961:queue-name")
            public void listen(String message) {
                LOG.info("Received message: {}", message);
            }
    
        }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2023-02-13
      • 2021-08-17
      • 2022-01-19
      • 1970-01-01
      • 2021-02-15
      • 2022-01-25
      • 2019-02-27
      • 2019-10-17
      • 2020-05-22
      相关资源
      最近更新 更多