【问题标题】:Grouping in Apache Spark dataframe在 Apache Spark 数据框中进行分组
【发布时间】:2019-09-29 03:54:58
【问题描述】:

我是 Apache Spark 的新手。下面是我在读取 csv 文件时创建的 Spark dataframe

Parent  Keyword   Volume
P1       K1        100
P1       K2        200
P1       K3        150
P2       K4        100
P2       K5        200

我需要将上面的数据框转换为下面的数据框。逻辑是所有属于同一个父级的关键字都是相关的,并且应该按 Volume 的排序顺序列出。例如K1, K2, K3 属于同一个父P1,所以它们都是相关的。因此,对于K1,相关的关键字是K2K3。首先显示K2,因为它的音量(200)大于K3(150)

Keyword   Related_keywords
K1         K2, K3
K2         K3, K1
K3         K2, K1
K4         K5
K5         K4

我是 Spark 的新手,正在研究这个问题,groupBy 可以使用,但不知道如何将第一个数据帧转换为第二个数据帧。

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    虽然这可以使用groupBy 完成,但当您需要结果数据框中的所有原始行时,窗口函数通常更容易。我们可以使用collect_list,但作为doc says,顺序是不确定的,所以让我们创建卷和关键字的元组:

    val txt =
      """Parent  Keyword   Volume
        |P1       K1        100
        |P1       K2        200
        |P1       K3        150
        |P2       K4        100
        |P2       K5        200""".stripMargin.lines
        .map(_.split("\\s+").mkString("|"))
        .toSeq
        .toDS()
    val df = spark.read
      .option("inferSchema", true)
      .option("header", true)
      .option("delimiter", "|")
      .csv(txt)
    
    val win = Window.partitionBy($"Parent")
    val df1 =
      df.select($"Keyword",
                collect_list(struct(-$"Volume", $"Keyword")).over(win) as "rel")
    

    现在我们几乎有了所需的格式

    df1.select(array_sort($"rel") as "Related_keywords")
      .show(20, false)
    

    输出:

    +------------------------------------+
    |Related_keywords                    |
    +------------------------------------+
    |[[-200, K5], [-100, K4]]            |
    |[[-200, K5], [-100, K4]]            |
    |[[-200, K2], [-150, K3], [-100, K1]]|
    |[[-200, K2], [-150, K3], [-100, K1]]|
    |[[-200, K2], [-150, K3], [-100, K1]]|
    +------------------------------------+
    

    但是,有两个问题,原来的Keyword会在列表中重复,并且所有关键字前面都有负数。为了使它更漂亮,我认为需要 UDF:s(找不到用于解压缩元组的 SQL 函数):

    val myudf = udf(
      (keyword: String, rel: Seq[Row]) =>
        rel
          .collect {
            case Row(volume: Int, kw: String) if kw != keyword => (volume, kw)
          }
          .sortBy(_._1)
          .map(_._2))
    
    df1.select($"Keyword", myudf($"Keyword", $"rel") as "Related_keywords")
      .show(20, false)
    

    输出:

    +-------+----------------+
    |Keyword|Related_keywords|
    +-------+----------------+
    |K4     |[K5]            |
    |K5     |[K4]            |
    |K1     |[K2, K3]        |
    |K2     |[K3, K1]        |
    |K3     |[K2, K1]        |
    +-------+----------------+
    

    【讨论】:

    • 谢谢.. 有效。我需要做的另一件事是从结果数据帧中创建一个 JSON,然后调用一个 HTTP POST 请求来发送。我可以调用 collect() 但问题是输出数据帧可能包含数百万行,因此调用 collect 可能会导致驱动程序出现内存不足的问题。那我应该怎么做呢?
    • 如果你想为每个条目单独调用 API df.toJSON.foreach(jsonStr => callApi(jsonStr)) 或者如果你需要批处理它 df.toJSON.foreachPartition 就像这里描述的 stackoverflow.com/questions/55979027/…
    猜你喜欢
    • 2015-03-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-15
    • 1970-01-01
    • 2017-09-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多