【问题标题】:Spark Data set transformation to array [duplicate]Spark数据集转换为数组[重复]
【发布时间】:2020-10-14 05:40:47
【问题描述】:

我有一个如下所示的数据集; col1 的值重复多次和 col2 的唯一值。这个原始数据集可以有近十亿行,所以我不想使用 collect 或 collect_list,因为它不会针对我的用例进行横向扩展。

原始数据集:

+---------------------|
|    col1  |    col2  |
+---------------------|
|    AA|    11        |
|    BB|    21        |
|    AA|    12        |
|    AA|    13        |
|    BB|    22        |
|    CC|    33        |
+---------------------|

我想将数据集转换为以下数组格式。 newColumn 作为 col2 的数组。

转换后的数据集:

+---------------------|
|col1  |     newColumn|
+---------------------|
|    AA|    [11,12,13]|
|    BB|    [21,22]   |
|    CC|    [33]      |
+---------------------|

我见过this 解决方案,但它使用collect_list 并且不会在大数据集上横向扩展。

【问题讨论】:

  • 我赞成这个问题,因为这更多的是关于性能而不是过程本身。我想知道为什么这里关闭了
  • 是的,这里也是,自动关闭,可能没看完整题,错过了性能方面。

标签: arrays apache-spark apache-spark-sql transformation large-data


【解决方案1】:

使用 spark 的内置功能始终是最好的方法。 我认为使用 collect_list 函数没有问题。只要你有足够的内存,这将是最好的方法。 优化工作的一种方法是将数据保存为 parquet ,按 A 列存储数据并将其保存为表格。更好的做法是还按一些均匀分布数据的列对其进行分区。

例如,

df_stored = #load your data from csv or parquet or any format'
spark.catalog.setCurrentDatabase(database_name)
df_stored.write.mode("overwrite").format("parquet").partitionBy(part_col).bucketBy(10,"col1").option("path",savepath).saveAsTable(tablename)
df_analysis = spark.table(tablename)
df_aggreg = df_analysis.groupby('col1').agg(F.collect_list(col('col2')))

这将加快聚合速度并避免大量洗牌。试试看

【讨论】:

  • 谢谢,@Raghu,是的,我计划在执行 groupBy 之前使用存储桶和分区保存数据集。我将使用 collect_list 进行测试并做出回应。我过去曾遇到过 collect_ist 的问题,但这也取决于实现。在这种情况下,我假设因为我将在 agg 中使用 collect_list,所以 size 永远不会变得太大。
  • 超级,告诉我结果如何
  • 嗨@Raghu,我测试了这将有 200M 记录我的应用程序按预期工作,而不会导致内存过载。再次感谢您从性能角度解决我的问题!
  • 很高兴听到:-)
【解决方案2】:
  1. 加载您的数据框
  2. col1分组
  3. 使用collect_listcol2 聚合到一个列表中
import org.apache.spark.sql.functions

object GroupToArray {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    //Load your dataframe
    val df = List(("AA", "11"),
      ("BB", "21"),
      ("AA", "12"),
      ("AA", "13"),
      ("BB", "22"),
      ("CC", "33")).toDF("col1","col2")

    //Group by 'col1'
    df.groupBy("col1")
      //agregate on col2 and combine it to a list
    .agg(functions.collect_list("col2").as("newColumn"))
      .show()
  }

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-06-14
    • 2016-12-12
    • 2019-01-16
    • 1970-01-01
    • 1970-01-01
    • 2014-05-02
    • 2018-09-03
    相关资源
    最近更新 更多