【问题标题】:Airflow BigQueryInsertJobOperator configuration气流 BigQueryInsertJobOperator 配置
【发布时间】:2021-08-13 05:05:07
【问题描述】:

我在从已弃用的 BigQueryOperator 转换为 BigQueryInsertJobOperator 时遇到了一些问题。我有以下任务:

bq_extract = BigQueryInsertJobOperator(
    dag="big_query_task,
    task_id='bq_query',
    gcp_conn_id='google_cloud_default',
    params={'data': Utils().querycontext},
    configuration={
        "query": {"query": "{% include 'sql/bigquery.sql' %}", "useLegacySql": False,
                  "writeDisposition": "WRITE_TRUNCATE", "destinationTable": {"datasetId": bq_dataset}}
    })

我的 bigquery_extract.sql 查询中的这一行引发了错误:

{% for field in data.bq_fields %}

我想使用params中的'data',它正在调用一个方法,这个方法是从一个.json文件中读取的:

class Utils():
    bucket = Variable.get('s3_bucket')
    _qcontext = None

    @property
    def querycontext(self):
        if self._qcontext is None:
            self.load_querycontext()

        return self._qcontext

    def load_querycontext(self):
        with open(path.join(conf.get("core", "dags"), 'traffic/bq_query.json')) as f:
            self._qcontext = json.load(f)

bq_query.json是这种格式,我需要使用嵌套的bq_fields列表值:

{
"bq_fields": [
    { "name": "CONCAT(ID, '-', CAST(ID AS STRING), "alias": "new_id" },
    { "name": "TIMESTAMP(CAST(visitStartTime * 1000 AS INT64)", "alias": "new_timestamp" },
    { "name": "TO_JSON_STRING(hits.experiment)", "alias": "hit_experiment" }]
}

这个文件有一个我想在上面提到的查询行中使用的列表,但是它抛出了这个错误:

jinja2.exceptions.UndefinedError: 'data' 未定义

【问题讨论】:

    标签: google-bigquery airflow


    【解决方案1】:

    您的代码有两个问题。

    BigQueryInsertJobOperator 中不支持第一个“参数”字段。请参阅这篇文章,其中我发布了使用 BigQueryInsertJobOperator 时如何将参数传递给 sql 文件。 How do you pass variables with BigQueryInsertJobOperator in Airflow

    其次,如果您碰巧遇到无法找到文件的错误,请确保设置文件的完整路径。从本地测试迁移到云时,我不得不这样做,即使文件在同一个目录中。您可以使用以下示例在 dag 配置中设置路径(将路径替换为您的路径):

     with DAG(
        ...
        template_searchpath = '/opt/airflow/dags',
        ...
        
    ) as dag:
    

    【讨论】:

      猜你喜欢
      • 2022-08-18
      • 2022-01-06
      • 2021-09-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-10-14
      • 2022-12-10
      相关资源
      最近更新 更多