在这种情况下,我认为影响结果的是 Spark 驱动程序将执行的操作,而不是编译器。 Spark 是否可以优化其执行管道以避免创建s 的冗余重复。我不确定,但我认为 Spark 会在内存中创建 rdd1pairs。
您可以使用(String, Unit),而不是映射到(String, String):
rdd1.map(s => (s,()))
您所做的基本上是基于rdd1 的rdd2 过滤器。如果rdd1明显小于rdd2,另一种方法是将rdd1的数据表示为广播变量而不是RDD,并简单地过滤rdd2。这避免了任何 shuffle 或 reduce 阶段,因此可能更快,但仅当 rdd1 的数据足够小以适合每个节点时才有效。
编辑:
考虑使用 Unit 而不是 String 如何节省空间,请考虑以下示例:
object size extends App {
(1 to 1000000).map(i => ("foo"+i, ()))
val input = readLine("prompt> ")
}
和
object size extends App {
(1 to 1000000).map(i => ("foo"+i, "foo"+i))
val input = readLine("prompt> ")
}
使用这个问题How to check heap usage of a running JVM from the command line? 中描述的jstat 命令,第一个版本使用的堆比后者少得多。
编辑 2:
Unit 实际上是一个没有内容的单例对象,因此从逻辑上讲,它不需要任何序列化。类型定义包含 Unit 的事实告诉您能够反序列化具有 Unit 类型字段的结构。
Spark 默认使用 Java 序列化。考虑以下几点:
object Main extends App {
import java.io.{ObjectOutputStream, FileOutputStream}
case class Foo (a: String, b:String)
case class Bar (a: String, b:String, c: Unit)
val str = "abcdef"
val foo = Foo("abcdef", "xyz")
val bar = Bar("abcdef", "xyz", ())
val fos = new FileOutputStream( "foo.obj" )
val fo = new ObjectOutputStream( fos )
val bos = new FileOutputStream( "bar.obj" )
val bo = new ObjectOutputStream( bos )
fo writeObject foo
bo writeObject bar
}
两个文件大小相同:
�� sr Main$Foo3�,�z \ L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
和
�� sr Main$Bar+a!N��b L at Ljava/lang/String;L bq ~ xpt abcdeft xyz