【问题标题】:Spark - Why is it necessary to collect() to the driver node before printing an RDD? Can it not be done in parallel?Spark - 为什么在打印 RDD 之前需要收集()到驱动程序节点?不能并行吗?
【发布时间】:2019-01-04 22:03:56
【问题描述】:

我正在阅读有关如何在 Spark 中打印 RDD(我正在使用 Java)的信息,似乎大多数人只是 collect()(如果 RDD 足够小)并使用 forall(println) 或类似的东西.不能并行打印吗?为什么我们必须将数据收集到驱动节点上才能打印?

我在想也许是因为我们不能并行使用 System.out,但我觉得不是这样。此外,就代码而言,我不太确定如何分配数据并并行打印。我想到的一种方法是做一个 mappartitions,它在映射方面没有做任何有用的事情,但它会遍历分区并打印其内容。

【问题讨论】:

  • 这是关于你想在哪里看到输出。您可以在工作人员上打印,但这会将输出留在工作人员上(可能在日志文件中)。但您通常希望在驱动程序机器上查看输出。
  • 啊,我明白了,这是有道理的。如果我想以分布式方式进行(所以在工作日志中打印),是否只需遍历 RDD 并打印就足够了?我问这个是因为我想做一些有类似动机的事情;我试图在我的每个工作节点内(因此在分区内)将我的 RDD 的元素一起批处理,不太确定如何处理这个问题。
  • 嗨@ernest_k 我想知道它是否只是关于打印,因为如果是这样,那么如何在笔记本中看到 select() 的输出,select() 是否在内部调用collect()
  • @AnkitAgrawal 我相信笔记本会隐式打印。在这种情况下,他们确实会向司机收集数据。我猜测是因为我没有使用打印数据的笔记本(而不是数据框或 rdd 对象的字符串)。关键是print 可以在具有本地数据的工作人员或具有收集数据的驱动程序上运行,具体取决于调用方式/位置。而在笔记本运行select后显示数据的情况下,数据会被收集到驱动程序中。
  • @ernest_k And in the case of the notebook showing data after running select, the data is collected to the driver. 所以你的意思是select() 确实在内部调用collect() 或者对于worker 上的任何其他语句都是如此?

标签: java apache-spark parallel-processing rdd


【解决方案1】:

当您调用collect() 方法时,您会将所有结果返回给驱动程序节点。您将拥有List 而不是RDD。让我们看一个本地模式的例子。假设你有一个整数 RDD:

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

如果您调用foreach 方法(Java 中的stream().forEach()),驱动程序节点将按照您创建它的顺序打印RDD 中的所有元素。

rdd.collect().stream().forEach(x -> System.out.println(x));

Output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

如果你想在每个worker上打印结果,你必须调用RDDforeach方法。它不会向驱动程序返回任何内容,只会在每个工作节点上执行您在 foreach 方法中指定的计算。

rdd.foreach(x -> System.out.println(x));

如果您看到控制台(本地模式),您会注意到 System.out.println(x) 已在单独的线程中执行,因为输出不遵守原始顺序:

Output: 6, 3, 2, 1, 8, 9, 10, 4, 5, 7

所以如果你以分布式模式执行它,每个执行器都会在其日志文件上打印System.out.println 操作的结果。

您还提到了mapPartitions 方法。在您的情况下,我发现它没有比直接在RDD 上使用foreach 更有用。这可能有助于控制工人的工作量。

 rdd.repartition(5).mapPartitions(x -> {
     while(x.hasNext()){
         Integer i = x.next();
         System.out.println(i);
     }
     return x;
 }).count(); // Count is just to force the execution of mapPartition (mapPartition is lazy and doesn't get executed until an action is called)

希望对你有帮助!

【讨论】:

  • 我明白了,感谢您深思熟虑的解释,您回答了这个问题。如果可以的话,半相关的问题,如果我需要将条目一起批量放入列表中(作为一个简化示例),您认为使用 for each 对我仍然有用吗?因此,使用您的示例,如果我想将我的 JavaRDD 更改为 JavaRDD> ,其中每个 List 由 100 个整数组成?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-30
  • 1970-01-01
  • 1970-01-01
  • 2015-04-28
  • 1970-01-01
相关资源
最近更新 更多