【发布时间】:2018-07-20 09:57:34
【问题描述】:
我是 Python 新手,也是 Airflow 新手。
我正在使用 Snowflake 数据库。
我创建了一个运算符SnowflakeGetDataOperator,它返回雪花hook.get_records 方法(我返回少量的kines - 通常是单个单元格)
所以现在我有这个任务:
check_last_run_date=SnowflakeGetDataOperator(
task_id='check_last_run_date',
sql="SELECT COALESCE (max(update_date), '2000-01-01') FROM poc.dwh.fact_collector",
snowflake_conn_id='snowflake_default',
dag=dag)
当这个任务运行时,我在 Airfow 后端看到这个任务的 xcom 对象(操作符的返回值 - 我没有使用 xcom.push())
我的问题是如何从下一个下游任务中访问这个值?
我需要将它用作我的下一个 sql 运算符的参数。
我在 dag 代码中尝试了以下行
{{ task_instance.xcom_pull(task_ids='check_last_run_date') }}
但代码无法识别 task_instance 属性。
编辑
下一个任务应该是这样的
fill_agg_table = SnowflakeOperator(
task_id='fill_cust_agg_data',
sql= str.replace ("""INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data
( SELECT * FROM POC.STG."stg_atg_data" WHERE XXXXX < current_date)""",
'XXXXX',
{{ task_instance.xcom_pull(task_ids='check_last_run_date') }},
snowflake_conn_id='snowflake_default',
dag=dag ))
【问题讨论】: