【问题标题】:Spark: How to aggregate/reduce records based on time difference?Spark:如何根据时差聚合/减少记录?
【发布时间】:2019-12-22 19:24:28
【问题描述】:

我有来自车辆的 CSV 时间序列数据,其中包含以下信息:

  • 行程 ID
  • 时间戳
  • 速度

数据如下:

trip-id | timestamp  | speed

001     | 1538204192 | 44.55
001     | 1538204193 | 47.20 <-- start of brake
001     | 1538204194 | 42.14
001     | 1538204195 | 39.20
001     | 1538204196 | 35.30
001     | 1538204197 | 32.22 <-- end of brake
001     | 1538204198 | 34.80
001     | 1538204199 | 37.10
...
001     | 1538204221 | 55.30
001     | 1538204222 | 57.20 <-- start of brake
001     | 1538204223 | 54.60
001     | 1538204224 | 52.15
001     | 1538204225 | 49.27
001     | 1538204226 | 47.89 <-- end of brake
001     | 1538204227 | 50.57
001     | 1538204228 | 53.72
...

当基于 timestamp 的 2 个连续记录中 speed 减少时,就会发生制动事件。

我想根据事件start timestampend timestampstart speedend speed从数据中提取制动事件。

+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+

这是我的看法:

  1. 根据trip-id 定义了一个带有分区的窗口规范,按timestamp 排序。
  2. 应用窗口lag 移动连续行并计算速度差异。
  3. 过滤掉具有正速度差的记录,因为我只对制动事件感兴趣。
  4. 现在我只有属于制动事件的记录,我想对属于同一事件的记录进行分组。我想我可以根据时间戳差异来做到这一点。如果 2 条记录相差 1 秒,则这 2 条记录属于同一个刹车事件。

我被困在这里,因为我没有属于同一组的key,因此我可以应用基于键的聚合。

我的问题是:

  1. 如何映射根据时间戳的差异添加key 列?因此,如果 2 条记录相差 1 秒,则它们应该有一个公共键。这样,我可以根据新添加的键减少一个组。

  2. 是否有任何更好、更优化的方法来实现这一目标?我的方法可能非常低效,因为它依赖于逐行比较。在属于特定事件(来自单车行程的数据)的数据流中检测此类“子事件”(例如制动事件)的其他可能方法是什么?

提前致谢!


附录:

【问题讨论】:

  • 在第 3 步之后,如果它是中断的开始(例如时间戳比它前面的行大 1 多),如何添加一列并将 1 编码为开始和 @987654337 @ 对于所有其他行。然后,您可以对派生列进行累积求和,这会将所有事件留在标记为1 的第一个中断中,所有事件都在标记为1 的第二个中断中,等等(例如stackoverflow.com/questions/46979685/…)。
  • 你根本不需要考虑时间戳的差异。您可以使用累积和来查看哪些事件属于同一制动事件。
  • @pault 不确定我是否理解。你能再解释一下吗?这将如何解决?
  • @Shumail 你想要没有trip-id的数据框,只有列开始时间戳,结束时间戳,开始速度和结束速度?

标签: dataframe apache-spark pyspark apache-spark-sql rdd


【解决方案1】:

对于 Pandas 用户,几乎有一种常见的编程模式,使用 shift() + cumsum() 来设置组标签来识别与某些特定模式/条件匹配的连续行。使用 pyspark,我们可以使用窗口函数lag() + sum() 来做同样的事情并找到这个组标签(以下代码中的d2):

数据设置:

from pyspark.sql import functions as F, Window

>>> df.orderBy('timestamp').show()
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
|    001|1538204192|44.55|
|    001|1538204193|47.20|
|    001|1538204194|42.14|
|    001|1538204195|39.20|
|    001|1538204196|35.30|
|    001|1538204197|32.22|
|    001|1538204198|34.80|
|    001|1538204199|37.10|
|    001|1538204221|55.30|
|    001|1538204222|57.20|
|    001|1538204223|54.60|
|    001|1538204224|52.15|
|    001|1538204225|49.27|
|    001|1538204226|47.89|
|    001|1538204227|50.57|
|    001|1538204228|53.72|
+-------+----------+-----+

>>> df.printSchema()
root
 |-- trip-id: string (nullable = true)
 |-- unix_timestamp: integer (nullable = true)
 |-- speed: double (nullable = true)

设置两个Window Spec (w1, w2):

# Window spec used to find previous speed F.lag('speed').over(w1) and also do the cumsum() to find flag `d2`
w1 = Window.partitionBy('trip-id').orderBy('timestamp')

# Window spec used to find the minimal value of flag `d1` over the partition(`trip-id`,`d2`)
w2 = Window.partitionBy('trip-id', 'd2').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

三个标志(d1、d2、d3):

  • d1 :标识前一个速度是否大于当前速度的标志,如果为真则d1 = 0,否则为d1 = 1
  • d2 :标记连续行以使用相同的唯一编号进行速降
  • d3 : 标识分区上d1的最小值的标志('trip-id', 'd2'),只有d3 == 0的行才能属于speed-的组降低。这将用于过滤掉不相关的行

    df_1 = df.withColumn('d1', F.when(F.lag('speed').over(w1) > F.col('speed'), 0).otherwise(1))\
             .withColumn('d2', F.sum('d1').over(w1)) \
             .withColumn('d3', F.min('d1').over(w2))
    
    >>> df_1.orderBy('timestamp').show()
    +-------+----------+-----+---+---+---+
    |trip-id| timestamp|speed| d1| d2| d3|
    +-------+----------+-----+---+---+---+
    |    001|1538204192|44.55|  1|  1|  1|
    |    001|1538204193|47.20|  1|  2|  0|
    |    001|1538204194|42.14|  0|  2|  0|
    |    001|1538204195|39.20|  0|  2|  0|
    |    001|1538204196|35.30|  0|  2|  0|
    |    001|1538204197|32.22|  0|  2|  0|
    |    001|1538204198|34.80|  1|  3|  1|
    |    001|1538204199|37.10|  1|  4|  1|
    |    001|1538204221|55.30|  1|  5|  1|
    |    001|1538204222|57.20|  1|  6|  0|
    |    001|1538204223|54.60|  0|  6|  0|
    |    001|1538204224|52.15|  0|  6|  0|
    |    001|1538204225|49.27|  0|  6|  0|
    |    001|1538204226|47.89|  0|  6|  0|
    |    001|1538204227|50.57|  1|  7|  1|
    |    001|1538204228|53.72|  1|  8|  1|
    +-------+----------+-----+---+---+---+
    

删除不关心的行:

df_1 = df_1.where('d3 == 0')

>>> df_1.orderBy('timestamp').show()
+-------+----------+-----+---+---+---+
|trip-id| timestamp|speed| d1| d2| d3|
+-------+----------+-----+---+---+---+
|    001|1538204193|47.20|  1|  2|  0|
|    001|1538204194|42.14|  0|  2|  0|
|    001|1538204195|39.20|  0|  2|  0|
|    001|1538204196|35.30|  0|  2|  0|
|    001|1538204197|32.22|  0|  2|  0|
|    001|1538204222|57.20|  1|  6|  0|
|    001|1538204223|54.60|  0|  6|  0|
|    001|1538204224|52.15|  0|  6|  0|
|    001|1538204225|49.27|  0|  6|  0|
|    001|1538204226|47.89|  0|  6|  0|
+-------+----------+-----+---+---+---+

最后一步:

现在对于df_1,按trip-idd2分组,找到F.struct('timestamp', 'speed')的最小值和最大值,这将返回组中的第一条和最后一条记录,从struct中选择相应的字段得到最终结果:

df_new = df_1.groupby('trip-id', 'd2').agg(
          F.min(F.struct('timestamp', 'speed')).alias('start')
        , F.max(F.struct('timestamp', 'speed')).alias('end')
).select(
      'trip-id'
    , F.col('start.timestamp').alias('start timestamp')
    , F.col('end.timestamp').alias('end timestamp')
    , F.col('start.speed').alias('start speed')
    , F.col('end.speed').alias('end speed')
)

>>> df_new.show()
+-------+---------------+-------------+-----------+---------+
|trip-id|start timestamp|end timestamp|start speed|end speed|
+-------+---------------+-------------+-----------+---------+
|    001|     1538204193|   1538204197|      47.20|    32.22|
|    001|     1538204222|   1538204226|      57.20|    47.89|
+-------+---------------+-------------+-----------+---------+

注意:去掉中间数据帧df_1,我们可以得到如下:

df_new = df.withColumn('d1', F.when(F.lag('speed').over(w1) > F.col('speed'), 0).otherwise(1))\
           .withColumn('d2', F.sum('d1').over(w1)) \
           .withColumn('d3', F.min('d1').over(w2)) \
           .where('d3 == 0') \
           .groupby('trip-id', 'd2').agg(
                F.min(F.struct('timestamp', 'speed')).alias('start')
              , F.max(F.struct('timestamp', 'speed')).alias('end')
            )\
           .select(
                'trip-id'
              , F.col('start.timestamp').alias('start timestamp')
              , F.col('end.timestamp').alias('end timestamp')
              , F.col('start.speed').alias('start speed')
              , F.col('end.speed').alias('end speed')
            )

【讨论】:

  • 非常感谢您的回答!您是否知道任何资源/指针具有一些示例实现,这些示例实现使用这种基于标志的中间数据帧方法?
【解决方案2】:

希望这会有所帮助。 Scala 代码。

输出

+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+

代码

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions._

scala> df.show
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
|    001|1538204192|44.55|
|    001|1538204193| 47.2|
|    001|1538204194|42.14|
|    001|1538204195| 39.2|
|    001|1538204196| 35.3|
|    001|1538204197|32.22|
|    001|1538204198| 34.8|
|    001|1538204199| 37.1|
|    001|1538204221| 55.3|
|    001|1538204222| 57.2|
|    001|1538204223| 54.6|
|    001|1538204224|52.15|
|    001|1538204225|49.27|
|    001|1538204226|47.89|
|    001|1538204227|50.57|
|    001|1538204228|53.72|
+-------+----------+-----+

