【问题标题】:Airflow emr cluster termination being called before step completion在步骤完成之前调用气流 emr 集群终止
【发布时间】:2019-10-17 00:21:57
【问题描述】:

我正在 EMR 上启动一个集群,并使用气流在其上提交一些步骤。

我想要什么:

我想在通过EmrAddStepsOperator 添加的所有步骤都完成后终止我的集群

我尝试过的:

我曾尝试同时使用 EmrStepSensorEmrTerminateJobFlowOperator,但我的 spark 步骤被取消并且集群在未完成所有步骤的情况下终止

请任何人建议如何正确地做到这一点。这是我的代码

dag = DAG('emr_job_flow_automatic_steps_17',
         default_args=default_args,
         schedule_interval="@daily",
         max_active_runs=1,
         catchup=True,
)


upload_to_S3_task = PythonOperator(
   task_id='upload_to_S3',
   python_callable=upload_file_to_S3,
   op_kwargs={
       'filename': '/home/ab/projects/test.py',
       'key': 'test.py',
       'bucket_name': 'dep-buck',
   },
   dag=dag)

cluster_creator = EmrCreateJobFlowOperator(
   task_id='create_job_flow2',
   job_flow_overrides=JOB_FLOW_OVERRIDES,
   aws_conn_id='aws_default',
   emr_conn_id='emr_default',
   dag=dag
)

step_adder = EmrAddStepsOperator(
   task_id='add_steps',
   job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
   aws_conn_id='aws_default',
   steps=step,
   dag=dag
)
step_checker = EmrStepSensor(
   task_id='watch_step',
   job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
   step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
   aws_conn_id='aws_default',
   dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
   task_id='remove_cluster',
   job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
   aws_conn_id='aws_default',
   dag=dag
)

upload_to_S3_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover

关于 stackoverflow 的其他问题:

stackoverflow 上有一个类似的问题,但没有正确回答(使用 EmrTerminateJobFlowOperator)

【问题讨论】:

    标签: python apache-spark boto3 airflow amazon-emr


    【解决方案1】:

    所以我设法做到了。问题是我将EmrStepSensor 作为一个stepadder 提供所有步骤,因此一旦完成它就会终止集群。

    解决方法是将所有步骤分开,并将最后一步的'id 给EmrStepSensor。或者,我仅将最后一步及其单独的步骤加法器 (step_adder_actual_step) 与其他步骤分开,并将其提供给 EmrStepSensor

    step_adder_pre_step = EmrAddStepsOperator(
        task_id='pre_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=pre_step,
        dag=dag
    )
    
    step_adder_actual_step = EmrAddStepsOperator(
        task_id='actual_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=actual_step,
        dag=dag
    )
    
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('actual_step', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )
    
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )
    
    cluster_creator >> step_adder_pre_step >> step_adder_actual_step >> step_checker >> cluster_remover 
    

    【讨论】:

      【解决方案2】:

      我遇到了同样的问题,EMR 只执行了一个步骤就终止了。

      SOLUTION: on step_checker -> step_id, change [0] for .pop()
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-09-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-05-03
        • 1970-01-01
        • 1970-01-01
        • 2013-12-10
        相关资源
        最近更新 更多