【问题标题】:Why does xcom_pull from EmrCreateJobFlowOperator returns None in Airflow为什么来自 EmrCreateJobFlowOperator 的 xcom_pull 在 Airflow 中返回 None
【发布时间】:2022-01-08 22:50:05
【问题描述】:

这是我的达格: 我正在尝试旋转 EMR 集群并尝试在添加步骤时使用其集群 ID。 用例是:想要旋转一个集群并将其集群 ID 保存在 s3 中的某处。 但是 xcom_pull 显示错误。

from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from airflow.utils.dates import days_ago

SPARK_STEPS = [
    {
        'Name': 'emr_spin',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    'Name': 'airflow_trial',
    'ReleaseLabel': 'emr-5.29.0',
    'Applications': [{'Name': 'Spark'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': "Master",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
    },
    'Steps': [],
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

with DAG(
    dag_id='xcom',
    default_args={
        'owner': 'airflow'
    },
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:

    # [START howto_operator_emr_automatic_steps_tasks]
    job_flow_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id = "{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

输出: XCom

Key         Value
job_flow_id None

请帮忙。 我想将集群 ID 保存在 json 中。 但我没有使用 xcom 得到它

【问题讨论】:

  • EmrCreateJobFlowOperator 是否成功完成?
  • 是的。它确实成功完成了。但是这些步骤不是通过 EmrAddStepsOperator 添加的。
  • 您从哪里打印job_flow_id,请分享该代码

标签: python airflow amazon-emr


【解决方案1】:

有多种方法可以通过job_flow_id,请您尝试一下并告诉我结果。

首先,使用xcom,尝试使用xcom_pull,如下所示

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)

使用 EmrCreateJobFlowOperator 的 output

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=job_flow_creator.output,
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)

不确定,但也可以单独尝试job_flow_creator,不使用job_flow_creator.output

【讨论】:

  • 当我使用 job_flow_creator.output 时,它工作正常。但是当我使用 job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}" 时,没有添加这些步骤。此外,当我尝试在字符串中存储:job_flow_creator.output 或“{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}”时,它什么也不存储但理想情况下它应该存储集群 ID跨度>
  • ob_flow_creator.output 是否也添加了这些步骤?你特别需要job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}吗?
  • 如果有帮助,请随时投票/接受答案:)
  • 我需要使用 job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }} 专门在字符串中捕获集群 ID,以便我可以将其保存在某处S3 用于在同一集群中添加其他步骤。
  • job_flow_creator.output 为您提供了 cluster_id (job_flow_id == cluster_id),您可以保存它,将其传递给 operator,甚至将其持久化到 s3。如果我遗漏了什么,请告诉我?
猜你喜欢
  • 1970-01-01
  • 2013-07-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多