【问题标题】:How to iterate Big Query TableResult correctly?如何正确迭代 Big Query TableResult?
【发布时间】:2020-01-16 00:14:08
【问题描述】:

我在 Big Query 中有一个复杂的联接查询,需要在 spark 作业中运行。这是当前代码:

val bigquery = BigQueryOptions.newBuilder().setProjectId(bigQueryConfig.bigQueryProjectId)
      .setCredentials(credentials)
      .build().getService

val query =
      //some complex query

val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(
        query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
        .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()

val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()

val result = queryJob.getQueryResults()

val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>

//create case class from the row
}

一直遇到这个错误:

超出速率限制:您的项目:XXX 超出了每个项目每秒 tabledata.list 字节的配额。

有没有办法更好地迭代结果?我曾尝试在查询作业配置上执行setPriority(QueryJobConfiguration.Priority.BATCH),但它并没有改善结果。还尝试将 spark executor 的数量减少到 1,但没有用。

【问题讨论】:

  • 设置BATCH 无济于事。它所做的事情与您想象的不同(它将您的初始查询放在较低优先级的队列中,而不是立即执行它,即它与INTERACTIVE 查询相反)。看看这里:stackoverflow.com/questions/55510164/… 和这里:cloud.google.com/bigquery/quotas#api_requests,以便更好地阅读表格/行。
  • 感谢您的建议。该文档提到了如何从 BQ 表中读取,但没有提到查询。此外,BigQuery Storage API 请求 以下限制适用于使用 BigQuery Storage API 的 ReadRows 调用: 每分钟 ReadRows 调用:5,000 :当您使用 BigQuery Storage API 读取数据时,每个用户每分钟最多可以调用 5,000 个 ReadRows项目。以下限制适用于使用 BigQuery Storage API 的所有其他方法调用: API 调用每分钟:1,000 :BigQuery Storage API 调用每分钟、每个用户、每个项目限制为 1,000 次。
  • 每个查询都是一个表。当您在 BigQuery 中运行查询时,如果您未指定要将结果写入的表目标,则其结果会在后台保存为临时表。您需要遵守限制/配额,即放慢您的请求。

标签: scala apache-spark google-bigquery


【解决方案1】:

您可以使用spark-bigquery-connector 将它们读入DataFrame,而不是直接读取查询结果:

val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(
        query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
        .setDestinationTable(TableId.of(destinationDataset, destinationTable))
        .build()
val jobId: JobId = JobId.newBuilder().setRandomJob().build()

val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()

val result = queryJob.getQueryResults()

// read into DataFrame
val data = spark.read.format("bigquery")
  .option("dataset",destinationDataset)
  .option("table" destinationTable)
  .load()

【讨论】:

  • 感谢大卫·拉比诺维茨的回复。我打算使用它,但在我的项目中添加 spark bigquery 依赖项时遇到问题。提交 spark 作业时,它会在我的主类上引发 ClassNotFoundException。我试图解决这个问题。将更新进展情况
  • 你知道我们是否可以使用 spark-bigquery 连接器直接执行 bigquery 查询而不是执行查询 -> 存储到中间表 -> 从表中读取
  • 关于 ClassNotFoundException - 请查看documentation,了解如何将连接器添加到您的项目。简而言之,您可以在 spark-submit 中添加 --jars 参数。关于您的第二个问题,请注意 BigQuery 不是常用数据库,无论如何它都会将查询结果存储在临时表中,因此这样做不会改变通常的行为。
  • 我必须阅读近 100 个大型 BQ 表,并且需要很长时间。根据我的原始代码,查询是在执行程序中执行的。但是现在因为spark session是用来读取的,所以在驱动里面跑起来太慢了
  • 你确定是阅读部分还是查询部分?结果的大小是多少?我可以向您保证 getQueryResults 在单线程中运行,而 Spark DataSource API(底层 spark.read)将读取分配给执行程序。您是否在执行程序上将原始代码作为 lambda 的一部分运行?
【解决方案2】:

我们通过在 TableResult 上提供自定义页面大小解决了这种情况

【讨论】:

    猜你喜欢
    • 2023-02-09
    • 1970-01-01
    • 1970-01-01
    • 2019-11-02
    • 2015-09-05
    • 1970-01-01
    • 1970-01-01
    • 2017-08-16
    • 2021-08-09
    相关资源
    最近更新 更多