【问题标题】:Are multiple reduceByKey on the same RDD compiled into a single scan?同一个 RDD 上的多个 reduceByKey 是否编译为一次扫描?
【发布时间】:2015-03-23 03:44:09
【问题描述】:

假设我有一个 RDD(50M 记录/dayredu),我想以几种不同的方式进行总结。 RDD 记录是 4 元组:(keep, foo, bar, baz)

  • keep - 布尔值
  • foobarbaz - 0/1 整数

我想计算每个foo &c 有多少被保留和丢弃,即我必须为foo 执行以下操作(对于barbaz 也是如此):

rdd.filter(lambda keep, foo, bar, baz: foo == 1)
   .map(lambda keep, foo, bar, baz: keep, 1)
   .reduceByKey(operator.add)

这将返回(在collect 之后)类似[(True,40000000),(False,10000000)] 的列表。

问题是:有没有一种简单的方法可以避免扫描rdd 3 次(foobarbaz 各扫描一次)?

我的意思是不是一种重写上述代码来处理所有 3 个字段的方法,而是告诉 spark 一次处理所有 3 个管道。

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    可以通过使用不同线程提交作业来并行执行三个管道,但这将通过 RDD 3 次,并且在集群上需要多达 3 倍的资源。

    可以通过重写作业以一次处理所有计数来一次性完成作业 - 关于aggregate 的答案是一个选项。将数据成对拆分(keep, foo) (keep, bar), (keep, baz) 将是另一种方法。

    不可能在不更改任何代码的情况下一次性完成工作,因为 Spark 无法知道这些工作与同一数据集相关。最多可以通过caching rdd.cacherdd.cache.filter().map().reduce() 步骤之前的初始rdd 来提高第​​一个作业之后的后续作业的速度;这仍然会通过 RDD 3 次,但是如果所有数据都适合集群的内存,那么第 2 次和第 3 次可能会快很多:

    rdd.cache
    // first reduceByKey action will trigger the cache and rdd data will be kept in memory
    val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
    // subsequent operations will execute faster as the rdd is now available in mem
    val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
    val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)
    

    如果我这样做,我会创建成对的相关数据并一次性计算它们:

    // We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz") and the keep information. dataPairs will contain data like: (("bar",true),1), (("foo",false),1)
    
    val dataPairs = rdd.flatmap{case (keep, foo, bar, baz) => 
        def condPair(name:String, x:Int):Option[((String,Boolean), Int)] = if (x==1) Some(((name,keep),x)) else None
        Seq(condPair("foo",foo), condPair("bar",bar), condPair("baz",baz)).flatten
    }
    val totals = dataPairs.reduceByKey(_ + _)
    

    简单并且只会传递一次数据,但需要重写代码。我会说它在回答问题时得分 66,66%

    【讨论】:

      【解决方案2】:

      如果我没看错你的问题,你需要RDD.aggregate

      val zeroValue = (0L, 0L, 0L, 0L, 0L, 0L) // tfoo, tbar, tbaz, ffoo, fbar, fbaz
      rdd.aggregate(zeroValue)(
        (prior, current) => if (current._1) {
          (prior._1 + current._2, prior._2 + current._3, prior._3 + current._4,
            prior._4, prior._5, prior._6)
        } else {
          (prior._1, prior._2, prior._3,
            prior._4 + current._2, prior._5 + current._3, prior._6 + current._4)
        },
        (left, right) =>
          (left._1 + right._1,
            left._2 + right._2,
            left._3 + right._3,
            left._4 + right._4,
            left._5 + right._5,
            left._6 + right._6)
      )
      

      Aggregate 在概念上类似于列表上的概念化约简函数,但 RDD 不是列表,它们是分布式的,因此您提供两个函数参数,一个用于对每个分区进行操作,一个用于组合处理结果分区。

      【讨论】:

      • 对不起,我的意思不是重写上面的代码来处理所有 3 个字段,而是告诉 spark 一次处理所有 3 个管道。
      • AFAIK 没有。您可以在单个闭包中创建一个合并的管道并在单个通道中运行它(与此答案相同),或者您可以在某些情况下链接多个将作为单个阶段(单通道)执行的转换,但在这种情况下,您仍然需要在上一步中建立每一步,因此这也不是实现您提到的目标的干净方法。
      • Spark 操作正在阻塞调用,因此没有内置方法可以按照您想要的方式并行化三个操作,或者告诉它重用您已经进行的遍历。我已经使用线程来完成您所描述的内容,并且它有效,但它似乎不是该框架的预期用途,并且对 RDD 图处理的内部更改很容易破坏它。如果你可以通过相对复杂的 aggregate() 调用来实现你想要的,你会更安全。
      猜你喜欢
      • 2016-10-08
      • 1970-01-01
      • 1970-01-01
      • 2021-01-03
      • 1970-01-01
      • 1970-01-01
      • 2015-04-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多