【问题标题】:How do I determine an offset in Apache Spark?如何确定 Apache Spark 中的偏移量?
【发布时间】:2015-10-21 16:55:26
【问题描述】:

我正在搜索一些数据文件 (~20GB)。我想在该数据中找到一些特定的术语并标记匹配的偏移量。有没有办法让 Spark 识别我正在操作的数据块的偏移量?

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

import java.util.regex.*;

public class Grep {
        public static void main( String args[] ) {
            SparkConf        conf       = new SparkConf().setMaster( "spark://ourip:7077" );
            JavaSparkContext jsc        = new JavaSparkContext( conf );
            JavaRDD<String>  data       = jsc.textFile( "hdfs://ourip/test/testdata.txt" ); // load the data from HDFS
            JavaRDD<String>  filterData = data.filter( new Function<String, Boolean>() {
                    // I'd like to do something here to get the offset in the original file of the string "babe ruth"
                    public Boolean call( String s ) { return s.toLowerCase().contains( "babe ruth" ); } // case insens matching

            });

            long matches = filterData.count();  // count the hits

            // execute the RDD filter
            System.out.println( "Lines with search terms: " + matches );
 );
        } //  end main
} // end class Grep

我想在“filter”操作中做一些事情来计算原始文件中“babe ruth”的偏移量。我可以得到当前行中“babe ruth”的偏移量,但是告诉我该行在文件中的偏移量的过程或函数是什么?

【问题讨论】:

  • 我不确定这里的偏移是什么意思。你能说得更具体点吗?
  • 我正在寻找文件中的字节偏移量。例如,如果我有文本: 1 a
    2 b
    3 c
    我想在文件中找到字符“b”的字节偏移量。 (本例为6(空格+'\n')。如果Spark不参与这个过程,这很简单,但是当Spark读取这些文件时,它们会被分块成行。所以,代码上面可能会收到“2 b”作为输入。它可以计算相对于该行的字节偏移量,但是如何获得相对于文件的字节偏移量?

标签: java apache-spark offset


【解决方案1】:

在 Spark 常见的Hadoop 输入格式 中可以使用。要从文件中读取字节偏移量,您可以使用 Hadoop 中的 TextInputFormat 类 (org.apache.hadoop.mapreduce.lib.input)。它已经与 Spark 捆绑在一起。

它将文件读取为key(字节偏移)和value(文本行):

纯文本文件的 InputFormat。文件被分成几行。换行或回车用于表示行结束。键是文件中的位置,值是文本行。

在Spark中可以通过调用newAPIHadoopFile()来使用

SparkConf conf = new SparkConf().setMaster("");
JavaSparkContext jsc = new JavaSparkContext(conf);

// read the content of the file using Hadoop format
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
        "file_path", // input path
        TextInputFormat.class, // used input format class
        LongWritable.class, // class of the value
        Text.class, // class of the value
        new Configuration());    

JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
    @Override
    public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
        // you will get each line from as a tuple (offset, text)    
        long pos = tuple._1().get(); // extract offset
        String line = tuple._2().toString(); // extract text

        return pos + " " + line;
    }
});

【讨论】:

  • 这是我正在寻找的功能。这绝对有效。谢谢!
【解决方案2】:

您可以使用JavaSparkContext 中的wholeTextFiles(String path, int minPartitions) 方法返回JavaPairRDD&lt;String,String&gt;,其中键是文件名,值是包含文件全部内容的字符串(因此,此RDD 中的每条记录都代表一个文件)。从这里,只需运行一个map(),它将在每个值上调用indexOf(String searchString)。这应该返回每个文件中出现相关字符串的第一个索引。

(编辑:)

因此,可以以分布式方式为一个文件(根据您在 cmets 中的用例)找到偏移量。下面是一个适用于 Scala 的示例。

val searchString = *search string*
val rdd1 = sc.textFile(*input file*, *num partitions*)

// Zip RDD lines with their indices
val zrdd1 = rdd1.zipWithIndex()

// Find the first RDD line that contains the string in question
val firstFind = zrdd1.filter { case (line, index) => line.contains(searchString) }.first()

// Grab all lines before the line containing the search string and sum up all of their lengths (and then add the inline offset)
val filterLines = zrdd1.filter { case (line, index) => index < firstFind._2 }
val offset = filterLines.map { case (line, index) => line.length }.reduce(_ + _) + firstFind._1.indexOf(searchString)

请注意,您还需要在此之上手动添加任何换行符,因为它们没有被考虑在内(输入格式使用换行符作为记录之间的分界线)。新行数只是包含搜索字符串的行之前的行数,因此添加起来很简单。

不幸的是,我对 Java API 并不完全熟悉,而且测试起来也不是很容易,所以我不确定下面的代码是否有效但可以使用(另外,我使用了 Java 1.7,但 1.8 压缩了很多带有 lambda 表达式的代码。):

String searchString = *search string*;
JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt");

JavaRDD<Tuple2<String, Long>> zrdd1 = data.zipWithIndex();

Tuple2<String, Long> firstFind = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
      public Boolean call(Tuple2<String, Long> input) { return input.productElement(0).contains(searchString); }
  }).first();

JavaRDD<Tuple2<String, Long>> filterLines = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
      public Boolean call(Tuple2<String, Long> input) { return input.productElement(1) < firstFind.productElement(1); }
  });

Long offset = filterLines.map(new Function<Tuple2<String, Long>, Int>() {
      public Int call(Tuple2<String, Long> input) { return input.productElement(0).length(); }
  }).reduce(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
  }) + firstFind.productElement(0).indexOf(searchString);

这只能在您的输入是 一个 文件时完成(否则,zipWithIndex() 不会保证文件中的偏移量)但是此方法适用于任意数量分区的 RDD,因此随意将您的文件分成任意数量的块。

【讨论】:

  • 所以,我的情况是有一个~20GB 的文件。在您描述的方法中,在这种情况下,每个 Spark 节点都会收到该文件的副本吗?在这种情况下,这不会从本质上破坏 Spark 的意义吗?我希望仍然有许多节点可以从文件中的各种偏移量中快速获得答案。从理论上讲,Spark 的子系统知道我正在处理的块的偏移量+长度。有什么方法可以访问它吗?
  • 嘿@Gary,我用我认为可以帮助您的用例的内容更新了我的回复。让我知道这是否有帮助!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-11-28
  • 2011-08-04
  • 2017-07-22
  • 1970-01-01
  • 2010-11-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多