【问题标题】:How do I group records that are within a specific time interval using Spark Scala or sql?如何使用 Spark Scala 或 sql 对特定时间间隔内的记录进行分组?
【发布时间】:2019-03-29 02:16:04
【问题描述】:

只有当它们具有相同的 ID 并且它们的时间在 1 分钟内时,我才想在 scala 中对记录进行分组。

我在概念上是这样想的?但我不太确定

HAVING a.ID = b.ID AND a.time + 30 sec > b.time AND a.time - 30 sec < b.time




| ID         |     volume  |           Time             |
|:-----------|------------:|:--------------------------:|
| 1          |      10     |    2019-02-17T12:00:34Z    |
| 2          |      20     |    2019-02-17T11:10:46Z    |
| 3          |      30     |    2019-02-17T13:23:34Z    |
| 1          |      40     |    2019-02-17T12:01:02Z    |
| 2          |      50     |    2019-02-17T11:10:30Z    |
| 1          |      60     |    2019-02-17T12:01:57Z    |

到这里:

| ID         |     volume  | 
|:-----------|------------:|
| 1          |      50     |   // (10+40)
| 2          |      70     |   // (20+50)
| 3          |      30     |


df.groupBy($"ID", window($"Time", "1 minutes")).sum("volume")

上面的代码是 1 个解决方案,但它总是四舍五入。

例如 2019-02-17T12:00:45Z 的范围为

2019-02-17T12:00:00Z TO 2019-02-17T12:01:00Z.

我正在寻找这个: 2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z.

有办法吗?

【问题讨论】:

  • 12:00:3412:01:02 相距不到一分钟。但是12:01:0212:01:57 也相差不到一分钟。你为什么不想把这三个结合起来?为什么你更喜欢结合前两个而不是后两个?
  • 您的最终2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z. 是否应该读作2019-02-17T12:01:45Z TO 2019-02-17T12:01:45Z
  • 12:00:34 和 12:01:02 在 1 分钟内。但是 12:00:34 和 12:01:57 不是。我不想将它们组合在一起,它们几乎是 2 分钟的一部分。我希望澄清。 2019-02-17T11:45:00Z (+1m) (+1m) 2019-02-17T12:01:45Z
  • 不,你忽略了我的一半问题。为什么不将12:01:0212:01:57 结合起来?他们相隔55秒。是不是因为12:01:02 已经与12:00:34 合并而不想合并它们?在这种情况下,如果12:00:01 也有一行,事情就会改变。您将合并12:00:0112:00:34,并分别合并12:01:0212:01:57。这意味着如果不回到序列的开头并向前滚动,就无法判断要组合哪些行。这是一个顺序循环,在 SQL 中你不会这样做。
  • 你是对的!再次感谢您的陪伴

标签: sql scala apache-spark


【解决方案1】:

org.apache.spark.sql.functions 提供如下重载的窗口函数。

1. window(timeColumn: Column, windowDuration: String) : 在给定时间戳指定列的情况下生成翻滚时间窗口。窗口开始是包含的,但窗口结束是排除的,例如12:05 将在窗口 [12:05,12:10) 中,但不在 [12:00,12:05) 中。

窗口看起来像:

  {{{
    09:00:00-09:01:00
    09:01:00-09:02:00
    09:02:00-09:03:00 ...
  }}}

