【发布时间】:2022-01-08 22:50:05
【问题描述】:
这是我的达格: 我正在尝试旋转 EMR 集群并尝试在添加步骤时使用其集群 ID。 用例是:想要旋转一个集群并将其集群 ID 保存在 s3 中的某处。 但是 xcom_pull 显示错误。
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from airflow.utils.dates import days_ago
SPARK_STEPS = [
{
'Name': 'emr_spin',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'airflow_trial',
'ReleaseLabel': 'emr-5.29.0',
'Applications': [{'Name': 'Spark'}],
'Instances': {
'InstanceGroups': [
{
'Name': "Master",
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
'Steps': [],
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
}
with DAG(
dag_id='xcom',
default_args={
'owner': 'airflow'
},
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(2),
schedule_interval='0 3 * * *',
tags=['example'],
) as dag:
# [START howto_operator_emr_automatic_steps_tasks]
job_flow_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id = "{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
输出: XCom
Key Value
job_flow_id None
请帮忙。 我想将集群 ID 保存在 json 中。 但我没有使用 xcom 得到它
【问题讨论】:
-
EmrCreateJobFlowOperator 是否成功完成?
-
是的。它确实成功完成了。但是这些步骤不是通过 EmrAddStepsOperator 添加的。
-
您从哪里打印job_flow_id,请分享该代码
标签: python airflow amazon-emr