【发布时间】:2021-12-15 09:33:41
【问题描述】:
我致力于创建数据管道,它从各种常规数据库和 CSV 文件中获取数据。它将使用 pyspark 进行预处理,然后将结果数据帧写入 BigQuery 表。
我已将 jar 文件 gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.1.jar 复制到 $SPARK_HOME/jars 文件夹,并在创建 spark 对象时在 spark.jars.packages 中指定了相同的内容。
请给出一些建议,因为我能找到的所有博客都使用 DataProc 作为示例。
spark = SparkSession. \
builder. \
enableHiveSupport(). \
appName('data_load'). \
master('yarn'). \
config('spark.jars.packages','com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.1'). \
getOrCreate()
Scala 版本 = 2.12.15
在写入大查询时,我收到与身份验证相关的错误。 BigQuery Connection API 已启用。
df.write.format('bigquery') \
.option('table',big_table_id) \
.save()
错误:
Py4JJavaError: An error occurred while calling o59.save.
: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Request had insufficient authentication scopes.
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:286)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:746)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:743)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:742)
at com.google.cloud.bigquery.connector.common.BigQueryClient.getTable(BigQueryClient.java:107)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable$lzycompute(BigQueryInsertableRelation.scala:67)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable(BigQueryInsertableRelation.scala:67)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists$lzycompute(BigQueryInsertableRelation.scala:53)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists(BigQueryInsertableRelation.scala:49)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:114)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://www.googleapis.com/bigquery/v2/projects/project212/datasets/NYSE_datasets/tables/NYSE_table?prettyPrint=false
{
"code" : 403,
"details" : [ {
"@type" : "type.googleapis.com/google.rpc.ErrorInfo",
"reason" : "ACCESS_TOKEN_SCOPE_INSUFFICIENT"
} ],
"errors" : [ {
"domain" : "global",
"message" : "Insufficient Permission",
"reason" : "insufficientPermissions"
} ],
"message" : "Request had insufficient authentication scopes.",
"status" : "PERMISSION_DENIED"
}
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:284)
【问题讨论】: