【发布时间】:2019-10-17 00:21:57
【问题描述】:
我正在 EMR 上启动一个集群,并使用气流在其上提交一些步骤。
我想要什么:
我想在通过EmrAddStepsOperator 添加的所有步骤都完成后终止我的集群
我尝试过的:
我曾尝试同时使用 EmrStepSensor 和 EmrTerminateJobFlowOperator,但我的 spark 步骤被取消并且集群在未完成所有步骤的情况下终止
请任何人建议如何正确地做到这一点。这是我的代码
dag = DAG('emr_job_flow_automatic_steps_17',
default_args=default_args,
schedule_interval="@daily",
max_active_runs=1,
catchup=True,
)
upload_to_S3_task = PythonOperator(
task_id='upload_to_S3',
python_callable=upload_file_to_S3,
op_kwargs={
'filename': '/home/ab/projects/test.py',
'key': 'test.py',
'bucket_name': 'dep-buck',
},
dag=dag)
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow2',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
steps=step,
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
upload_to_S3_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover
关于 stackoverflow 的其他问题:
stackoverflow 上有一个类似的问题,但没有正确回答(使用 EmrTerminateJobFlowOperator)
【问题讨论】:
标签: python apache-spark boto3 airflow amazon-emr