【问题标题】:How to execute spark submit on amazon EMR from Lambda function?如何从 Lambda 函数在亚马逊 EMR 上执行 spark 提交?
【发布时间】:2018-01-29 10:47:45
【问题描述】:

我想根据 S3 上的文件上传事件在 AWS EMR 集群上执行 spark 提交作业。我正在使用 AWS Lambda 函数来捕获事件,但我不知道如何通过 Lambda 函数在 EMR 集群上提交 spark 提交作业。

我搜索的大多数答案都谈到在 EMR 集群中添加一个步骤。但我不知道我是否可以在添加的步骤中添加任何步骤来触发“spark submit --with args”。

【问题讨论】:

    标签: amazon-web-services apache-spark aws-lambda amazon-emr spark-submit


    【解决方案1】:

    你可以,我上周不得不做同样的事情!

    将 boto3 用于 Python(其他语言肯定会有类似的解决方案),您可以使用定义的步骤启动集群,或者将步骤附加到已经启动的集群。

    使用步骤定义集群

    def lambda_handler(event, context):
        conn = boto3.client("emr")        
        cluster_id = conn.run_job_flow(
            Name='ClusterName',
            ServiceRole='EMR_DefaultRole',
            JobFlowRole='EMR_EC2_DefaultRole',
            VisibleToAllUsers=True,
            LogUri='s3n://some-log-uri/elasticmapreduce/',
            ReleaseLabel='emr-5.8.0',
            Instances={
                'InstanceGroups': [
                    {
                        'Name': 'Master nodes',
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'MASTER',
                        'InstanceType': 'm3.xlarge',
                        'InstanceCount': 1,
                    },
                    {
                        'Name': 'Slave nodes',
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'CORE',
                        'InstanceType': 'm3.xlarge',
                        'InstanceCount': 2,
                    }
                ],
                'Ec2KeyName': 'key-name',
                'KeepJobFlowAliveWhenNoSteps': False,
                'TerminationProtected': False
            },
            Applications=[{
                'Name': 'Spark'
            }],
            Configurations=[{
                "Classification":"spark-env",
                "Properties":{},
                "Configurations":[{
                    "Classification":"export",
                    "Properties":{
                        "PYSPARK_PYTHON":"python35",
                        "PYSPARK_DRIVER_PYTHON":"python35"
                    }
                }]
            }],
            BootstrapActions=[{
                'Name': 'Install',
                'ScriptBootstrapAction': {
                    'Path': 's3://path/to/bootstrap.script'
                }
            }],
            Steps=[{
                'Name': 'StepName',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                    'Args': [
                        "/usr/bin/spark-submit", "--deploy-mode", "cluster",
                        's3://path/to/code.file', '-i', 'input_arg', 
                        '-o', 'output_arg'
                    ]
                }
            }],
        )
        return "Started cluster {}".format(cluster_id)
    

    将步骤附加到已运行的集群

    根据here

    def lambda_handler(event, context):
        conn = boto3.client("emr")
        # chooses the first cluster which is Running or Waiting
        # possibly can also choose by name or already have the cluster id
        clusters = conn.list_clusters()
        # choose the correct cluster
        clusters = [c["Id"] for c in clusters["Clusters"] 
                    if c["Status"]["State"] in ["RUNNING", "WAITING"]]
        if not clusters:
            sys.stderr.write("No valid clusters\n")
            sys.stderr.exit()
        # take the first relevant cluster
        cluster_id = clusters[0]
        # code location on your emr master node
        CODE_DIR = "/home/hadoop/code/"
    
        # spark configuration example
        step_args = ["/usr/bin/spark-submit", "--spark-conf", "your-configuration",
                     CODE_DIR + "your_file.py", '--your-parameters', 'parameters']
    
        step = {"Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                    'Args': step_args
                }
            }
        action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
        return "Added step: %s"%(action)
    

    【讨论】:

    • 如果我需要为 mey 代码文件添加 s3 路径,这个 script-runner.jar 是什么?
    • s3n://elasticmapreduce 存储桶由 Amazon 提供。除了引用它,你不需要做任何事情。
    • 这个 spark-submit 动作是从 lambda 函数同步调用还是只是添加工作流而不实际调用它??
    【解决方案2】:

    如果您想使用 spark submit 命令执行 Spark jar,则使用 AWS Lambda 函数 python 代码:

    from botocore.vendored import requests
    
    import json
    
    def lambda_handler(event, context):
    
    headers = { "content-type": "application/json" }
    
      url = 'http://ip-address.ec2.internal:8998/batches'
    
      payload = {
    
        'file' : 's3://Bucket/Orchestration/RedshiftJDBC41.jar 
    s3://Bucket/Orchestration/mysql-connector-java-8.0.12.jar 
    
    s3://Bucket/Orchestration/SparkCode.jar',
    
        'className' : 'Main Class Name',
    
        'args' : [event.get('rootPath')]
    
      }
    
      res = requests.post(url, data = json.dumps(payload), headers = headers, verify = False)
    
      json_data = json.loads(res.text)
    
      return json_data.get('id')
    

    【讨论】:

    • 能不能把第一句的英文改一下,格式化一下代码?
    • 这使用 livy 提交 Spark 作业。虽然作业可以这样运行,但是很多集群没有配置 Livy,因此这种方法有它的局限性
    猜你喜欢
    • 2016-11-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-25
    • 2019-10-06
    • 1970-01-01
    相关资源
    最近更新 更多