【发布时间】:2021-08-13 05:05:07
【问题描述】:
我在从已弃用的 BigQueryOperator 转换为 BigQueryInsertJobOperator 时遇到了一些问题。我有以下任务:
bq_extract = BigQueryInsertJobOperator(
dag="big_query_task,
task_id='bq_query',
gcp_conn_id='google_cloud_default',
params={'data': Utils().querycontext},
configuration={
"query": {"query": "{% include 'sql/bigquery.sql' %}", "useLegacySql": False,
"writeDisposition": "WRITE_TRUNCATE", "destinationTable": {"datasetId": bq_dataset}}
})
我的 bigquery_extract.sql 查询中的这一行引发了错误:
{% for field in data.bq_fields %}
我想使用params中的'data',它正在调用一个方法,这个方法是从一个.json文件中读取的:
class Utils():
bucket = Variable.get('s3_bucket')
_qcontext = None
@property
def querycontext(self):
if self._qcontext is None:
self.load_querycontext()
return self._qcontext
def load_querycontext(self):
with open(path.join(conf.get("core", "dags"), 'traffic/bq_query.json')) as f:
self._qcontext = json.load(f)
bq_query.json是这种格式,我需要使用嵌套的bq_fields列表值:
{
"bq_fields": [
{ "name": "CONCAT(ID, '-', CAST(ID AS STRING), "alias": "new_id" },
{ "name": "TIMESTAMP(CAST(visitStartTime * 1000 AS INT64)", "alias": "new_timestamp" },
{ "name": "TO_JSON_STRING(hits.experiment)", "alias": "hit_experiment" }]
}
这个文件有一个我想在上面提到的查询行中使用的列表,但是它抛出了这个错误:
jinja2.exceptions.UndefinedError: 'data' 未定义
【问题讨论】: