【问题标题】:Can AWS SQS publish to SNS or is polling SQS required?AWS SQS 可以发布到 SNS 还是需要轮询 SQS?
【发布时间】:2018-07-25 17:00:38
【问题描述】:

我们目前正在使用 AWS 构建一个应用程序,并且需要将 msgs 推送到 SQS。我的问题是是否可以让 SQS 向 SNS 发布一条消息,该消息将触发 Lambda(订阅 SNS)?然后,lambda 需要向 SQS 返回它收到消息的确认,从而从 SQS 中删除该消息。

上述情况可能吗?或者是从 SQS 获取消息、通过 Lambda 轮询队列等的唯一方法?

提前感谢您提供的任何帮助。

对滥用术语表示歉意,但我对 AWS 比较陌生。

【问题讨论】:

    标签: amazon-web-services aws-lambda amazon-sqs


    【解决方案1】:

    对于将来偶然发现此问题的任何人,我必须在使用 SQS 作为 SNS -> Lambda 服务的死信后编写此脚本。这不是最漂亮的代码,但它确实有效。

    下面是脚本,但您也可以在这里找到它:https://gist.github.com/joshghent/ca4a1272031e2a52af57d5e8ec5d53c5

    sqs_to_sns.py

    # Usage
    # $ python sqs_to_sns.py my-queue-name
    
    import boto3
    import sys
    import queue
    import threading
    from datetime import datetime
    import json
    from uuid import uuid4
    
    work_queue = queue.Queue()
    
    sqs = boto3.resource('sqs')
    sns = boto3.client('sns')
    sqs_client = boto3.client('sqs')
    
    from_q_name = sys.argv[1]
    print(("From: " + from_q_name + " To: SNS"))
    
    from_q = sqs.get_queue_by_name(QueueName=from_q_name)
    to_q = sqs.get_queue_by_name(QueueName='backup-queue')
    
    skipped = 0
    processed = 0
    total = 0
    
    
    def process_queue():
        while True:
            messages = work_queue.get()
    
            global total
            total = len(messages)
    
            for message in messages:
                message_content = json.loads(message.body)
    
                message.delete()
    
                print("Backing up Message to dead letter queue - just in case. Id: " +
                      message_content['MessageId'])
                bodies = list()
                bodies.append({'Id': str(uuid4()), 'MessageBody': message.body})
    
                to_q.send_messages(Entries=bodies)
    
                response = sns.publish(
                    TopicArn=message_content['TopicArn'], Message=message_content['Message'])
                print(("Published Message to Topic " + str(message_content['MessageId']) +
                       ". To TopicArn: " + message_content['TopicArn'] + ". Received response " + json.dumps(response)))
    
                global processed
                processed = processed + 1
    
    
    for i in range(10):
        t = threading.Thread(target=process_queue)
        t.daemon = True
        t.start()
    
    while True:
        messages = list()
        for message in from_q.receive_messages(
                MaxNumberOfMessages=10,
                VisibilityTimeout=123,
                WaitTimeSeconds=20):
            messages.append(message)
        work_queue.put(messages)
    
    work_queue.join()
    print("Processed " + str(processed) + " of " + str(total) +
          ". Skipped " + str(skipped) + " messages. Exiting")
    

    【讨论】:

      【解决方案2】:

      SQS 不支持推送。但是,您可以设置一个 lambda 函数,定期从 SQS 轮询相当简单,如下所述:https://cloudonaut.io/integrate-sqs-and-lambda-serverless-architecture-for-asynchronous-workloads/

      【讨论】:

        【解决方案3】:

        SQS 无法将消息发布到 SNS。 SQS 只能存储消息。您必须使用 SQS Api 提取消息。

        希望对您有所帮助!

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2021-08-12
          • 1970-01-01
          • 2011-08-21
          • 2017-11-05
          • 2021-06-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多