可以通过使用不同线程提交作业来并行执行三个管道,但这将通过 RDD 3 次,并且在集群上需要多达 3 倍的资源。
可以通过重写作业以一次处理所有计数来一次性完成作业 - 关于aggregate 的答案是一个选项。将数据成对拆分(keep, foo) (keep, bar), (keep, baz) 将是另一种方法。
不可能在不更改任何代码的情况下一次性完成工作,因为 Spark 无法知道这些工作与同一数据集相关。最多可以通过caching rdd.cache 和rdd.cache 在.filter().map().reduce() 步骤之前的初始rdd 来提高第一个作业之后的后续作业的速度;这仍然会通过 RDD 3 次,但是如果所有数据都适合集群的内存,那么第 2 次和第 3 次可能会快很多:
rdd.cache
// first reduceByKey action will trigger the cache and rdd data will be kept in memory
val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
// subsequent operations will execute faster as the rdd is now available in mem
val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)
如果我这样做,我会创建成对的相关数据并一次性计算它们:
// We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz") and the keep information. dataPairs will contain data like: (("bar",true),1), (("foo",false),1)
val dataPairs = rdd.flatmap{case (keep, foo, bar, baz) =>
def condPair(name:String, x:Int):Option[((String,Boolean), Int)] = if (x==1) Some(((name,keep),x)) else None
Seq(condPair("foo",foo), condPair("bar",bar), condPair("baz",baz)).flatten
}
val totals = dataPairs.reduceByKey(_ + _)
这简单并且只会传递一次数据,但需要重写代码。我会说它在回答问题时得分 66,66%。