2。 window((timeColumn: Column, windowDuration: String, slideDuration: String): 给定时间戳指定列,将行分桶到一个或多个时间窗口中。窗口开始是包含的,但窗口结束是排除的,例如12:05 将在窗口 [12:05,12:10) 中,但不在 [12:00,12:05) 中。 slideDuration 指定窗口滑动间隔的参数,例如1 minute。每个slideDuration 都会生成一个新窗口。必须小于或等于windowDuration

窗口看起来像:

{{{
  09:00:00-09:01:00
  09:00:10-09:01:10
  09:00:20-09:01:20 ...
}}}

3。 window((timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): 在给定时间戳指定列的情况下,将行分桶到一个或多个时间窗口中。窗口开始包含但窗口结束不包含,例如12:05 将在窗口 [12:05,12:10) 中,但不在 [12:00,12:05) 中。

窗口看起来像:

{{{
  09:00:05-09:01:05
  09:00:15-09:01:15
  09:00:25-09:01:25 ...
}}}

例如,为了让每小时滚动的窗口在整点后 15 分钟开始,例如12:15-13:15、13:15-14:15... 将startTime 提供为15 minutes这是满足您要求的完美重载窗口函数。

请找到如下工作代码。

import org.apache.spark.sql.SparkSession

object SparkWindowTest extends App {

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("File_Streaming")
    .getOrCreate()

  import spark.implicits._
  import org.apache.spark.sql.functions._

  //Prepare Test Data
  val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
    (3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
    (1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
    .toDF("ID", "Volume", "TimeString")

  df.show()
  df.printSchema()

+---+------+-------------------+
| ID|Volume|         TimeString|
+---+------+-------------------+
|  1|    10|2019-02-17 12:00:49|
|  2|    20|2019-02-17 11:10:46|
|  3|    30|2019-02-17 13:23:34|
|  2|    50|2019-02-17 11:10:30|
|  1|    40|2019-02-17 12:01:02|
|  1|    60|2019-02-17 12:01:57|
+---+------+-------------------+

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- TimeString: string (nullable = true)

  //Converted String Timestamp into Timestamp
  val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))

  //Dropped String Timestamp from DF
  val modifiedDF1 = modifiedDF.drop("TimeString")

  modifiedDF.show(false)
  modifiedDF.printSchema()

+---+------+-------------------+-------------------+
|ID |Volume|TimeString         |Time               |
+---+------+-------------------+-------------------+
|1  |10    |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2  |20    |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3  |30    |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2  |50    |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1  |40    |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1  |60    |2019-02-17 12:01:57|2019-02-17 12:01:57|
+---+------+-------------------+-------------------+

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- TimeString: string (nullable = true)
 |-- Time: timestamp (nullable = true)

  modifiedDF1.show(false)
  modifiedDF1.printSchema()

+---+------+-------------------+
|ID |Volume|Time               |
+---+------+-------------------+
|1  |10    |2019-02-17 12:00:49|
|2  |20    |2019-02-17 11:10:46|
|3  |30    |2019-02-17 13:23:34|
|2  |50    |2019-02-17 11:10:30|
|1  |40    |2019-02-17 12:01:02|
|1  |60    |2019-02-17 12:01:57|
+---+------+-------------------+

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- Time: timestamp (nullable = true)

  //Main logic
  val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")

  //Renamed all columns of DF.
  val newNames = Seq("ID", "WINDOW", "VOLUME")
  val finalDF = modifiedDF2.toDF(newNames: _*)

  finalDF.show(false)

+---+---------------------------------------------+------+
|ID |WINDOW                                       |VOLUME|
+---+---------------------------------------------+------+
|2  |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50    |
|1  |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60    |
|1  |[2019-02-17 12:00:45.0,2019-02-17 12:01:45.0]|50    |
|3  |[2019-02-17 13:22:45.0,2019-02-17 13:23:45.0]|30    |
|2  |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20    |
+---+---------------------------------------------+------+

}

【讨论】:

  • 您的第一个测试数据记录不正确。它应该是12:00:34 而不是12:00:49。这将改变结果。 OP 不希望从 45 秒开始有规律的 1 分钟间隔。 OP 想要基于数据的动态窗口。因此,对于 ID=1,将是 12:00:34 -&gt; 12:01:34,然后是 12:01:57 -&gt; 12:02:57,但不是 12:01:02 -&gt; 12:02:02,因为它的起始记录包含在第一个窗口中。 (您必须从序列的最开始并向前迭代,这是所有 SQL 都擅长的)
  • 很好的答案!感谢您提供的所有信息,如果有办法使“startTime”动态化会更好吗?而不是每 45 秒标记一次。该程序将记录时间,并且每次只增加 30 秒
  • startTime 可以是动态的。您可以将其作为参数传递,而不是硬编码 45。
  • e.d. $“时间”这样的?在记录时间的意义上是动态的,而不是一般时间
  • 所以如果记录是 12:01:20。范围将是 12:00:20 12:02:20
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-01-03
  • 1970-01-01
  • 1970-01-01
  • 2016-10-04
  • 1970-01-01
  • 2016-04-02
相关资源
最近更新 更多