【问题标题】:Passing Result of a Python Operator as a parameter to BigQueryInsertJobOperator将 Python 运算符的结果作为参数传递给 BigQueryInsertJobOperator
【发布时间】:2021-08-12 12:56:46
【问题描述】:

我的 DAG 中有一个 python 运算符和 BigQueryInsertJobOperator。 python 运算符返回的结果应在 params 字段中传递给 BigQueryInsertJobOperator。

下面是我正在运行的脚本。

def get_columns():
    field = "name"
    return field


with models.DAG(
        "xcom_test",
        default_args=default_args,
        schedule_interval="0 0 * * *",
        tags=["xcom"],
)as dag:
    t1 = PythonOperator(task_id="get_columns", python_callable=get_columns, do_xcom_push=True)

    t2 = BigQueryInsertJobOperator(
        task_id="bigquery_insert",
        project_id=project_id,
        configuration={
            "query": {
                "query": "{% include 'xcom_query.sql' %}",
                "useLegacySql": False,
            }
        },
        force_rerun=True,
        provide_context=True,
        params={
            "columns": "{{ ti.xcom_pull(task_ids='get_columns') }}",
            "project_id": project_id
        },
    )

xcom_query.sql 如下所示

INSERT INTO `{{ params.project_id }}.test.xcom_test`
    {{  params.columns }}
select 'Andrew' from `{{ params.project_id }}.test.xcom_test`

运行时,列参数被转换为字符串,因此导致错误。以下是查询的转换方式。

 INSERT INTO `project.test.xcom_test`
  {{ ti.xcom_pull(task_ids='get_columns') }}
 select 'Andrew' from `project.test.xcom_test`

知道我错过了什么吗?

【问题讨论】:

  • 也许可以看看这里:stackoverflow.com/a/68024013/2838867 - 根据这篇文章,运营商不支持参数,您应该使用 user_defined_macros。
  • 在我的例子中,传递给 BigQueryInsertJob 运算符的值是由另一个任务生成的,因此用户定义的宏没有帮助..

标签: airflow


【解决方案1】:

我找到了我的 dag 失败的原因。

BigQueryInsertJobOperator 的“params”字段不是模板化字段,因此调用“task_instance.xcom_pull”不会以这种方式工作。

但是,您可以直接从 jinja 模板访问“task_instance”变量。

INSERT INTO `{{ params.project_id }}.test.xcom_test`
    ({{ task_instance.xcom_pull(task_ids='get_columns') }})
select
'Andrew' from `{{ params.project_id }}.test.xcom_test`

https://marclamberti.com/blog/templates-macros-apache-airflow/ - 本文解释了如何识别气流中的模板参数

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-01
    • 2017-11-18
    • 2018-03-27
    • 1970-01-01
    • 1970-01-01
    • 2018-10-17
    相关资源
    最近更新 更多