val overColumns = Window.partitionBy("trip-id").orderBy("timestamp")
val breaksDF = df
  .withColumn("speeddiff", lead("speed", 1).over(overColumns) - $"speed")
  .withColumn("breaking", when($"speeddiff" < 0, 1).otherwise(0))

scala> breaksDF.show
+-------+----------+-----+-------------------+--------+
|trip-id| timestamp|speed|          speeddiff|breaking|
+-------+----------+-----+-------------------+--------+
|    001|1538204192|44.55| 2.6500000000000057|       0|
|    001|1538204193| 47.2| -5.060000000000002|       1|
|    001|1538204194|42.14|-2.9399999999999977|       1|
|    001|1538204195| 39.2|-3.9000000000000057|       1|
|    001|1538204196| 35.3|-3.0799999999999983|       1|
|    001|1538204197|32.22| 2.5799999999999983|       0|
|    001|1538204198| 34.8| 2.3000000000000043|       0|
|    001|1538204199| 37.1| 18.199999999999996|       0|
|    001|1538204221| 55.3| 1.9000000000000057|       0|
|    001|1538204222| 57.2|-2.6000000000000014|       1|
|    001|1538204223| 54.6| -2.450000000000003|       1|
|    001|1538204224|52.15|-2.8799999999999955|       1|
|    001|1538204225|49.27|-1.3800000000000026|       1|
|    001|1538204226|47.89| 2.6799999999999997|       0|
|    001|1538204227|50.57| 3.1499999999999986|       0|
|    001|1538204228|53.72|               null|       0|
+-------+----------+-----+-------------------+--------+


val outputDF = breaksDF
  .withColumn("breakevent", 
    when(($"breaking" - lag($"breaking", 1).over(overColumns)) === 1, "start of break")
    .when(($"breaking" - lead($"breaking", 1).over(overColumns)) === 1, "end of break"))

scala> outputDF.show
+-------+----------+-----+-------------------+--------+--------------+
|trip-id| timestamp|speed|          speeddiff|breaking|    breakevent|
+-------+----------+-----+-------------------+--------+--------------+
|    001|1538204192|44.55| 2.6500000000000057|       0|          null|
|    001|1538204193| 47.2| -5.060000000000002|       1|start of break|
|    001|1538204194|42.14|-2.9399999999999977|       1|          null|
|    001|1538204195| 39.2|-3.9000000000000057|       1|          null|
|    001|1538204196| 35.3|-3.0799999999999983|       1|  end of break|
|    001|1538204197|32.22| 2.5799999999999983|       0|          null|
|    001|1538204198| 34.8| 2.3000000000000043|       0|          null|
|    001|1538204199| 37.1| 18.199999999999996|       0|          null|
|    001|1538204221| 55.3| 1.9000000000000057|       0|          null|
|    001|1538204222| 57.2|-2.6000000000000014|       1|start of break|
|    001|1538204223| 54.6| -2.450000000000003|       1|          null|
|    001|1538204224|52.15|-2.8799999999999955|       1|          null|
|    001|1538204225|49.27|-1.3800000000000026|       1|  end of break|
|    001|1538204226|47.89| 2.6799999999999997|       0|          null|
|    001|1538204227|50.57| 3.1499999999999986|       0|          null|
|    001|1538204228|53.72|               null|       0|          null|
+-------+----------+-----+-------------------+--------+--------------+


scala> outputDF.filter("breakevent is not null").select("trip-id", "timestamp", "speed", "breakevent").show
+-------+----------+-----+--------------+
|trip-id| timestamp|speed|    breakevent|
+-------+----------+-----+--------------+
|    001|1538204193| 47.2|start of break|
|    001|1538204196| 35.3|  end of break|
|    001|1538204222| 57.2|start of break|
|    001|1538204225|49.27|  end of break|
+-------+----------+-----+--------------+

outputDF.filter("breakevent is not null").withColumn("breakID", 
  when($"breakevent" === "start of break", concat($"trip-id",$"timestamp"))
  .when($"breakevent" === "end of break", concat($"trip-id", lag($"timestamp", 1).over(overColumns))))
  .groupBy("breakID").agg(first($"timestamp") as "start timestamp", last($"timestamp") as "end timestamp", first($"speed") as "start speed", last($"speed") as "end speed").show


+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+

【讨论】:

  • 一切都好,只是最终结果不正确。我想它是可以修复的,通过使用lag 而不是lead?
  • @C.S.Reddy Gadipally 你真是个天才!您生成trip-id 的部分非常新颖。谢谢。
猜你喜欢
  • 1970-01-01
  • 2011-07-06
  • 2016-07-27
  • 2021-06-04
  • 2015-06-05
  • 1970-01-01
  • 2020-09-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多