【Question title】: Reading from multiple S3 Buckets in Spark
【Posted】: 2020-01-02 03:36:07
【Question】:

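I have a Spark application running on a YARN cluster that needs to read files from multiple buckets on an S3-compatible object store, where each bucket has its own set of credentials.

According to the Hadoop documentation, it should be possible to specify credentials for multiple buckets by setting configuration of the form spark.hadoop.fs.s3a.bucket.<bucket-name>.access.key=<access-key> on the active SparkSession, but this does not work for me in practice.

An example that, based on the documentation, I believe should work: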

import org.apache.spark.sql.{SaveMode, SparkSession}

case class BucketCredential(bucketName: String, accessKey: String, secretKey: String)

object TestMultiBucketReadWrite {

  val credentials: Seq[BucketCredential] = Seq(
    BucketCredential("bucket.1", "access.key.1", "secret.key.1"),
    BucketCredential("bucket.2", "access.key.2", "secret.key.2")

  )

  def addCredentials(sparkBuilder: SparkSession.Builder, credentials: Seq[BucketCredential]): SparkSession.Builder = {
    var sBuilder = sparkBuilder
    for (credential <- credentials) {
      sBuilder = sBuilder
        .config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.access.key", credential.accessKey)
        .config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.secret.key", credential.secretKey)
    }
    sBuilder
  }

  def main(args: Array[String]): Unit = {
    val spark = addCredentials(SparkSession.builder(), credentials)
      .appName("Test MultiBucket Credentials")
      .getOrCreate()

    import spark.implicits._

    val dummyDF = Seq(1,2,3,4,5).toDS()

    println("Testing multi write...")
    credentials.foreach(credential => {
      val bucket = credential.bucketName
      dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
    })

    println("Testing multi read...")
    credentials.foreach(credential => {
      val bucket = credential.bucketName
      val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
      println(df.collect().mkString(", "))
    })
  }

}

However, when submitted, the job fails with the following error:

Testing multi write...
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: null
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:93)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:545)

The job does succeed when I set fs.s3a.access.key and fs.s3a.secret.key before each operation, but that forces the reads and writes to run sequentially:

    //...
    println("Testing multi write...")
    credentials.foreach(credential => {
      val bucket = credential.bucketName
      spark.conf.set("fs.s3a.access.key", credential.accessKey)
      spark.conf.set("fs.s3a.secret.key", credential.secretKey)
      dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
    })

    println("Testing multi read...")
    credentials.foreach(credential => {
      val bucket = credential.bucketName
      spark.conf.set("fs.s3a.access.key", credential.accessKey)
      spark.conf.set("fs.s3a.secret.key", credential.secretKey)
      val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
      println(df.collect().mkString(", "))
    })
    //...

【Comments】:

  • Which versions of Spark and the AWS libraries are you using?

Tags: scala amazon-web-services apache-spark hadoop amazon-s3


【Answer 1】:

As far as I know, this feature is only available in hadoop-aws >= 2.8.0.
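To make the version requirement easy to check, here is a minimal sketch that decides whether a given Hadoop version string meets the 2.8.0 threshold mentioned in this answer. The object and method names (HadoopVersionCheck, majorMinor, supportsPerBucketConfig) are hypothetical helpers, not part of Spark or Hadoop, and the sketch assumes that the hadoop-aws version matches the Hadoop version on the classpath.

```scala
object HadoopVersionCheck {

  // Extract the leading "major.minor" pair from a version string
  // such as "2.7.3" or "3.2.0-SNAPSHOT". Returns None if it cannot be parsed.
  def majorMinor(version: String): Option[(Int, Int)] = {
    val Pattern = """^(\d+)\.(\d+).*""".r
    version.trim match {
      case Pattern(major, minor) => Some((major.toInt, minor.toInt))
      case _                     => None
    }
  }

  // Assumption taken from this answer: per-bucket fs.s3a.bucket.<bucket-name>.*
  // settings are only honoured from version 2.8.0 onwards.
  def supportsPerBucketConfig(version: String): Boolean =
    majorMinor(version).exists { case (major, minor) =>
      major > 2 || (major == 2 && minor >= 8)
    }
}
```

Inside a Spark application, the version string can be obtained from org.apache.hadoop.util.VersionInfo.getVersion and passed to supportsPerBucketConfig before relying on per-bucket credentials.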

【Comments】:

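    【Answer 2】:

    Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: null

    403 Forbidden means the server understood the request but refuses to serve it.

    The S3 account does not have access to one of your buckets. Please check this again.

    One possible cause is a proxy issue.

    If your connection to AWS goes through an HTTP proxy, make sure the proxy settings are correct. Define variables like these in your shell script: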

    SPARK_DRIVER_JAVA_OPTS="
        -Dhttp.proxyHost=${PROXY_HOST}
        -Dhttp.proxyPort=${PROXY_PORT}
        -Dhttps.proxyHost=${PROXY_HOST}
        -Dhttps.proxyPort=${PROXY_PORT}
        $SPARK_DRIVER_JAVA_OPTS"

    SPARK_EXECUTOR_JAVA_OPTS="
        -Dhttp.proxyHost=${PROXY_HOST}
        -Dhttp.proxyPort=${PROXY_PORT}
        -Dhttps.proxyHost=${PROXY_HOST}
        -Dhttps.proxyPort=${PROXY_PORT}
        $SPARK_EXECUTOR_JAVA_OPTS"

    SPARK_OPTS="
        --conf spark.hadoop.fs.s3a.proxy.host=${PROXY_HOST}
        --conf spark.hadoop.fs.s3a.proxy.port=${PROXY_PORT}
        $SPARK_OPTS"
    

    The spark-submit command would then look like this:

      spark-submit \
          --executor-cores $SPARK_EXECUTOR_CORES \
          --executor-memory $SPARK_EXECUTOR_MEMORY \
          --driver-memory $SPARK_DRIVER_MEMORY \
          --driver-java-options "$SPARK_DRIVER_JAVA_OPTS" \
          --conf spark.executor.extraJavaOptions="$SPARK_EXECUTOR_JAVA_OPTS" \
          --master $SPARK_MASTER \
          --class $APP_MAIN \
          $SPARK_OPTS \
          $APP_JAR "$@"
    

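    Note: AFAIK, if you have S3 access from AWS EMR, you do not need to set the access keys every time, because they are provided implicitly.

    【Comments】:

    • The Spark application does not run on AWS itself. The proxy settings are all correct: when I set a bucket's credentials with fs.s3a.access.key=<access-key> and fs.s3a.secret.key=<secret-key>, I can connect to a single bucket, but with this approach I cannot connect to multiple buckets at the same time, as described in my question.
    • Did you find a solution?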