【问题标题】:Spark Scala read csv file using s3aSpark Scala 使用 s3a 读取 csv 文件
【发布时间】:2018-04-26 17:01:28
【问题描述】:

我正在尝试使用本地运行的 Spark - Scala 从 S3 存储桶中读取 csv(本机)文件。我可以使用 http 协议读取文件,但我打算使用 s3a 协议。

以下是通话前的配置设置。

    val awsId = System.getenv("AWS_ACCESS_KEY_ID")
    val awsKey = System.getenv("AWS_SECRET_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.access.key", awsId) 
    sc.hadoopConfiguration.set("fs.s3a.secret.key", awsKey)    
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider");
    sc.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "us-east-1.amazonaws.com")
    sc.hadoopConfiguration.set("fs.s3a.impl.disable.cache", "true")
 here

读取文件并打印 rdd/dataframe 的前 5 行

    val fileAPath = Files.s3aPath(Files.input);
    println("reading file s3", fileAPath)
    // s3a://bucket-name/dataSets/policyoutput.csv
    val df = sc.textFile(fileAPath);
    df.take(5).foreach(println);

我得到以下异常

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: FD92FDC175C64AA2, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: IuloUEASgqnY4lrSMpbyJpwgFfCFbttxuxmJ9hGHMUgZTbO/UR/YyDgjix+3rBe0Y4MQHPzNvhA=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

任何进一步调查的帮助/指导将不胜感激。

谢谢

【问题讨论】:

  • 你是如何执行应用程序的。如果您使用集群模式,则需要确保凭据在环境中可用。
  • 我正在使用 eclipse 作为 Scala 应用程序运行。环境变量也设置为 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY

标签: scala csv apache-spark amazon-s3


【解决方案1】:

其他人为此苦苦挣扎,我不得不更新 hadoop-client 的版本

另外,下面的链接也很有帮助

pom 详情如下

<properties>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.8.0</hadoop.version>

</properties>


<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

【讨论】:

    猜你喜欢
    • 2015-12-04
    • 1970-01-01
    • 1970-01-01
    • 2021-12-07
    • 2017-01-20
    • 2021-12-15
    • 2016-09-13
    • 2023-03-26
    • 1970-01-01
    相关资源
    最近更新 更多