【问题标题】:Spark column wise word count星火列明智的字数
【发布时间】:2023-04-02 04:51:01
【问题描述】:

我们正在尝试在 spark 中生成数据集的列统计信息。除了使用统计库中的汇总功能。我们正在使用以下程序:

  1. 我们用字符串值确定列

  2. 为整个数据集生成键值对,以列号为键,列值为值

  3. 生成新的格式图

    (K,V) ->((K,V),1)

然后我们使用 reduceByKey 求所有列中所有唯一值的总和。我们缓存此输出以减少进一步的计算时间。

在下一步中,我们使用 for 循环遍历列以查找所有列的统计信息。

我们正在尝试通过再次使用 map reduce 方式来减少 for 循环,但我们无法找到实现它的方法。这样做将允许我们在一次执行中为所有列生成列统计信息。 for 循环方法是按顺序运行的,速度很慢。

代码:

//drops the header

    def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }

    def retAtrTuple(x: String) = {
       val newX = x.split(",")
       for (h <- 0 until newX.length) 
          yield (h,newX(h))
    }



    val line = sc.textFile("hdfs://.../myfile.csv")

    val withoutHeader: RDD[String] = dropHeader(line)

    val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value


    var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey()    //this contains column indexes as key and boolean as value (true for numeric and false for string type)

    var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
    var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}

    var str_col = str_cols.toArray   //array consisting the string col
    var num_col = num_cols.toArray   //array consisting numeric col


    val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
    val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
    var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }

    //running for loops which needs to be parallelized/optimized as it sequentially operates on each column. Idea is to find the top10, bottom10 and number of distinct elements column wise
    for(i <- str_col){
       var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
       var leastOnes = total.take(10)
       println("leastOnes for Col" + i)
       leastOnes.foreach(println)
       var maxOnes = total.sortBy(-_._2._2).take(10)
       println("maxOnes for Col" + i)
       maxOnes.foreach(println)
       println("distinct for Col" + i + " is " + total.count)
    }

【问题讨论】:

  • 那么问题是什么?
  • serejja 问题是使用在 colCount 中生成的相同 (K,V) 对,其中包含格式为 : (colIndex,(colValue,NumoftimesValueOccured)) 的数据来找出每个的摘要柱子。或基本上避免串行执行
  • 在我看来,这是表示您的数据的一种特别糟糕的方式。为什么不为每一列制作一个单独的 RDD?就像这个答案中提到的那样:stackoverflow.com/questions/28137208/…
  • 谢谢 Daniel 我也看过那个解决方案,但我担心的是,如果我们将每一列安排在单独的 RDD 中,我们将如何能够为每一列并行执行这样的汇总函数。在这种情况下,我假设我们最终会在每一列上执行相同的函数,这也可能效率很低。我的主要目标是一次性完成这个操作,就像我们使用 hadoop MapReduce 一样。 @DanielDarabos
  • 啊,谢谢,我明白你的意思了。事实上,这在 Spark 中很难重现。一旦你拆分了 RDD,对它们进行操作就像你想要的那样。您需要在每一列上运行相同的函数,但每次运行只会触及该列,因此您实际上只会传递整个数据一次。但问题是您不能一次将数据拆分为多个 RDD。这是我觉得Spark的一个缺点。今天晚些时候,我会用一个可能的(尴尬的)解决方案添加一个更长的答案。

标签: scala apache-spark summary


【解决方案1】:

让我稍微简化一下你的问题。 (实际上很多。)我们有一个 RDD[(Int, String)],我们希望为每个 Int 找到前 10 个最常见的 Strings(它们都在 0-100 范围内)。

使用 Spark 内置的 RDD.top(n) 方法而不是像您的示例中那样进行排序更有效。它的运行时间与数据大小呈线性关系,并且需要移动的数据比排序少得多。

考虑在RDD.scala 中实现top。您也想这样做,但每个Int 键都有一个优先级队列(堆)。代码变得相当复杂:

