【问题标题】:Collect and order the DataFrame column收集并订购 DataFrame 列
【发布时间】:2017-12-13 19:56:48
【问题描述】:

我有一个如下的DataFrame

A B C
1 3 1
1 8 2
1 5 3
2 2 1

我的输出应该是,B 列是根据 B 列的初始值排序的

A B
1 3,1/5,3/8,2
2 2,1

我写了这样的东西是scala

df.groupBy("A").withColumn("B",collect_list(concat("B",lit(","),"C"))

但是 dint 解决了我的问题。

【问题讨论】:

  • 您能否编辑您的问题以显示您的数据和代码的正确格式。这可以通过突出显示它并按 Ctrl+K 来完成
  • 你是如何推导出 B 列的? - 不清楚
  • 按A列分组
  • 是的,那么1/5 来自哪里?
  • 你如何期望一个列表中有多个分隔符?

标签: python scala apache-spark dataframe dataset


【解决方案1】:

鉴于您的输入数据框为

+---+---+---+
|A  |B  |C  |
+---+---+---+
|1  |3  |1  |
|1  |8  |2  |
|1  |5  |3  |
|2  |2  |1  |
+---+---+---+

你可以得到以下输出

+---+---------------+
|A  |B              |
+---+---------------+
|1  |[3,1, 5,3, 8,2]|
|2  |[2,1]          |
+---+---------------+

通过简单的groupByaggregations 和使用functions

df.orderBy("B").groupBy("A").agg(collect_list(concat_ws(",", col("B"), col("C"))) as "B")

您可以使用udf 函数来获得最终想要的结果

def joinString = udf((b: mutable.WrappedArray[String]) => {
  b.mkString("/")
} )

newdf.withColumn("B", joinString(col("B"))).show(false)

你应该得到

+---+-----------+
|A  |B          |
+---+-----------+
|1  |3,1/5,3/8,2|
|2  |2,1        |
+---+-----------+

注意您需要import org.apache.spark.sql.functions._ 才能使上述所有功能起作用

已编辑

B 列是根据 B 列的初始值排序的

为此,您可以将 orderBy 部分删除为

 import org.apache.spark.sql.functions._
val newdf = df.groupBy("A").agg(collect_list(concat_ws(",", col("B"), col("C"))) as "B")

def joinString = udf((b: mutable.WrappedArray[String]) => {
  b.mkString("/")
} )

newdf.withColumn("B", joinString(col("B"))).show(false)

你应该得到输出为

+---+-----------+
|A  |B          |
+---+-----------+
|1  |3,1/8,2/5,3|
|2  |2,1        |
+---+-----------+

【讨论】:

  • 但这不会给我一个命令
  • 它抛出一个错误说,withcolumn is not a member of relationalgroupeddataset
  • 谢谢。它工作正常。但是,问题在于大数据。处理大数据时不保持顺序。可能是因为group by不维护顺序?
  • 不,groupBy 不维持秩序。您将不得不像我使用的那样使用 orderBy,但这不会给您与原始订单相同的订单。如果答案对您有帮助,您可以接受答案:) 谢谢
  • @Deek 我已经更新了答案,希望这次我做对了
【解决方案2】:

这是你可以通过使用 concat_ws 函数然后 groupby 列 A 并收集列表来实现的

val df1 = spark.sparkContext.parallelize(Seq(
  ( 1, 3, 1),
  (1, 8, 2),
  (1, 5, 3),
  (2, 2, 1)
)).toDF("A", "B", "C")

val result = df1.withColumn("B", concat_ws("/", $"B", $"C"))

result.groupBy("A").agg(collect_list($"B").alias("B")).show

输出:

+---+---------------+
|  A|              B|
+---+---------------+
|  1|[3/1, 8/2, 5/3]|
|  2|          [2/1]|
+---+---------------+

已编辑: 如果要使用 B 列进行排序,可以执行以下操作

val format = udf((value : Seq[String]) => {
  value.sortBy(x => {x.split(",")(0)}).mkString("/")
})


val result = df1.withColumn("B", concat_ws(",", $"B", $"C"))
  .groupBy($"A").agg(collect_list($"B").alias("B"))
  .withColumn("B", format($"B"))

result.show()

输出:

+---+-----------+
|  A|          B|
+---+-----------+
|  1|3,1/5,3/8,2|
|  2|        2,1|
+---+-----------+

希望这对您有所帮助!

【讨论】:

  • 我的输出基本上应该是,[3,1 / 8,2 / 5,3]
  • 如果您需要在以后根据需要对其进行排序,您可以在 group by 之后获取订单。以上是更新的代码,希望对您有所帮助
猜你喜欢
  • 2012-09-21
  • 1970-01-01
  • 2023-03-10
  • 1970-01-01
  • 2010-12-25
  • 1970-01-01
  • 1970-01-01
  • 2018-08-26
  • 1970-01-01
相关资源
最近更新 更多