【问题标题】:How to authenticate with BigQuery from Apache Spark (pyspark)?如何使用来自 Apache Spark (pyspark) 的 BigQuery 进行身份验证?
【发布时间】:2020-03-31 17:00:25
【问题描述】:

我已经为我的 bigquery 项目创建了 client idclient secret,但我不知道如何使用它们成功地将数据帧从 pyspark 脚本保存到我的 bigquery 表中。我下面的python代码导致以下错误。有没有办法可以使用 pyspark 数据帧上的保存选项连接到 BigQuery?

代码

df.write \
  .format("bigquery") \
  .option("client_id", "<MY_CLIENT_ID>") \
  .option("client_secret", "<MY_CLIENT_SECRET>") \
  .option("project", "bigquery-project-id") \
  .option("table", "dataset.table") \
  .save()

错误

py4j.protocol.Py4JJavaError:调用 o93.save 时出错。 : com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: 400 错误请求 { “error”:“invalid_grant”,“error_description”: “错误请求”} 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:106) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:268) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:664) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:661) 在 com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50) 在 com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:660) 在 com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable(BigQueryInsertableRelation.scala:68) 在 com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists(BigQueryInsertableRelation.scala:54) 在 com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:86) 在 org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) 在 org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) 在 org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) 在 org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) 在 org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) 在 org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) 在 org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) 在 org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) 在 org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) 在 org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) 在 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:27​​1) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748) 原因: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpResponseException: 400 错误请求 { “error”:“invalid_grant”,“error_description”: “错误请求”}

【问题讨论】:

    标签: apache-spark pyspark google-bigquery


    【解决方案1】:

    来自spark-bigquery-connector

    如何在 GCE/Dataproc 之外进行身份验证?

    使用服务帐号 JSON 密钥和 GOOGLE_APPLICATION_CREDENTIALS 作为 描述了here

    凭据也可以作为参数或显式提供 来自 Spark 运行时配置。它可以作为 直接使用 base64 编码的字符串,或包含 凭据(但不是两者)。

    所以你应该使用这个:

    spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
    

    【讨论】:

    • 我还需要其他配置吗?我现在看到这个错误:Please set a project ID using the builder
    • 您也可以通过选项添加它。查看所有可用的properties
    • 请也添加这个选项 .option("parentProject", "project-to-be-billed")
    猜你喜欢
    • 2021-03-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-02
    • 2014-08-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多