import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for (((k, v), count) <- items) {
        heaps(k) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case(count, value) => value })
}

这很有效,但也很痛苦,因为我们需要同时处理多个列。每列有一个 RDD 并在每个列上调用 rdd.top(10) 会更容易。

不幸的是,将 RDD 拆分为 N 个较小的 RDD 的天真方法是 N 次传递:

def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map {
    i => together.filter { case (key, value) => key == i }.values
  }
}

更有效的解决方案可能是将数据按键写入单独的文件,然后将其加载回单独的 RDD。这在Write to multiple outputs by key Spark - one Spark job 中进行了讨论。

【讨论】:

    【解决方案2】:

    感谢@Daniel Darabos 的回答。但是有一些错误。

    1. 混合使用 Map 和 collection.mutable.Map

    2. withDefault((i: Int) => new Heap(n)) 设置 heaps(k) += count -> v

    3. 时不会创建新堆
    4. 括号的混合用法

    这是修改后的代码:

    //import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private. copy to your own folder and import it
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object BoundedPriorityQueueTest {
    
      //  https://stackoverflow.com/questions/28166190/spark-column-wise-word-count
      def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
        // A heap that only keeps the top N values, so it has bounded size.
        type Heap = BoundedPriorityQueue[(Long, String)]
        // Get the word counts.
        val counts: RDD[((Int, String), Long)] =
        rdd.map(_ -> 1L).reduceByKey(_ + _)
        // In each partition create a column -> heap map.
        val perPartition: RDD[collection.mutable.Map[Int, Heap]] =
        counts.mapPartitions { items =>
          val heaps =
            collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
          for (((k, v), count) <- items) {
            println("\n---")
            println("before add " + ((k, v), count) + ", the map is: ")
            println(heaps)
            if (!heaps.contains(k)) {
              println("not contains key " + k)
              heaps(k) = new Heap(n)
              println(heaps)
            }
            heaps(k) += count -> v
            println("after add " + ((k, v), count) + ", the map is: ")
            println(heaps)
    
          }
          println(heaps)
          Iterator.single(heaps)
        }
        // Merge the per-partition heap maps into one.
        val merged: collection.mutable.Map[Int, Heap] =
        perPartition.reduce { (heaps1, heaps2) =>
          val heaps =
            collection.mutable.Map[Int, Heap]() //.withDefault((i: Int) => new Heap(n))
          println(heaps)
          for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
            for (cv <- heap) {
              heaps(k) += cv
            }
          }
          heaps
        }
        // Discard counts, return just the top strings.
        merged.mapValues(_.map { case (count, value) => value }).toMap
      }
    
      def main(args: Array[String]): Unit = {
        Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
        val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
    
    
        val words = sc.parallelize(List((1, "s11"), (1, "s11"), (1, "s12"), (1, "s13"), (2, "s21"), (2, "s22"), (2, "s22"), (2, "s23")))
        println("# words:" + words.count())
    
        val result = top(1, words)
    
        println("\n--result:")
        println(result)
        sc.stop()
    
        print("DONE")
      }
    
    }
    

    【讨论】:

    • 考虑到另一个答案被赞成并被接受为正确的答案,您应该真正解释另一个答案的代码中有哪些错误,您的代码会更正。
    • @TomBrunberg 感谢您的友好提醒。我刚刚列出了我在答案中所做的更改。
    • 谢谢!也许还有一件事:您确实在帖子中与@DanielDarabos 联系,以引起他对您的批评的注意。这是一个公平和良好的姿态。但是,我不确定寻址符号 (@) 在问答帖子中是否有效,因此我也在此评论中添加了它,以防万一。干杯。
    猜你喜欢
    • 2014-05-24
    • 2020-03-11
    • 2021-04-28
    • 1970-01-01
    • 1970-01-01
    • 2011-06-10
    • 2016-04-24
    • 1970-01-01
    • 2018-06-08
    相关资源
    最近更新 更多