Use the built-in DataFrame functions explode, split, collect_set and groupBy.
//input data
val df=Seq("Line number one has six words","Line number two has has two words").toDF("input")
scala> :paste
// Entering paste mode (ctrl-D to finish)
df.withColumn("words",explode(split($"input","\\s+"))) //split by space and explode
.groupBy("input","words") //group by on both columns
.count()
.withColumn("line_word_count",struct($"words",$"count")) //create struct
.groupBy("input") //grouping by input data column
.agg(collect_set("line_word_count").alias("line_word_count"))
.show(false)
Result:
+---------------------------------+------------------------------------------------------------------+
|input |line_word_count |
+---------------------------------+------------------------------------------------------------------+
|Line number one has six words |[[one, 1], [has, 1], [six, 1], [number, 1], [words, 1], [Line, 1]]|
|Line number two has has two words|[[has, 2], [two, 2], [words, 1], [number, 1], [Line, 1]] |
+---------------------------------+------------------------------------------------------------------+
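The order of elements inside a collect_set array is not guaranteed, which is why the result tables in this answer list the same pairs in different orders. If a stable order matters, one option is to wrap the aggregate in sort_array. The following is a minimal sketch, not part of the original answer: the names session, lines, sorted and pairs are made up for illustration, and it builds its own local SparkSession and uses col(...) instead of $"..." so that it also runs outside spark-shell.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, collect_set, explode, sort_array, split, struct}

// Hypothetical local session; inside spark-shell this returns the existing one.
val session = SparkSession.builder().master("local[1]").appName("sorted-word-count").getOrCreate()

// Single-column DataFrame built without spark.implicits._
val lines = session
  .createDataFrame(Seq(Tuple1("Line number two has has two words")))
  .toDF("input")

val sorted = lines
  .withColumn("words", explode(split(col("input"), "\\s+"))) // one row per word
  .groupBy("input", "words")
  .count()                                                   // occurrences of each word per line
  .groupBy("input")
  // sort_array orders the structs by their fields (word first, then count)
  .agg(sort_array(collect_set(struct(col("words"), col("count")))).alias("line_word_count"))

// Pull the (word, count) pairs back to the driver for inspection
val pairs = sorted.head().getSeq[Row](1).map(r => (r.getString(0), r.getLong(1)))
```

Strings are compared byte-wise here, so capitalized words such as "Line" sort before lowercase ones.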
If you need line numbers, use the concat and monotonically_increasing_id functions:
df.withColumn("line",concat(lit("line"),monotonically_increasing_id()+1))
.withColumn("words",explode(split($"input","\\s+")))
.groupBy("input","words","line")
.count()
.withColumn("line_word_count",struct($"words",$"count"))
.groupBy("line")
.agg(collect_set("line_word_count").alias("line_word_count"))
.show(false)
Result:
+-----+------------------------------------------------------------------+
|line |line_word_count |
+-----+------------------------------------------------------------------+
|line1|[[one, 1], [has, 1], [six, 1], [words, 1], [number, 1], [Line, 1]]|
|line2|[[has, 2], [two, 2], [number, 1], [words, 1], [Line, 1]] |
+-----+------------------------------------------------------------------+
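Note: for larger datasets, call .repartition(1) before generating the line numbers. monotonically_increasing_id only guarantees unique, increasing values, not consecutive ones, so with more than one partition the numbers will have large gaps.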
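A rough sketch of where the repartition call could go, assuming it is applied before the ID column is added. The names session, input, numbered and labels are hypothetical. Keep in mind that repartition shuffles the rows, so the numbers are consecutive but are not guaranteed to follow the original input order, and a single partition removes parallelism for that stage.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, lit, monotonically_increasing_id}

// Hypothetical local session; inside spark-shell this returns the existing one.
val session = SparkSession.builder().master("local[2]").appName("line-numbers").getOrCreate()

val input = session
  .createDataFrame(Seq(
    Tuple1("Line number one has six words"),
    Tuple1("Line number two has has two words")))
  .toDF("input")

val numbered = input
  .repartition(1) // one partition, so the generated ids are 0, 1, 2, ...
  .withColumn("line", concat(lit("line"), monotonically_increasing_id() + 1))

// Collect the generated labels to check that they are consecutive
val labels = numbered.select("line").collect().map(_.getString(0))
```

From here the explode / groupBy / collect_set steps shown earlier can be chained onto numbered unchanged.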