【问题标题】:Pooling newly created aws sqs messages合并新创建的 aws sqs 消息
【发布时间】:2018-07-17 08:39:56
【问题描述】:
@Async
public void scanFile() {
    log.info("Start scanning");
    String queueUrl = sqs.getQueueUrl("bucket-antivirus").getQueueUrl();
    List<Message> messages = sqs.receiveMessage(new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withWaitTimeSeconds(20))
                    .getMessages();
        for (Message message : messages) {
            try {
                // move clear file to file bucket
            }
            ...
            log.info("Scanning complete");
        }
}

当用户上传文件时,SQS 会收到一条消息。应用程序会将文件移动到新存储桶。

但即使我添加了 SQS 客户端的等待时间,我也无法获得最新消息。通过多次测试,我上传一个文件后,我只得到之前文件的消息。如何让 SQS 客户端等待最新消息/特定文件的消息?

【问题讨论】:

  • 在相关注释上 - “当用户上传文件时,SQS 将收到一条消息。应用程序会将文件移动到新存储桶中。” -- 你为什么要为此通过 SQS? Cloudwatch Events 或 Lambda 可能更适合此特定流程

标签: java amazon-web-services asynchronous amazon-sqs


【解决方案1】:

标准 SQS 队列没有保证的交付顺序。因此,如果您要求一条消息(或多条消息),您可能会得到旧消息或新消息(但它们通常是有序的)。这应该没问题,因为基于队列的系统不应该依赖消息的顺序。 (但是,SQS FIFO 队列确实可以保证顺序。)

听上去,如果您收到的是基于“旧文件”的消息,那么您没有正确处理之前检索到的消息。当您的进程从 SQS 检索消息时,它应该完成必要的工作,然后调用 DeleteMessage()将其从队列中删除

您的用例实际上听起来是使用 AWS Lambda 函数的完美场景。将新对象添加到 S3 存储桶时,您可以将其配置为触发 Lambda 函数(而不是发送 SQS 消息)。然后 Lambda 函数可以处理该文件。这是一个比让您的代码不断轮询 SQS 队列更简单的解决方案。

【讨论】:

  • 消息的顺序不是问题。当队列中没有消息时,sqsclient.getMessages() 不会等待飞行中的消息,即使我设置了withWaitTime,它也会立即返回一个空列表。
  • waitasec - 所以即使withWaitTimeSeconds(20),它会返回立即(即:不到一秒)而没有消息?这听起来不对。根据文档和我多年使用 SQS 的经验,withWaitTimeSeconds 应该等到等待时间到期或有消息可用。听起来这里还有其他事情……
  • @Krease 是由@Async 引起的吗?因为我在添加Thread.sleep(5000) 之后可以在getMessages() 之前等待5 秒后收到消息。
  • 可能 - 你也异步发布消息吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-02
  • 2018-02-11
  • 1970-01-01
  • 1970-01-01
  • 2020-02-14
相关资源
最近更新 更多