【问题标题】:How to store and read data from Spark PairRDD如何从 Spark PairRDD 存储和读取数据
【发布时间】:2015-05-28 07:38:30
【问题描述】:

Spark PairRDD 可以选择保存文件。

JavaRDD<String> baseRDD = context.parallelize(Arrays.asList("This", "is", "dummy", "data"));

JavaPairRDD<String, Integer> myPairRDD =
    baseRDD.mapToPair(new PairFunction<String, String, Integer>() {

      @Override
      public Tuple2<String, Integer> call(String input) throws Exception {
        // TODO Auto-generated method stub
        return new Tuple2<String, Integer>(input, input.length());
      }
    });

myPairRDD.saveAsTextFile("path");

Spark 上下文 textfile 仅将数据读取到 JavaRDD。

如何直接从源重构PairRDD?

注意:

  • 可能的方法是将数据读取到JavaRDD&lt;String&gt;并构造JavaPairRDD

但是对于海量数据,它会占用大量资源。

  • 以非文本格式存储这个中间文件也可以。

  • 执行环境 - JRE 1.7

【问题讨论】:

  • 如果您不介意结果文件不是人类可读的,您可以将它们保存为目标文件。
  • 是的,目标文件也可以。

标签: apache-spark


【解决方案1】:

如果您不介意结果文件不是人类可读的,您可以将它们保存为目标文件。

保存文件:

myPairRDD.saveAsObjectFile(path);

然后你可以像这样阅读对:

JavaPairRDD.fromJavaRDD(sc.objectFile(path))

编辑:

工作示例:

JavaRDD<String> rdd = sc.parallelize(Lists.newArrayList("1", "2"));
rdd.mapToPair(p -> new Tuple2<>(p, p)).saveAsObjectFile("c://example");
JavaPairRDD<String, String> pairRDD 
    = JavaPairRDD.fromJavaRDD(sc.objectFile("c://example"));
pairRDD.collect().forEach(System.out::println);

【讨论】:

  • JavaPairRDD.fromJavaRDD 可以将带有元组的 JavaRDD 转换为 JavaPairRDD。请找到源代码github.com/apache/spark/blob/…
  • 我收到错误消息:JavaPairRDD 类型中的方法 fromJavaRDD(JavaRDD>) 不适用于参数 (JavaRDD)
  • @VijayInnamuri 我不太明白您的问题是什么,请添加工作示例。
  • +1 获得了很棒的答案。这适用于 Java 8 但不适用于 Java 7.. 上述错误是由于此 Java 版本兼容性..
  • @user52045 我们必须使用 JavaPairRDD 函数保存文件吗?我正在尝试使用 JavaRDD 并对其进行迭代。我不想在系统中保存那个rdd
【解决方案2】:

在这种情况下,将 Spark PairRDD 存储在 Sequence 文件中效果很好。

JavaRDD<String> baseRDD = context.parallelize(Arrays.asList("This", "is", "dummy", "data"));

JavaPairRDD<Text, IntWritable> myPairRDD = baseRDD.mapToPair(new PairFunction<String, Text, IntWritable>() {

  @Override
  public Tuple2<Text, IntWritable> call(String input) throws Exception {
    // TODO Auto-generated method stub
    return new Tuple2<Text, IntWritable>(new Text(input), new IntWritable(input.length()));
  }
});

myPairRDD.saveAsHadoopFile(path , Text.class, IntWritable.class,
    SequenceFileOutputFormat.class);

JavaPairRDD<Text, IntWritable> newbaseRDD =
    context.sequenceFile(path , Text.class, IntWritable.class);

// Verify the data
System.out.println(myPairRDD.collect());
newbaseRDD.foreach(new VoidFunction<Tuple2<Text, IntWritable>>() {
  @Override
  public void call(Tuple2<Text, IntWritable> arg0) throws Exception {
    System.out.println(arg0);
  }
});

正如 user52045 所建议的,以下代码适用于 Java 8。

myPairRDD.saveAsObjectFile(path);
JavaPairRDD<String, String> objpairRDD = JavaPairRDD.fromJavaRDD(context.objectFile(path));
objpairRDD.collect().forEach(System.out::println);

【讨论】:

    【解决方案3】:

    使用 Scala 的示例:

    读取文本文件并将其保存为对象文件格式

    val ordersRDD = sc.textFile("/home/cloudera/orders.txt");
    ordersRDD.count();
    ordersRDD.saveAsObjectFile("orders_save_obj");
    

    读取目标文件并将其保存为文本文件格式:

    val ordersRDD = sc.objectFile[String]("/home/cloudera/orders.txt");
    ordersRDD.count();
    ordersRDD.saveAsTextFile("orders_save_text");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-29
      • 2011-12-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多