【问题标题】:boto EMR add step and auto terminateboto EMR 添加步骤并自动终止
【发布时间】:2020-08-26 03:23:30
【问题描述】:

Python 2.7.12

boto3==1.3.1

如何将步骤添加到正在运行的 EMR 集群在步骤完成后终止集群,无论它失败还是成功?

创建集群

response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)

添加步骤

response = client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[
        {
            'Name': 'Run Step',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Args': [
                    'spark-submit',
                    '--deploy-mode', 'cluster',
                    '--py-files',
                    's3://mybucket/code/spark/spark_udfs.py',
                    's3://mybucket/code/spark/{}'.format(spark_script),
                    '--some-arg'
                ],
                'Jar': 'command-runner.jar'
            }
        }
    ]
)

这成功添加了一个步骤并运行,但是,当该步骤成功完成时,我希望集群自动终止,如 AWS CLI 中所述:http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html

【问题讨论】:

    标签: python amazon-web-services boto3 emr


    【解决方案1】:

    在您的情况下(使用 boto3 创建集群),您可以添加这些标志 'TerminationProtected': False, 'AutoTerminate': True, 到您的集群创建。这样在你的步骤完成后运行集群就会被关闭。

    另一种解决方案是在您要运行的步骤之后立即添加另一个步骤以终止集群。所以基本上你需要将此命令作为步骤运行

    aws emr terminate-clusters --cluster-ids your_cluster_id

    棘手的部分是检索 cluster_id。 在这里您可以找到一些解决方案:Does an EMR master node know it's cluster id?

    【讨论】:

    • 也许我必须使用建议 #2,因为 boto3 似乎没有 AutoTerminate 选项。 Unknown parameter in Instances: "AutoTerminate", must be one of: MasterInstanceType, SlaveInstanceType, InstanceCount, InstanceGroups, Ec2KeyName, Placement, KeepJobFlowAliveWhenNoSteps, TerminationProtected, HadoopVersion, Ec2SubnetId, EmrManagedMasterSecurityGroup, EmrManagedSlaveSecurityGroup, ServiceAccessSecurityGroup, AdditionalMasterSecurityGroups, AdditionalSlaveSecurityGroups
    • 我在使用 cloudformation 时遇到了同样的问题。所以我使用了选项2,效果很好。实际上,我正在使用的步骤是调用一个 lambda 函数,该函数会小心删除 cloudformation 堆栈(包含 EMR 集群)。
    • 谢谢,我会选择选项 #2。
    • 使用boto3==1.4.8,当您调用run_job_flow 时,看起来AutoTerminate 默认设置为TrueKeepJobFlowAliveWhenNoSteps 是更改此行为的参数 - boto3.readthedocs.io/en/latest/reference/services/…
    • @Nobu 感谢这位非常有帮助的人。我将 KeepJobFlowAliveWhenNoSteps 设置为 True 并没有意识到这一点。这才是真正的解决方案……在我看来,这个答案有一个错误的解决方案,而第二个则更像是一个 hack。
    【解决方案2】:

    建议的'AutoTerminate': True 参数对我不起作用。但是,当我将参数 'KeepJobFlowAliveWhenNoSteps'True 设置为 False 时,它起作用了。您的代码应如下所示:

    response = client.run_job_flow(
        Name=name,
        LogUri='s3://mybucket/emr/',
        ReleaseLabel='emr-5.9.0',
        Instances={
            'MasterInstanceType': instance_type,
            'SlaveInstanceType': instance_type,
            'InstanceCount': instance_count,
            'KeepJobFlowAliveWhenNoSteps': False,
            'Ec2KeyName': 'KeyPair',
            'EmrManagedSlaveSecurityGroup': 'sg-1234',
            'EmrManagedMasterSecurityGroup': 'sg-1234',
            'Ec2SubnetId': 'subnet-1q234',
        },
        Applications=[
            {'Name': 'Spark'},
            {'Name': 'Hadoop'}
        ],
        BootstrapActions=[
            {
                'Name': 'Install Python packages',
                'ScriptBootstrapAction': {
                    'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
                }
            }
        ],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        Configurations=[
            {
                'Classification': 'spark',
                'Properties': {
                    'maximizeResourceAllocation': 'true'
                }
            },
        ],
    )
    

    【讨论】:

      【解决方案3】:

      您可以通过在 Instances 参数中指定 'KeepJobFlowAliveWhenNoSteps': False 来创建一个在所有步骤运行后自动终止的短期集群。我已向 GitHub 添加了一个完整的示例,说明了如何执行此操作。

      以下是演示中的一些代码:

      def run_job_flow(
              name, log_uri, keep_alive, applications, job_flow_role, service_role,
              security_groups, steps, emr_client):
          try:
              response = emr_client.run_job_flow(
                  Name=name,
                  LogUri=log_uri,
                  ReleaseLabel='emr-5.30.1',
                  Instances={
                      'MasterInstanceType': 'm5.xlarge',
                      'SlaveInstanceType': 'm5.xlarge',
                      'InstanceCount': 3,
                      'KeepJobFlowAliveWhenNoSteps': keep_alive,
                      'EmrManagedMasterSecurityGroup': security_groups['manager'].id,
                      'EmrManagedSlaveSecurityGroup': security_groups['worker'].id,
                  },
                  Steps=[{
                      'Name': step['name'],
                      'ActionOnFailure': 'CONTINUE',
                      'HadoopJarStep': {
                          'Jar': 'command-runner.jar',
                          'Args': ['spark-submit', '--deploy-mode', 'cluster',
                                   step['script_uri'], *step['script_args']]
                      }
                  } for step in steps],
                  Applications=[{
                      'Name': app
                  } for app in applications],
                  JobFlowRole=job_flow_role.name,
                  ServiceRole=service_role.name,
                  EbsRootVolumeSize=10,
                  VisibleToAllUsers=True
              )
              cluster_id = response['JobFlowId']
              logger.info("Created cluster %s.", cluster_id)
          except ClientError:
              logger.exception("Couldn't create cluster.")
              raise
          else:
              return cluster_id
      

      下面是一些使用一些实际参数调用此函数的代码:

      output_prefix = 'pi-calc-output'
      pi_step = {
          'name': 'estimate-pi-step',
          'script_uri': f's3://{bucket_name}/{script_key}',
          'script_args':
              ['--partitions', '3', '--output_uri',
              f's3://{bucket_name}/{output_prefix}']
      }
      cluster_id = emr_basics.run_job_flow(
          f'{prefix}-cluster', f's3://{bucket_name}/logs',
          False, ['Hadoop', 'Hive', 'Spark'], job_flow_role, service_role,
          security_groups, [pi_step], emr_client)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-08-19
        • 2017-08-24
        • 1970-01-01
        • 1970-01-01
        • 2019-10-17
        相关资源
        最近更新 更多