【发布时间】:2020-02-14 10:30:46
【问题描述】:
我正在使用 Airflow 在 Google BigQuery 中触发加载作业。源文件由多个 NDJSON 文件组成。
这是 Airflow 运算符(我认为不相关。为上下文给出):
load = GoogleCloudStorageToBigQueryOperator(
task_id=f"load",
bigquery_conn_id="bigquery_default",
pool="bigquery_insert",
destination_project_dataset_table="<HIDDEN>",
bucket="<HIDDEN>",
source_objects=list_files(),
source_format="NEWLINE_DELIMITED_JSON",
write_disposition="WRITE_APPEND",
autodetect=True,
ignore_unknown_values=True
)
为了检查这不是 Airflow 故障,我已经调试并准确提取了 Airflow 发送到 Google BigQuery REST API 的有效负载:
{
"configuration":{
"load":{
"autodetect":True,
"createDisposition":"CREATE_IF_NEEDED",
"destinationTable":{
"projectId":"<PRIVATE>",
"datasetId":"<PRIVATE>",
"tableId":"<PRIVATE>"
},
"sourceFormat":"NEWLINE_DELIMITED_JSON",
"sourceUris":[
"<PRIVATE>"
],
"writeDisposition":"WRITE_APPEND",
"ignoreUnknownValues":True
}
}
}
由于我设置了选项 ignoreUnknownValues (documentation),我希望我的源文件中但不在我的目标架构中的 JSON 字段将被忽略,但我收到以下错误来自 BigQuery:
异常:BigQuery 作业失败。最终错误是:{'reason': 'invalid', '消息':'提供的架构与表 [PRIVATE] 不匹配。无法添加 字段(字段:source_fingerprint)'}。这份工作是:{'kind': 'bigquery#job','etag':'[PRIVATE]','id':'[PRIVATE]','selfLink': '[PRIVATE]','user_email':'[PRIVATE]','配置':{'load': {'sourceUris':[[PRIVATE]],'destinationTable':{'projectId': '[PRIVATE]','datasetId':'气流','tableId':'[PRIVATE]'}, “createDisposition”:“CREATE_IF_NEEDED”、“writeDisposition”: 'WRITE_APPEND', 'sourceFormat': 'NEWLINE_DELIMITED_JSON', 'ignoreUnknownValues': True, 'autodetect': True}, 'jobType': 'LOAD'}, 'jobReference': {'projectId': '[PRIVATE]', 'jobId': '[PRIVATE]', '位置':'欧盟'},'统计':{'creationTime':'1581675754961', “开始时间”:“1581675755090”,“结束时间”:“1581675755491”},“状态”: {'errorResult': {'reason': 'invalid', 'message': '提供的 Schema 确实 不匹配表 [PRIVATE]。无法添加字段(字段: source_fingerprint)'}, 'errors': [{'reason': 'invalid', 'message': '提供的架构与表 [PRIVATE] 不匹配。无法添加字段 (field: source_fingerprint)'}], 'state': 'DONE'}}
请注意,我的 ignoreUnknownValues 选项也会在响应中返回,因此他们理解这一点。
根据文档,我希望忽略额外的列并成功完成作业:
ignoreUnknownValues: 布尔值
[可选] 指示 BigQuery 是否应允许额外的值 未在表架构中表示。如果为真,额外的值是 忽略。如果为 false,则带有额外列的记录将被视为错误 记录,如果坏记录太多,则无效错误 在作业结果中返回。默认值为假。这 sourceFormat 属性确定 BigQuery 将哪些内容视为额外内容 值:CSV:尾随列 JSON:不匹配的命名值 列名
有人知道发生了什么吗?
请注意,我不想更新我的架构(因此,我没有使用选项schemaUpdateOptions)。我希望忽略多余的列。
谢谢
--
更新 1:我使用的是 Airflow 1.10.3,它已经支持此选项的此语法。旧版本的 Airflow 具有不同的传递此参数的方式,但正如我们在我发布的有效负载中看到的那样,Airflow 似乎正在向 Google BigQuery API 发送正确的选项(related question 不适用)。
更新 2:在使用 CLI 时,我也遇到了同样的错误。
bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --noreplace --ignore_unknown_values [MY TABLE NAME] [MY GCS PATH]
Waiting on bqjob_[...]_1 ... (0s) 当前状态:DONE BigQuery 错误 在加载操作中:错误处理作业'[...]':提供的架构确实 不匹配表 [...]。无法添加字段(字段: metadata_deposit_00_sourceId)
更新 3:当我同时使用 autodetect 和 ignore_unkown_values 时,似乎会出现问题。如果我将现有架构提供为 schema_fields,那么 ignore_unkown_values 将按我的预期工作,但在文档中我并不清楚。
【问题讨论】:
-
新列是 NULLABLE、REQUIRED 还是 REPEATED
-
@PaddyPopeye 在我的源 JSON 文件上?我使用
autodetect=True作为输入模式,所以我什至不确定它会如何应用 -
如果需要添加额外的字段,可以使用 schemaUpdateOptions[] 来允许新的字段。但它不适用于自动检测,因此您需要为负载显式提供新架构
-
谢谢,但正如我上面写的,我希望忽略而不是添加额外的字段。
标签: google-bigquery etl airflow