【问题标题】:How to have more than one handler in AWS Lambda Function?如何在 AWS Lambda 函数中拥有多个处理程序?
【发布时间】:2018-11-14 04:14:43
【问题描述】:

我有一个非常大的 python 文件,其中包含多个已定义的函数。如果您熟悉 AWS Lambda,那么在创建 lambda 函数时,您会指定一个处理程序,该处理程序是 AWS Lambda 在服务执行我的代码时可以调用的代码中的一个函数,它在下面的 my_handler.py 文件中表示:

    def handler_name(event, context):
    ...
    return some_value

链接来源:https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html

但是,正如我上面提到的,我在 my_handler.py 中有多个定义的函数,它们有自己的事件和上下文。因此,这将导致错误。 python3.6中有没有办法解决这个问题?

【问题讨论】:

  • 为处理程序方法指定不同的名称,并配置每个 Lambda 函数处理程序以调用相关方法。例如,“main.myhandler”会调用 main.py 中定义的 myhandler 方法。
  • 您仍然需要有不同的 lambda 函数/在需要时不断更改 lambda 配置(不推荐)。
  • 似乎一个公平的问题是为什么您将这些文件放在同一个文件中? Lambda 函数是一个函数。

标签: python-3.x amazon-web-services aws-lambda python-3.6


【解决方案1】:

您的单个​​处理函数需要负责解析传入事件,并确定要采用的适当路径。例如,假设您的其他函数称为helper1helper2。您的 Lambda 处理程序函数将检查传入事件,然后根据传入事件中的一个字段(即,我们称之为 EventType)调用 helper1helper2,同时传入事件和上下文对象。

def handler_name(event, context):
  if event['EventType'] == 'helper1':
    helper1(event, context)
  elif event['EventType'] == 'helper2':
    helper2(event, context)

def helper1(event, context):
  pass

def helper2(event, context):
  pass

这只是伪代码,我自己没有测试过,但它应该可以理解这个概念。

