【发布时间】:2023-04-02 04:51:01
【问题描述】:
我们正在尝试在 spark 中生成数据集的列统计信息。除了使用统计库中的汇总功能。我们正在使用以下程序:
我们用字符串值确定列
为整个数据集生成键值对,以列号为键,列值为值
-
生成新的格式图
(K,V) ->((K,V),1)
然后我们使用 reduceByKey 求所有列中所有唯一值的总和。我们缓存此输出以减少进一步的计算时间。
在下一步中,我们使用 for 循环遍历列以查找所有列的统计信息。
我们正在尝试通过再次使用 map reduce 方式来减少 for 循环,但我们无法找到实现它的方法。这样做将允许我们在一次执行中为所有列生成列统计信息。 for 循环方法是按顺序运行的,速度很慢。
代码:
//drops the header
def dropHeader(data: RDD[String]): RDD[String] = {
data.mapPartitionsWithIndex((idx, lines) => {
if (idx == 0) {
lines.drop(1)
}
lines
})
}
def retAtrTuple(x: String) = {
val newX = x.split(",")
for (h <- 0 until newX.length)
yield (h,newX(h))
}
val line = sc.textFile("hdfs://.../myfile.csv")
val withoutHeader: RDD[String] = dropHeader(line)
val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value
var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey() //this contains column indexes as key and boolean as value (true for numeric and false for string type)
var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}
var str_col = str_cols.toArray //array consisting the string col
var num_col = num_cols.toArray //array consisting numeric col
val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }
//running for loops which needs to be parallelized/optimized as it sequentially operates on each column. Idea is to find the top10, bottom10 and number of distinct elements column wise
for(i <- str_col){
var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
var leastOnes = total.take(10)
println("leastOnes for Col" + i)
leastOnes.foreach(println)
var maxOnes = total.sortBy(-_._2._2).take(10)
println("maxOnes for Col" + i)
maxOnes.foreach(println)
println("distinct for Col" + i + " is " + total.count)
}
【问题讨论】:
-
那么问题是什么?
-
serejja 问题是使用在 colCount 中生成的相同 (K,V) 对,其中包含格式为 : (colIndex,(colValue,NumoftimesValueOccured)) 的数据来找出每个的摘要柱子。或基本上避免串行执行
-
在我看来,这是表示您的数据的一种特别糟糕的方式。为什么不为每一列制作一个单独的 RDD?就像这个答案中提到的那样:stackoverflow.com/questions/28137208/…
-
谢谢 Daniel 我也看过那个解决方案,但我担心的是,如果我们将每一列安排在单独的 RDD 中,我们将如何能够为每一列并行执行这样的汇总函数。在这种情况下,我假设我们最终会在每一列上执行相同的函数,这也可能效率很低。我的主要目标是一次性完成这个操作,就像我们使用 hadoop MapReduce 一样。 @DanielDarabos
-
啊,谢谢,我明白你的意思了。事实上,这在 Spark 中很难重现。一旦你拆分了 RDD,对它们进行操作就像你想要的那样。您需要在每一列上运行相同的函数,但每次运行只会触及该列,因此您实际上只会传递整个数据一次。但问题是您不能一次将数据拆分为多个 RDD。这是我觉得Spark的一个缺点。今天晚些时候,我会用一个可能的(尴尬的)解决方案添加一个更长的答案。
标签: scala apache-spark summary