【发布时间】:2019-06-11 12:26:46
【问题描述】:
我有一个包含很多列的 df,但我的问题是 2 列:
val df = Seq(("id1","unknown"),("id1","type1"),("id1","unknown"),("id2","typeX"),
("id2","typeX"),("id2","unknown"),("id5","typeY"),("id2","unknown"))
.toDF("ID","TYPE")
+---+-------+
| ID| TYPE|
+---+-------+
|id1|unknown|
|id1| type1|
|id1|unknown|
|id2| typeX|
|id2| typeX|
|id2|unknown|
|id5| typeY|
|id2|unknown|
+---+-------+
我想将类型“未知”替换为与 ID 对应的类型。 结果应该是这样的:
+---+-----+
| ID| TYPE|
+---+-----+
|id1|type1|
|id1|type1|
|id1|type1|
|id2|typeX|
|id2|typeX|
|id2|typeX|
|id5|typeY|
|id2|typeX|
+---+-----+
它不能被硬编码(使用when id1 -> type1 等),因为我有 300 000 个每周更改的 ID...
这是我已经尝试过的:
val w = Window.partitionBy("ID")
df.withColumn("TYPE",collect_list("TYPE").over(w))
+---+--------------------------------+
|ID |TYPE |
+---+--------------------------------+
|id5|[typeY] |
|id1|[unknown, type1, unknown] |
|id1|[unknown, type1, unknown] |
|id1|[unknown, type1, unknown] |
|id2|[typeX, typeX, unknown, unknown]|
|id2|[typeX, typeX, unknown, unknown]|
|id2|[typeX, typeX, unknown, unknown]|
|id2|[typeX, typeX, unknown, unknown]|
+---+--------------------------------+
df.withColumn("TYPE",typeProcessingUDF(col("TYPE")))
+---+-----+
| ID| TYPE|
+---+-----+
|id5|typeY|
|id1|type1|
|id1|type1|
|id1|type1|
|id2|typeX|
|id2|typeX|
|id2|typeX|
|id2|typeX|
+---+-----+
def dtypeProcessing(dtypeList : mutable.WrappedArray[String]) : String = {
val dtype = dtypeList
.filter(element => element!= "unknown" && element!="")
.distinct
dtype.length match {
case 0 => "Unknown"
case x if x >1 => "Unknown"
case x if x ==1 => dtype(0)
}
}
val typeProcessingUDF = udf(dtypeProcessing _)
这行得通,
但它并没有处理考虑案例的所有情况:
if [type1,type2] => return "Unknown"if [type1,type2,type2] => return type2
【问题讨论】:
-
当某个id有多个类型时,你想做什么?或者除了未知之外根本没有类型?
-
如果超过on类型,返回主导类型。如果没有类型返回“未知”
-
“主导”是什么意思?
-
抱歉,ID 的代表类型最多,如果对于 id1 : [type1,type2,type2,"unknown"],那就是 type2
-
那么 [id1=>type1,type2,unknown] 呢?看来你还没想好……
标签: scala apache-spark apache-spark-sql