【讨论】:

    【解决方案2】:

    游戏有点晚了,但认为分享不会有什么坏处。最佳实践建议将处理程序与 Lambda 的核心逻辑分开。不仅可以添加额外的定义,它可以导致更清晰的代码并减少浪费——例如​​。对 S3 的多个 API 调用。因此,尽管它可能会失控,但我不同意对您最初问题的一些批评。将您的处理程序用作将完成您的各种工作的附加功能的逻辑接口是有效的。在数据架构和工程领域,以这种方式工作通常成本更低,效率更高。特别是如果您正在构建 ETL 管道,遵循面向服务的架构模式。诚然,我有点特立独行,有些人可能会觉得这不守规矩/令人震惊,但出于各种原因,我什至在我的 Lambda 中构建类——例如。集中的、类似于数据湖的 S3 存储桶,可容纳各种文件类型,减少不必要的请求等……我支持它。这是我不久前放在集线器上的 CDK 示例项目中的一个处理程序文件的示例。希望它能给你一些有用的想法,或者至少在想要增强你的 Lambdas 时不会感到孤单。

    import requests
    import json
    from requests.exceptions import Timeout
    from requests.exceptions import HTTPError
    from botocore.exceptions import ClientError
    from datetime import date
    import csv
    import os
    import boto3
    import logging
    
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    
    class Asteroids:
        """Client to NASA API and execution interface to branch data processing by file type.
        Notes:
            This class doesn't look like a normal class. It is a simple example of how one might
            workaround AWS Lambda's limitations of class use in handlers. It also allows for 
            better organization of code to simplify this example. If one planned to add
            other NASA endpoints or process larger amounts of Asteroid data for both .csv and .json formats,
            asteroids_json and asteroids_csv should be modularized and divided into separate lambdas
            where stepfunction orchestration is implemented for a more comprehensive workflow.
            However, for the sake of this demo I'm keeping it lean and easy.
        """
    
        def execute(self, format):
            """Serves as Interface to assign class attributes and execute class methods
            Raises:
                Exception: If file format is not of .json or .csv file types.
            Notes:
                Have fun!
            """
            self.file_format=format
            self.today=date.today().strftime('%Y-%m-%d')
            # method call below used when Secrets Manager integrated. See get_secret.__doc__ for more.
            # self.api_key=get_secret('nasa_api_key')
            self.api_key=os.environ["NASA_KEY"]
            self.endpoint=f"https://api.nasa.gov/neo/rest/v1/feed?start_date={self.today}&end_date={self.today}&api_key={self.api_key}"
            self.response_object=self.nasa_client(self.endpoint)
            self.processed_response=self.process_asteroids(self.response_object)
            if self.file_format == "json":
                self.asteroids_json(self.processed_response)
            elif self.file_format == "csv":
                self.asteroids_csv(self.processed_response)
            else:
                raise Exception("FILE FORMAT NOT RECOGNIZED")
            self.write_to_s3()
    
        def nasa_client(self, endpoint):
            """Client component for API call to NASA endpoint.
            Args:
                endpoint (str): Parameterized url for API call.
            Raises:
                Timeout: If connection not made in 5s and/or data not retrieved in 15s.
                HTTPError & Exception: Self-explanatory
            Notes:
                See Cloudwatch logs for debugging.
            """
            try:
                response = requests.get(endpoint, timeout=(5, 15))
            except Timeout as timeout:
                print(f"NASA GET request timed out: {timeout}")
            except HTTPError as http_err:
                print(f"HTTP error occurred: {http_err}")
            except Exception as err:
                print(f'Other error occurred: {err}')
            else:
                return json.loads(response.content)
    
        def process_asteroids(self, payload):
            """Process old, and create new, data object with content from response.
            Args:
                payload (b'str'): Binary string of asteroid data to be processed.
            """
            near_earth_objects = payload["near_earth_objects"][f"{self.today}"]
            asteroids = []
            for neo in near_earth_objects:
                asteroid_object = {
                    "id" : neo['id'],
                    "name" : neo['name'],
                    "hazard_potential" : neo['is_potentially_hazardous_asteroid'],
                    "est_diameter_min_ft": neo['estimated_diameter']['feet']['estimated_diameter_min'],
                    "est_diameter_max_ft": neo['estimated_diameter']['feet']['estimated_diameter_max'],
                    "miss_distance_miles": [item['miss_distance']['miles'] for item in neo['close_approach_data']],
                    "close_approach_exact_time": [item['close_approach_date_full'] for item in neo['close_approach_data']]
                }
                asteroids.append(asteroid_object)
    
            return asteroids
    
        def asteroids_json(self, payload):
            """Creates json object from payload content then writes to .json file.
            Args:
                payload (b'str'): Binary string of asteroid data to be processed.
            """
            json_file = open(f"/tmp/asteroids_{self.today}.json",'w')
            json_file.write(json.dumps(payload, indent=4))
            json_file.close()
    
        def asteroids_csv(self, payload):
            """Creates .csv object from payload content then writes to .csv file.
            """
            csv_file=open(f"/tmp/asteroids_{self.today}.csv",'w', newline='\n')
            fields=list(payload[0].keys())
            writer=csv.DictWriter(csv_file, fieldnames=fields)
            writer.writeheader()
            writer.writerows(payload)
            csv_file.close()
    
        def get_secret(self):
            """Gets secret from AWS Secrets Manager
            Notes:
                Have yet to integrate into the CDK. Leaving as example code.
            """
            secret_name = os.environ['TOKEN_SECRET_NAME']
            region_name = os.environ['REGION']
            session = boto3.session.Session()
            client = session.client(service_name='secretsmanager', region_name=region_name)
            try:
                get_secret_value_response = client.get_secret_value(SecretId=secret_name)
            except ClientError as e:
                raise e
            else:
                if 'SecretString' in get_secret_value_response:
                    secret = get_secret_value_response['SecretString']
                else:
                    secret = b64decode(get_secret_value_response['SecretBinary'])
            return secret
    
        def write_to_s3(self):
            """Uploads both .json and .csv files to s3
            """
            s3 = boto3.client('s3')
            s3.upload_file(f"/tmp/asteroids_{self.today}.{self.file_format}", os.environ['S3_BUCKET'], f"asteroid_data/asteroids_{self.today}.{self.file_format}")
    
    
    def handler(event, context):
        """Instantiates class and triggers execution method.
        Args:
            event (dict): Lists a custom dict that determines interface control flow--i.e. `csv` or `json`.
            context (obj): Provides methods and properties that contain invocation, function and
                execution environment information. 
                *Not used herein.
        """
        asteroids = Asteroids()
        asteroids.execute(event)
    

    【讨论】:

    • 我喜欢你的风格!
    • 摇滚吧,@bruceszalwinski \w/
    猜你喜欢
    • 2021-06-16
    • 2023-01-02
    • 1970-01-01
    • 1970-01-01
    • 2020-02-25
    • 2021-05-01
    • 1970-01-01
    • 2017-05-27
    • 2018-01-28
    相关资源
    最近更新 更多