【Question title】: Spark Scala: group by one column, breaking another column into a list
【Posted】: 2019-08-02 14:31:54
【Question】:

I have a table that stores the times at which users listened to music, as shown below:

+-------+-------+---------------------+
|  user | music | listen_time         |
+-------+-------+---------------------+
|   A   |   m   | 2019-07-01 16:00:00 |
+-------+-------+---------------------+
|   A   |   n   | 2019-07-01 16:05:00 |
+-------+-------+---------------------+
|   A   |   x   | 2019-07-01 16:10:00 |
+-------+-------+---------------------+
|   A   |   y   | 2019-07-01 17:10:00 |
+-------+-------+---------------------+
|   A   |   z   | 2019-07-02 18:10:00 |
+-------+-------+---------------------+
|   A   |   m   | 2019-07-02 18:15:00 |
+-------+-------+---------------------+
|   B   |   t   | 2019-07-02 18:15:00 |
+-------+-------+---------------------+
|   B   |   s   | 2019-07-02 18:20:00 |
+-------+-------+---------------------+

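The result should be, for each user, the lists of music played with less than 30 minutes between consecutive plays. It should look like this (music_list should be an ArrayType column):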

+-------+------------+
|  user | music_list |
+-------+------------+
|   A   |   m, n, x  |
+-------+------------+
|   A   |      y     |
+-------+------------+
|   A   |    z, m    |
+-------+------------+
|   B   |    t, s    |
+-------+------------+

How can I achieve this with a Spark DataFrame in Scala?

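【Comments】:

  • What is the schema of your DataFrame?
  • @JackieLam What happens if a user listens to music at 4:00, 4:05, 4:30, and 4:35? Should the first group include 4:35, or does it go into a second group?
  • @BlueSheepToken Of course it should, because it is only 5 minutes after 4:30.
  • OK, this can be done with lag and a cumulative sum. I will write an answer on Monday.
  • @JackieLam, did this help you? If so, please don't hesitate to accept the answer.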

Tags: scala apache-spark apache-spark-sql


【Solution 1】:

Here is a hint.

df.groupBy($"user", window($"listen_time", "30 minutes")).agg(collect_list($"music"))

The result is

+----+------------------------------------------+-------------------+
|user|window                                    |collect_list(music)|
+----+------------------------------------------+-------------------+
|A   |[2019-07-01 16:00:00, 2019-07-01 16:30:00]|[m, n, x]          |
|B   |[2019-07-02 18:00:00, 2019-07-02 18:30:00]|[t, s]             |
|A   |[2019-07-02 18:00:00, 2019-07-02 18:30:00]|[z, m]             |
|A   |[2019-07-01 17:00:00, 2019-07-01 17:30:00]|[y]                |
+----+------------------------------------------+-------------------+

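This is similar to the expected result, but not exactly the same. If you want the string m, n, x rather than an array, apply concat_ws after collect_list.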

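【Discussion】:

  • This will not work if user A also has a play at 17:40: it creates several sessions. I upvoted the answer anyway; thanks for sharing the window trick, which I did not know.
  • That is true, because those plays do not fall into the same fixed 30-minute interval. The window function splits each hour into 00~30 and 30~00, not 10~40.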
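To make the alignment issue from the discussion above concrete, here is a minimal plain-Scala sketch (no Spark required) of fixed 30-minute bucketing. The object `BucketSketch` and its helpers are hypothetical names introduced only for illustration, and the timestamps are assumed to be in UTC.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, for illustration only.
object BucketSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Epoch seconds, treating the timestamp as UTC (an assumption made for this example).
  def toEpoch(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  // Start of the fixed 30-minute bucket that contains the timestamp.
  // Integer division discards the remainder, so buckets always start at :00 or :30.
  def bucketStart(ts: String): Long =
    (toEpoch(ts) / 1800) * 1800
}
```

Under these assumptions, `BucketSketch.bucketStart("2019-07-01 16:25:00")` and `BucketSketch.bucketStart("2019-07-01 16:35:00")` differ even though the two plays are only 10 minutes apart. Fixed buckets therefore cannot express a rule based on the gap between consecutive plays.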
【Solution 2】:

This will work for you:

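import org.apache.spark.sql.functions.{col, collect_list, udf, unix_timestamp}
// Assumes a SparkSession named `spark` is in scope (as in spark-shell); needed for toDF
import spark.implicits._

val data = Seq(("A", "m", "2019-07-01 16:00:00"),
  ("A", "n", "2019-07-01 16:05:00"),
  ("A", "x", "2019-07-01 16:10:00"),
  ("A", "y", "2019-07-01 17:10:00"),
  ("A", "z", "2019-07-02 18:10:00"),
  ("A", "m", "2019-07-02 18:15:00"),
  ("B", "t", "2019-07-02 18:15:00"),
  ("B", "s", "2019-07-02 18:20:00"))

// Round a Unix timestamp (in seconds) down to the start of its fixed 30-minute bucket
val getinterval = udf((time: Long) => (time / 1800) * 1800)

val df = data.toDF("user", "music", "listen")
  .withColumn("unixtime", unix_timestamp(col("listen")))
  .withColumn("interval", getinterval(col("unixtime")))

// Collect each user's music per bucket, then drop the bucket key
val res = df.groupBy(col("user"), col("interval"))
  .agg(collect_list(col("music")).as("music_list"))
  .drop("interval")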

【Discussion】:

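    【Solution 3】:

    This kind of exercise is a very good way to master Spark. The idea is to use lag together with a cumulative sum to create a session ID.

    So the steps are:

    • Create a column "newSession" holding the literal 1 whenever a row starts a new session (if I understood correctly, that means no music was played for more than 30 minutes)
    • Create the session ID by simply taking a cumulative sum of those literal 1s
    • Group by the newly created session ID and the user.

    I strongly recommend that you try it yourself from these instructions before reading the next part of this answer.

    Here is the solution: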

    import org.apache.spark.sql.{functions => F}
    import org.apache.spark.sql.expressions.Window
    // Assumes a SparkSession named `spark` is in scope (as in spark-shell); needed for toDF and $"..."
    import spark.implicits._

    // Create the data.
    // We add a Unix-time column, which makes the 30-minute difference easier to check.
    val df = Seq(("A", "m", "2019-07-01 16:00:00"),
      ("A", "n", "2019-07-01 16:05:00"),
      ("A", "x", "2019-07-01 16:10:00"),
      ("A", "y", "2019-07-01 17:10:00"),
      ("A", "z", "2019-07-02 18:10:00"),
      ("A", "m", "2019-07-02 18:15:00"),
      ("B", "t", "2019-07-02 18:15:00"),
      ("B", "s", "2019-07-02 18:20:00"))
      .toDF("user", "music", "listen")
      .withColumn("unix", F.unix_timestamp($"listen", "yyyy-MM-dd HH:mm:ss"))

    // The window over which we lag to detect the start of a new session
    val userSessionWindow = Window.partitionBy("user").orderBy("unix")

    // 1 if more than 30 minutes passed since the user's previous row, else 0 (null on a user's first row).
    // Adjust the condition to match your definition of a "new session".
    val newSession = ($"unix" > F.lag($"unix", 1).over(userSessionWindow) + 30 * 60).cast("bigint")

    // A user's first row has no previous row, so its null flag is filled with 1
    val dfWithNewSession = df.withColumn("newSession", newSession).na.fill(1, Seq("newSession"))
    dfWithNewSession.show
    /**
    +----+-----+-------------------+----------+----------+
    |user|music|             listen|      unix|newSession|
    +----+-----+-------------------+----------+----------+
    |   B|    t|2019-07-02 18:15:00|1562084100|         1|
    |   B|    s|2019-07-02 18:20:00|1562084400|         0|
    |   A|    m|2019-07-01 16:00:00|1561989600|         1|
    |   A|    n|2019-07-01 16:05:00|1561989900|         0|
    |   A|    x|2019-07-01 16:10:00|1561990200|         0|
    |   A|    y|2019-07-01 17:10:00|1561993800|         1|
    |   A|    z|2019-07-02 18:10:00|1562083800|         1|
    |   A|    m|2019-07-02 18:15:00|1562084100|         0|
    +----+-----+-------------------+----------+----------+
    */
    
    // To give each user's rows a session id, take a cumulative sum of the newSession flags

    val userWindow = Window.partitionBy("user").orderBy("unix")
    val dfWithSessionId = dfWithNewSession.withColumn("session", F.sum("newSession").over(userWindow))

    dfWithSessionId.show
    /**
    +----+-----+-------------------+----------+----------+-------+
    |user|music|             listen|      unix|newSession|session|
    +----+-----+-------------------+----------+----------+-------+
    |   B|    t|2019-07-02 18:15:00|1562084100|         1|      1|
    |   B|    s|2019-07-02 18:20:00|1562084400|         0|      1|
    |   A|    m|2019-07-01 16:00:00|1561989600|         1|      1|
    |   A|    n|2019-07-01 16:05:00|1561989900|         0|      1|
    |   A|    x|2019-07-01 16:10:00|1561990200|         0|      1|
    |   A|    y|2019-07-01 17:10:00|1561993800|         1|      2|
    |   A|    z|2019-07-02 18:10:00|1562083800|         1|      3|
    |   A|    m|2019-07-02 18:15:00|1562084100|         0|      3|
    +----+-----+-------------------+----------+----------+-------+
    */
    
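    // Do not call .show in the assignment: show returns Unit, so dfFinal would not be a DataFrame.
    // Note: collect_list does not formally guarantee element order after a shuffle.
    val dfFinal = dfWithSessionId.groupBy("user", "session").agg(F.collect_list("music").as("music")).select("user", "music")

    dfFinal.show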
    
    /**
    +----+---------+
    |user|    music|
    +----+---------+
    |   B|   [t, s]|
    |   A|[m, n, x]|
    |   A|      [y]|
    |   A|   [z, m]|
    +----+---------+
    */
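
The same lag-plus-cumulative-sum logic can be checked without a cluster. The following is a minimal sketch using plain Scala collections, not the Spark code above. `SessionSketch` and `sessions` are hypothetical names, and timestamps are assumed to be epoch seconds already. It is handy for trying edge cases such as the one raised in the question comments (plays at 4:00, 4:05, 4:30, and 4:35).

```scala
// Hypothetical helper, for illustration only.
object SessionSketch {
  // rows: (user, music, epochSeconds); gapSeconds: inactivity threshold that starts a new session.
  def sessions(rows: Seq[(String, String, Long)],
               gapSeconds: Long = 1800L): Seq[(String, List[String])] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (user, userRows) =>
      val sorted = userRows.sortBy(_._3)

      // "lag": pair every row with the previous row's timestamp (None for the first row)
      val previous = None +: sorted.map(r => Option(r._3)).init

      // 1 when the row starts a new session, 0 otherwise
      val newSession = sorted.zip(previous).map {
        case ((_, _, t), Some(p)) => if (t > p + gapSeconds) 1 else 0
        case (_, None)            => 1
      }

      // Cumulative sum of the flags gives a per-user session id
      val sessionIds = newSession.scanLeft(0)(_ + _).tail

      // Group by session id, keeping the time order inside each session
      sorted.zip(sessionIds).groupBy(_._2).toSeq.sortBy(_._1).map {
        case (_, group) => user -> group.map(_._1._2).toList
      }
    }
}
```

For example, `SessionSketch.sessions(Seq(("A", "a", 0L), ("A", "b", 300L), ("A", "c", 1800L), ("A", "d", 2100L)))` yields a single session containing all four plays, because no gap between consecutive plays exceeds 1800 seconds.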
    

    【Discussion】:
