【问题标题】:How to load local file in sc.textFile, instead of HDFS如何在 sc.textFile 中加载本地文件,而不是 HDFS
【发布时间】:2015-02-02 16:20:15
【问题描述】:

我正在关注伟大的spark tutorial

所以我试图在 46m:00s 加载 README.md 但我正在做的是失败:

$ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash
bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4
bash-4.1# ls README.md
README.md
bash-4.1# ./bin/spark-shell
scala> val f = sc.textFile("README.md")
14/12/04 12:11:14 INFO storage.MemoryStore: ensureFreeSpace(164073) called with curMem=0, maxMem=278302556
14/12/04 12:11:14 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.2 KB, free 265.3 MB)
f: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

如何加载README.md

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    试试

    val f = sc.textFile("./README.md")
    

    【讨论】:

    • scala&gt; val f = sc.textFile("./README.md") 14/12/04 12:54:33 INFO storage.MemoryStore: ensureFreeSpace(81443) called with curMem=164073, maxMem=278302556 14/12/04 12:54:33 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 79.5 KB, free 265.2 MB) f: org.apache.spark.rdd.RDD[String] = ./README.md MappedRDD[5] at textFile at &lt;console&gt;:12 scala&gt; val wc = f.flatMap(l =&gt; l.split(" ")).map(word =&gt; (word, 1)).reduceByKey(_ + _) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md at
    • 你能在 bash shell 上做一个pwd bash-4.1#
    • bash-4.1# pwd /usr/local/spark-1.1.0-bin-hadoop2.4
    • 这适用于我在没有 hadoop/hdfs 的 spark 上。但是,它似乎不适用于 OP,因为它给了他们一个错误转储。
    【解决方案2】:

    尝试明确指定sc.textFile("file:///path to the file/")。设置Hadoop环境时发生错误。

    SparkContext.textFile 在内部调用org.apache.hadoop.mapred.FileInputFormat.getSplits,如果架构不存在,则反过来使用org.apache.hadoop.fs.getDefaultUri。此方法读取 Hadoop conf 的“fs.defaultFS”参数。如果设置HADOOP_CONF_DIR环境变量,参数通常设置为“hdfs://...”;否则为“file://”。

    【讨论】:

    • 你碰巧知道如何用 Java 做到这一点吗?我没有看到方法。发现没有一种简单的方法可以提供从简单文件系统加载文件的路径,这非常令人沮丧。
    • 回答我自己。您可以通过 spark-submit 传递一个 --file 开关。因此,文件路径可以是硬编码的,或者您的配置是为应用程序设置的,但您也可以发出该路径的信号。当您提交时,以便执行者可以看到路径。
    • 当我在 Windows 上指定路径时,为什么 "file:///C:\\Xiang\\inputfile"file:////C:\\Xiang\\inputfile 都有效,而 "file://C:\\Xiang\\inputfile 在 Java 代码中无效。在Linux上怎么样?前缀应该是file:///(三个斜线)还是file:////(四个斜线)? file://// 也适用于 linux 吗?
    • 我查了源码,是static final URI NAME = URI.create("file:///");,所以我想应该硬编码为file:///(三个斜线)作为前缀。但是我还是不明白为什么file:////(四个斜线)也可以。
    • @YuXiang 是否要在源代码行(在 GitHub 中)添加链接?
    【解决方案3】:

    这已在 spark 邮件列表中讨论,请参考此mail

    你应该使用hadoop fs -put &lt;localsrc&gt; ... &lt;dst&gt;将文件复制到hdfs

    ${HADOOP_COMMON_HOME}/bin/hadoop fs -put /path/to/README.md README.md
    

    【讨论】:

      【解决方案4】:

      gonbe 的回答非常好。但我还是想提一下file:/// = ~/../../,而不是$SPARK_HOME。希望这可以为像我这样的新手节省一些时间。

      【讨论】:

      • file:/// 是执行 JVM 所看到的文件系统的根文件夹,而不是主文件夹之上的两层。 RFC 8089 中指定的 URI 格式为 file://hostname/absolute/path。在本地情况下,hostname(权限)组件为空。
      【解决方案5】:

      这是我在 Windows 集群上托管在 Azure 中的 Spark 集群上遇到的此错误的解决方案:

      加载原始 HVAC.csv 文件,使用函数解析它

      data = sc.textFile("wasb:///HdiSamples/SensorSampleData/hvac/HVAC.csv")
      

      我们使用 (wasb:///) 来允许 Hadoop 访问 azure 博客存储文件,三个斜线是对正在运行的节点容器文件夹的相对引用。

      例如:如果您的文件在 Spark 集群仪表板的文件资源管理器中的路径是:

      sflcc1\sflccspark1\HdiSamples\SensorSampleData\hvac

      所以描述路径如下: sflcc1:是存储账户的名称。 sflccspark:是集群节点名。

      所以我们用相对的三个斜杠来引用当前集群节点名称。

      希望这会有所帮助。

      【讨论】:

        【解决方案6】:

        您只需将文件的路径指定为 "file:///directory/file"

        示例:

        val textFile = sc.textFile("file:///usr/local/spark/README.md")
        

        【讨论】:

          【解决方案7】:

          我的桌面上有一个名为 NewsArticle.txt 的文件。

          在 Spark 中,我输入:

          val textFile= sc.textFile(“file:///C:/Users/582767/Desktop/NewsArticle.txt”)
          

          我需要将文件路径的所有 \ 更改为 / 字符。

          为了测试它是否有效,我输入了:

          textFile.foreach(println)
          

          我运行的是 Windows 7,但没有安装 Hadoop。

          【讨论】:

            【解决方案8】:

            如果文件位于您的 Spark 主节点中(例如,在使用 AWS EMR 的情况下),则首先在本地模式下启动 spark-shell。

            $ spark-shell --master=local
            scala> val df = spark.read.json("file:///usr/lib/spark/examples/src/main/resources/people.json")
            df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
            
            scala> df.show()
            +----+-------+
            | age|   name|
            +----+-------+
            |null|Michael|
            |  30|   Andy|
            |  19| Justin|
            +----+-------+
            

            或者,您可以先将文件从本地文件系统复制到 HDFS,然后以默认模式启动 Spark(例如,使用 AWS EMR 时使用 YARN)直接读取文件。

            $ hdfs dfs -mkdir -p /hdfs/spark/examples
            $ hadoop fs -put /usr/lib/spark/examples/src/main/resources/people.json /hdfs/spark/examples
            $ hadoop fs -ls /hdfs/spark/examples
            Found 1 items
            -rw-r--r--   1 hadoop hadoop         73 2017-05-01 00:49 /hdfs/spark/examples/people.json
            
            $ spark-shell
            scala> val df = spark.read.json("/hdfs/spark/examples/people.json")
            df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
            
            scala> df.show()
            +----+-------+
            | age|   name|
            +----+-------+
            |null|Michael|
            |  30|   Andy|
            |  19| Justin|
            +----+-------+
            

            【讨论】:

            • 唯一告诉您如何在本地模式下启动的答案。这个需要更多的支持。
            【解决方案9】:

            注意:

            确保在从本地(sc.textFile("file:///path to the file/"))加载数据时以本地模式运行 spark,否则您将收到类似 Caused by: java.io.FileNotFoundException: File file:/data/sparkjob/config2.properties does not exist 的错误。 因为在不同 worker 上运行的 executor 不会在它的本地路径中找到这个文件。

            【讨论】:

            • 我们可以在驱动中的本地文件上运行spark独立模式(一个节点上的驱动程序,其他节点上的执行程序)吗?或者我应该在所有节点上都存在本地文件?
            【解决方案10】:

            如果您尝试从 HDFS 读取文件。尝试在 SparkConf 中设置路径

             val conf = new SparkConf().setMaster("local[*]").setAppName("HDFSFileReader")
             conf.set("fs.defaultFS", "hdfs://hostname:9000")
            

            【讨论】:

            • 请在您的代码中添加 4 个空格/制表符缩进,以便将其格式化为代码。最好的问候
            【解决方案11】:

            虽然 Spark 支持从本地文件系统加载文件,但它要求文件在集群中所有节点上的相同路径中可用。

            一些网络文件系统,如 NFS、AFS 和 MapR 的 NFS 层,作为常规文件系统向用户公开。

            如果您的数据已经在其中一个系统中,那么您只需指定 file:// 路径即可将其用作输入;只要文件系统安装在每个节点上的相同路径,Spark 就会处理它。每个节点都需要有相同的路径

             rdd = sc.textFile("file:///path/to/file")
            

            如果您的文件尚未在集群中的所有节点上,您可以将其本地加载到驱动程序上,而无需通过 Spark,然后调用并行化将内容分发给工作人员

            注意将file://放在前面,并根据操作系统使用“/”或“\”。

            【讨论】:

            • Spark 有没有办法自动将 $SPARK_HOME 目录中的数据复制到所有计算节点。还是您需要手动执行?
            • 处理不同文件系统格式的 spark 源代码在哪里?
            【解决方案12】:

            这发生在我身上,带有 Hadoop 的 Spark 2.3 也安装在公共“hadoop”用户主目录下。由于 Spark 和 Hadoop 都安装在同一个公共目录下,Spark 默认情况下认为该方案为hdfs,并且开始在 Hadoop 的 core-site.xml 中查找 fs.defaultFS 指定的 hdfs 下的输入文件。在这种情况下,我们需要明确指定方案为file:///&lt;absoloute path to file&gt;

            【讨论】:

              【解决方案13】:

              您不必使用 sc.textFile(...) 将本地文件转换为数据帧。一种选择是,逐行读取本地文件,然后将其转换为 Spark 数据集。以下是 Java 中 Windows 机器的示例:

              StructType schemata = DataTypes.createStructType(
                          new StructField[]{
                                  createStructField("COL1", StringType, false),
                                  createStructField("COL2", StringType, false),
                                  ...
                          }
                  );
              
              String separator = ";";
              String filePath = "C:\\work\\myProj\\myFile.csv";
              SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("MyApp").setMaster("local"));
              JavaSparkContext jsc = new JavaSparkContext (sparkContext );
              SQLContext sqlContext = SQLContext.getOrCreate(sparkContext );
              
              List<String[]> result = new ArrayList<>();
              try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
                  String line;
                  while ((line = br.readLine()) != null) {
                    String[] vals = line.split(separator);
                    result.add(vals);
                  }
               } catch (Exception ex) {
                     System.out.println(ex.getMessage());
                     throw new RuntimeException(ex);
                }
                JavaRDD<String[]> jRdd = jsc.parallelize(result);
                JavaRDD<Row> jRowRdd = jRdd .map(RowFactory::create);
                Dataset<Row> data = sqlContext.createDataFrame(jRowRdd, schemata);
              

              现在您可以在代码中使用数据框data

              【讨论】:

                【解决方案14】:

                我尝试了以下方法,它在我的本地文件系统中运行。基本上 spark 可以从本地、HDFS 和 AWS S3 路径读取

                listrdd=sc.textFile("file:////home/cloudera/Downloads/master-data/retail_db/products")
                

                【讨论】:

                  猜你喜欢
                  • 2017-01-25
                  • 2017-05-11
                  • 2017-11-19
                  • 2023-04-09
                  • 2016-05-09
                  • 1970-01-01
                  • 2021-10-19
                  • 1970-01-01
                  • 2021-11-22
                  相关资源
                  最近更新 更多