【问题标题】:airflow xcom.pull() access to a implicitly returned value of upstream task气流 xcom.pull() 访问上游任务的隐式返回值
【发布时间】:2018-07-20 09:57:34
【问题描述】:

我是 Python 新手,也是 Airflow 新手。

我正在使用 Snowflake 数据库。

我创建了一个运算符SnowflakeGetDataOperator,它返回雪花hook.get_records 方法(我返回少量的kines - 通常是单个单元格)

所以现在我有这个任务:

check_last_run_date=SnowflakeGetDataOperator(
    task_id='check_last_run_date',
    sql="SELECT COALESCE (max(update_date), '2000-01-01') FROM poc.dwh.fact_collector",
    snowflake_conn_id='snowflake_default',
    dag=dag)

当这个任务运行时,我在 Airfow 后端看到这个任务的 xcom 对象(操作符的返回值 - 我没有使用 xcom.push()

我的问题是如何从下一个下游任务中访问这个值?

我需要将它用作我的下一个 sql 运算符的参数。

我在 dag 代码中尝试了以下行

{{ task_instance.xcom_pull(task_ids='check_last_run_date') }}

但代码无法识别 task_instance 属性。

编辑

下一个任务应该是这样的

fill_agg_table = SnowflakeOperator( 
task_id='fill_cust_agg_data', 
sql= str.replace ("""INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data 
  ( SELECT * FROM POC.STG."stg_atg_data" WHERE XXXXX < current_date)""", 
    'XXXXX', 
    {{ task_instance.xcom_pull(task_ids='check_last_run_date') }}, 
snowflake_conn_id='snowflake_default', 
dag=dag )) 

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    迟到了,但你的标题就是答案:

    xcom_pull() 不带 args 将返回 dagrun 的最新 return_value,因此假设只有一个任务,直接上游任务推送的值。

    文档中没有明确说明,但我喜欢这样比硬编码任务名称更好。

    【讨论】:

      【解决方案2】:

      您的第二个任务看起来有点不寻常。如果字段是模板化的,您可以简单地将字段放入字符串中。

      事实上,使用string.replacestring.format 会弄乱你的宏并且在Airflow 中不能很好地工作。其他宏在这里:https://airflow.apache.org/code.html#macros

      确保在您自己的运算符中模板化 sql 字段。如何执行此操作请参阅此示例代码 https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_operator.py 并检查变量 templated_fields

      建议:

      sql= """INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data 
        ( SELECT * FROM POC.STG."stg_atg_data" WHERE {{ task_instance.xcom_pull(task_ids='check_last_run_date') }} < current_date)""", 
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-09-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-09-12
        • 2020-04-29
        • 1970-01-01
        • 2021-05-06
        相关资源
        最近更新 更多