【问题标题】:Consume SQS tasks from App Engine从 App Engine 使用 SQS 任务
【发布时间】:2018-05-19 19:19:56
【问题描述】:

我正在尝试与在 Amazon SQS 队列上发布消息的第三方集成。我需要我的 GAE 后端来接收这些消息。

基本上,我希望启动以下脚本并始终运行

import boto3
sqs_client = boto3.client('sqs', 
    aws_access_key_id=KEY, 
    aws_secret_access_key=SECRET, 
    region_name=REGION)
while True:
    sqs_client.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=60)
    for message in msgs_response.get('Messages', []):
        deferred.defer(process_and_delete_message, message)

我的主要 appengine Web 应用程序处于自动缩放模式(任务超时时间为 60 秒和 10 分钟),但我正在考虑将微服务设置为手动缩放或基本缩放,因为:

请求可以无限期地运行。手动扩展的实例可以选择处理 /_ah/start 并执行程序或脚本数小时而不返回 HTTP 响应代码。任务队列任务最长可以运行 24 小时。

https://cloud.google.com/appengine/docs/standard/python/an-overview-of-app-engine

显然手动和基本缩放也允许“后台线程”,但我很难找到它的文档,我认为这可能是他们弃用后端支持模块之前的遗留物(虽然我确实找到了这个https://cloud.google.com/appengine/docs/standard/python/refdocs/modules/google/appengine/api/background_thread/background_thread#BackgroundThread)。

手动或基本缩放适合这个吗?如果是这样,我应该用什么来收听sqs_client.receive_message()?我担心的一件事是这个任务/后台线程死亡而不是重新启动。

【问题讨论】:

    标签: google-app-engine amazon-sqs


    【解决方案1】:

    这也许是一个可能的解决方案:

    尝试使用 Google Compute Engine 微实例连续运行该脚本并向您的应用引擎应用发送 REST 调用。 Easy Python Example For Compute Engine

    或者:

    我使用了运行实例类型 B2/B1 的模块来执行长时间运行的作业;我从来没有遇到过任何麻烦;但这些工作确实开始和停止。我使用基本缩放:将 max_instances 设置为 1。我运行的作业大约需要 6 个小时才能完成。

    【讨论】:

      【解决方案2】:

      我最终为此创建了一个手动扩展的应用引擎标准微服务。这个微服务有 /_ah/start 的处理程序永远不会返回并无限期地运行(一次很多天),当它停止时,应用引擎会立即重新启动它。

      请求可以无限期地运行。手动扩展的实例可以选择 处理 /_ah/start 并执行程序或脚本数小时 不返回 HTTP 响应代码。任务队列任务可以跑起来 到 24 小时。

      https://cloud.google.com/appengine/docs/standard/python/an-overview-of-app-engine

      我的/_ah/start 处理程序侦听 SQS 队列,并创建我的默认服务设置为侦听的推送队列任务。

      我正在研究 Compute Engine 路由以及 App Engine Flex 路由(本质上是由应用引擎管理的 Compute Engine),但还有其他复杂性,例如无法访问 ndbtaskqueue sdk我没有时间深入研究。

      以下是此微服务的所有文件,不包括我的 lib 文件夹,其中包含我需要的 boto3 和其他一些库的源代码。

      我希望这对某人有所帮助。

      gaesqs.yaml:

      application: my-project-id
      module: gaesqs
      version: dev
      runtime: python27
      api_version: 1
      threadsafe: true
      
      manual_scaling:
        instances: 1
      
      env_variables:
        theme: 'default'
        GAE_USE_SOCKETS_HTTPLIB : 'true'
      builtins:
      - appstats: on #/_ah/stats/
      - remote_api: on #/_ah/remote_api/
      - deferred: on
      
      handlers:
      - url: /.*
        script: gaesqs_main.app
      
      
      libraries:
      - name: jinja2
        version: "2.6"
      - name: webapp2
        version: "2.5.2"
      - name: markupsafe
        version: "0.15"
      - name: ssl
        version: "2.7.11"
      - name: pycrypto
        version: "2.6"
      - name: lxml
        version: latest
      

      gaesqs_main.py:

      #!/usr/bin/env python
      import json
      
      import logging
      
      import appengine_config
      
      try:
          # This is needed to make local development work with SSL.
          # See http://stackoverflow.com/a/24066819/500584
          # and https://code.google.com/p/googleappengine/issues/detail?id=9246 for more information.
          from google.appengine.tools.devappserver2.python import sandbox
          sandbox._WHITE_LIST_C_MODULES += ['_ssl', '_socket']
      
          import sys
          # this is socket.py copied from a standard python install
          from lib import stdlib_socket
          socket = sys.modules['socket'] = stdlib_socket
      except ImportError:
          pass
      
      
      import boto3
      import os
      
      import webapp2
      from webapp2_extras.routes import RedirectRoute
      from google.appengine.api import taskqueue
      
      app = webapp2.WSGIApplication(debug=os.environ['SERVER_SOFTWARE'].startswith('Dev'))#, config=webapp2_config)
      
      
      KEY = "<MY-KEY>"
      SECRET = "<MY-SECRET>"
      REGION = "<MY-REGION>"
      QUEUE_URL = "<MY-QUEUE_URL>"
      
      
      def process_message(message_body):
          queue = taskqueue.Queue('default')
          task = taskqueue.Task(
              url='/task/sqs-process/',
              countdown=0,
              target='default',
              params={'message': message_body})
          queue.add(task)
      
      
      class Start(webapp2.RequestHandler):
      
          def get(self):
              logging.info("Start")
              for loggers_to_suppress in ['boto3', 'botocore', 'nose', 's3transfer']:
                  logger = logging.getLogger(loggers_to_suppress)
                  if logger:
                      logger.setLevel(logging.WARNING)
              logging.info("boto3 loggers suppressed")
              sqs_client = boto3.client('sqs',
                                        aws_access_key_id=KEY,
                                        aws_secret_access_key=SECRET,
                                        region_name=REGION)
              while True:
                  msgs_response = sqs_client.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=20)
                  logging.info("msgs_response: %s" % msgs_response)
                  for message in msgs_response.get('Messages', []):
                      logging.info("message: %s" % message)
                      process_message(message['Body'])
                      sqs_client.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])
      
      
      _routes = [
          RedirectRoute('/_ah/start', Start, name='start'),
      ]
      
      for r in _routes:
          app.router.add(r)
      

      appengine_config.py:

      import os
      
      from google.appengine.ext import vendor
      from google.appengine.ext.appstats import recording
      
      appstats_CALC_RPC_COSTS = True
      
      # Add any libraries installed in the "lib" folder.
      #     Use pip with the -t lib flag to install libraries in this directory:
      #     $ pip install -t lib gcloud
      #     https://cloud.google.com/appengine/docs/python/tools/libraries27
      try:
          vendor.add('lib')
      except:
          print "Unable to add 'lib'"
      
      
      def webapp_add_wsgi_middleware(app):
          app = recording.appstats_wsgi_middleware(app)
          return app
      
      if os.environ.get('SERVER_SOFTWARE', '').startswith('Development'):
          print "gaesqs development"
          import imp
          import os.path
          import inspect
          from google.appengine.tools.devappserver2.python import sandbox
      
          sandbox._WHITE_LIST_C_MODULES += ['_ssl', '_socket']
          # Use the system socket.
      
          real_os_src_path = os.path.realpath(inspect.getsourcefile(os))
          psocket = os.path.join(os.path.dirname(real_os_src_path), 'socket.py')
          imp.load_source('socket', psocket)
          os.environ['HTTP_HOST'] = "my-project-id.appspot.com"
      else:
          print "gaesqs prod"
          # Doing this on dev_appserver/localhost seems to cause outbound https requests to fail
          from lib import requests
          from lib.requests_toolbelt.adapters import appengine as requests_toolbelt_appengine
      
          # Use the App Engine Requests adapter. This makes sure that Requests uses
          # URLFetch.
          requests_toolbelt_appengine.monkeypatch()
      

      【讨论】:

      • 你是如何设法让 boto3 在应用引擎上工作的?当我使用 boto3 时,我看到一个与 Popen 相关的错误。谢谢。
      • 我不记得有这个问题。也许将您的堆栈跟踪发布在一个单独的问题中?但我很确定 popen 在应用引擎标准中是不允许的,所以也许你使用的功能与我不同。
      猜你喜欢
      • 1970-01-01
      • 2011-01-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-03-25
      相关资源
      最近更新 更多