【发布时间】:2020-01-16 00:14:08
【问题描述】:
我在 Big Query 中有一个复杂的联接查询,需要在 spark 作业中运行。这是当前代码:
val bigquery = BigQueryOptions.newBuilder().setProjectId(bigQueryConfig.bigQueryProjectId)
.setCredentials(credentials)
.build().getService
val query =
//some complex query
val queryConfig: QueryJobConfiguration =
QueryJobConfiguration.newBuilder(
query)
.setUseLegacySql(false)
.setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
.build()
val jobId: JobId = JobId.newBuilder().setRandomJob().build()
val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()
val result = queryJob.getQueryResults()
val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>
//create case class from the row
}
一直遇到这个错误:
超出速率限制:您的项目:XXX 超出了每个项目每秒 tabledata.list 字节的配额。
有没有办法更好地迭代结果?我曾尝试在查询作业配置上执行setPriority(QueryJobConfiguration.Priority.BATCH),但它并没有改善结果。还尝试将 spark executor 的数量减少到 1,但没有用。
【问题讨论】:
-
设置
BATCH无济于事。它所做的事情与您想象的不同(它将您的初始查询放在较低优先级的队列中,而不是立即执行它,即它与INTERACTIVE查询相反)。看看这里:stackoverflow.com/questions/55510164/… 和这里:cloud.google.com/bigquery/quotas#api_requests,以便更好地阅读表格/行。 -
感谢您的建议。该文档提到了如何从 BQ 表中读取,但没有提到查询。此外,BigQuery Storage API 请求 以下限制适用于使用 BigQuery Storage API 的 ReadRows 调用: 每分钟 ReadRows 调用:5,000 :当您使用 BigQuery Storage API 读取数据时,每个用户每分钟最多可以调用 5,000 个 ReadRows项目。以下限制适用于使用 BigQuery Storage API 的所有其他方法调用: API 调用每分钟:1,000 :BigQuery Storage API 调用每分钟、每个用户、每个项目限制为 1,000 次。
-
每个查询都是一个表。当您在 BigQuery 中运行查询时,如果您未指定要将结果写入的表目标,则其结果会在后台保存为临时表。您需要遵守限制/配额,即放慢您的请求。
标签: scala apache-spark google-bigquery