【问题标题】:How to Get the file name for record in spark RDD (JavaRDD)如何在 spark RDD (JavaRDD) 中获取记录的文件名
【发布时间】:2015-12-04 14:11:40
【问题描述】:

我正在使用将多个文件加载到 JavaRDD 中

JavaRDD<String> allLines = sc.textFile(hdfs://path/*.csv);

加载文件后,我修改了每条记录并想保存它们。但是,我还需要将原始文件名 (ID) 与记录一起保存以供将来参考。无论如何,我可以从 RDD 中的单个记录中获取原始文件名吗? 谢谢

【问题讨论】:

    标签: java hadoop apache-spark hdfs


    【解决方案1】:

    您可以尝试在以下 sn-p 中执行类似操作:

    JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
        "hdfs://path/*.csv", 
        TextInputFormat.class, 
        LongWritable.class, 
        Text.class, 
        new Configuration()
    );
    JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;
    
    JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
        FileSplit fileSplit = (FileSplit) inputSplit;
        String fileName = fileSplit.getPath().getName();
    
        Stream<Tuple2<String, String>> stream =
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
                .map(line -> {
                    String lineText = line._2().toString();
                    // emit file name as key and line as a value
                    return new Tuple2(fileName, lineText);
                });
        return stream.iterator();
    }, true);
    

    更新(针对 java7)

    JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
        new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
            @Override
            public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
                FileSplit fileSplit = (FileSplit) inputSplit;
                final String fileName = fileSplit.getPath().getName();
                return new Iterator<Tuple2<String, String>>() {
                    @Override
                    public boolean hasNext() {
                        return lines.hasNext();
                    }
                    @Override
                    public Tuple2<String, String> next() {
                        Tuple2<LongWritable, Text> entry = lines.next();
                        return new Tuple2<String, String>(fileName, entry._2().toString());
                    }
                };
            }
        }, 
        true
    );
    

    【讨论】:

    • Stream 在 Java 7 中似乎不可用 :( 有没有替代品?thx
    【解决方案2】:

    你想要 spark 的 wholeTextFiles 函数。来自文档:

    For example, if you have the following files:
    
       hdfs://a-hdfs-path/part-00000
       hdfs://a-hdfs-path/part-00001
       ...
       hdfs://a-hdfs-path/part-nnnnn
    
    Do val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path"),
    
    then rdd contains
    
       (a-hdfs-path/part-00000, its content)
       (a-hdfs-path/part-00001, its content)
       ...
       (a-hdfs-path/part-nnnnn, its content)
    

    它返回一个元组的 RDD,其中左边是文件名,右边是内容。

    【讨论】:

    • 感谢您的回复,我可以使用它,但是似乎整个文本文件并没有真正从每个文件中创建 RDD,并且正如 Gillespie 提到的那样,它都被读取为一个元素
    • Wholtextfile() 适用于较小的文件,但如果文件很大,则将是不利的,因为每个文件都作为单个记录放在 RDD 中。
    【解决方案3】:

    您应该可以使用toDebugString。使用 wholeTextFile 会将文件的全部内容作为一个元素读入,而 sc.textfile 会创建一个 RDD,每行作为一个单独的元素 - 如 here 所述。

    例如:

    val file= sc.textFile("/user/user01/whatever.txt").cache()
    
    val wordcount = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    
    wordcount.toDebugString
    
    // res0: String =
    
    // (2) ShuffledRDD[4] at reduceByKey at <console>:23 []
    
    // +-(2) MapPartitionsRDD[3] at map at <console>:23 []
    
    //    |  MapPartitionsRDD[2] at flatMap at <console>:23 []
    
    //    |  /user/user01/whatever.txt MapPartitionsRDD[1] at textFile at <console>:21 []
    
    //    |  /user/user01/whatever.txt HadoopRDD[0] at textFile at <console>:21 []
    

    【讨论】:

    • 我试过这个,不幸的是,如果你给文本文件一个模式,即*.txt,它不会显示文件名而只显示模式,所以调试字符串似乎不是真的有用。
    • 您是否在应用转换的 val 上调用它?像上面例子中的字数一样?这应该真正提供有关加载行的来源的信息..
    • 现在用过滤器再次尝试,结果是一样的:(1) MapPartitionsRDD[2] at filter at xxx.java:69 [] | MapPartitionsRDD[1] at textFile at xxx.java:68 [] | hdfs://localhost:8020/user/xxxxx/*.txt HadoopRDD[0] at textFile at xxx.java:68 []
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-03
    • 1970-01-01
    • 2021-07-29
    • 1970-01-01
    • 1970-01-01
    • 2023-04-04
    相关资源
    最近更新 更多