【发布时间】:2021-08-12 12:56:46
【问题描述】:
我的 DAG 中有一个 python 运算符和 BigQueryInsertJobOperator。 python 运算符返回的结果应在 params 字段中传递给 BigQueryInsertJobOperator。
下面是我正在运行的脚本。
def get_columns():
field = "name"
return field
with models.DAG(
"xcom_test",
default_args=default_args,
schedule_interval="0 0 * * *",
tags=["xcom"],
)as dag:
t1 = PythonOperator(task_id="get_columns", python_callable=get_columns, do_xcom_push=True)
t2 = BigQueryInsertJobOperator(
task_id="bigquery_insert",
project_id=project_id,
configuration={
"query": {
"query": "{% include 'xcom_query.sql' %}",
"useLegacySql": False,
}
},
force_rerun=True,
provide_context=True,
params={
"columns": "{{ ti.xcom_pull(task_ids='get_columns') }}",
"project_id": project_id
},
)
xcom_query.sql 如下所示
INSERT INTO `{{ params.project_id }}.test.xcom_test`
{{ params.columns }}
select 'Andrew' from `{{ params.project_id }}.test.xcom_test`
运行时,列参数被转换为字符串,因此导致错误。以下是查询的转换方式。
INSERT INTO `project.test.xcom_test`
{{ ti.xcom_pull(task_ids='get_columns') }}
select 'Andrew' from `project.test.xcom_test`
知道我错过了什么吗?
【问题讨论】:
-
也许可以看看这里:stackoverflow.com/a/68024013/2838867 - 根据这篇文章,运营商不支持参数,您应该使用 user_defined_macros。
-
在我的例子中,传递给 BigQueryInsertJob 运算符的值是由另一个任务生成的,因此用户定义的宏没有帮助..
标签: airflow