【问题标题】:Distribution of time periods over rows with certain status (column value)具有特定状态的行的时间段分布(列值)
【发布时间】:2017-05-23 11:31:24
【问题描述】:

我有一个包含日志的 Pyspark 数据框,每一行对应于记录时系统的状态,以及一个组号。我想找出每个组处于不健康状态的时间段的长度。

例如,如果这是我的桌子:

TIMESTAMP | STATUS_CODE | GROUP_NUMBER
--------------------------------------
02:03:11  | healthy     | 000001
02:03:04  | healthy     | 000001
02:03:03  | unhealthy   | 000001
02:03:00  | unhealthy   | 000001
02:02:58  | healthy     | 000008
02:02:57  | healthy     | 000008
02:02:55  | unhealthy   | 000001
02:02:54  | healthy     | 000001
02:02:50  | healthy     | 000007
02:02:48  | healthy     | 000004

我想返回组 000001,其不正常的时间段为 9 秒(从 02:02:55 到 02:03:04)。

其他组也可能有不健康的时间段,我也想退回这些时间段。

由于具有相同状态的连续行的可能性,并且由于不同组的行穿插,我正在努力寻找一种有效地做到这一点的方法。

我无法将 Pyspark 数据帧转换为 Pandas 数据帧,因为它太大了。

如何有效地确定这些时间段的长度?

非常感谢!

【问题讨论】:

  • 发布的解决方案是否有效?
  • @rogue-one - 谢谢,你的实现解释得很好,很详细!走在台阶上,显得颇为精明灵动。对此的扩展 - 有没有办法获取组处于不健康状态的每个时间段的长度(例如,如果组 000001 有多个不健康状态的时间段,则返回每个时间段的长度)?
  • 如果你在我的查询中没有 sum 和 group by 的 t2.timestamp_value - t1.timestamp_value,你应该能够得到持续时间..

标签: apache-spark pyspark apache-spark-sql spark-dataframe pyspark-sql


【解决方案1】:

带有 spark-sql 解决方案的 pyspark 看起来像这样。

首先我们创建示例数据集。除了数据集,我们还根据时间戳生成分组和顺序的 row_number 字段分区。然后我们将生成的数据框注册为一个表格,比如table1

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import unix_timestamp

df = spark.createDataFrame([
('2017-01-01 02:03:11','healthy','000001'),
('2017-01-01 02:03:04','healthy','000001'),
('2017-01-01 02:03:03','unhealthy','000001'),
('2017-01-01 02:03:00','unhealthy','000001'),
('2017-01-01 02:02:58','healthy','000008'),
('2017-01-01 02:02:57','healthy','000008'),
('2017-01-01 02:02:55','unhealthy','000001'),
('2017-01-01 02:02:54','healthy','000001'),
('2017-01-01 02:02:50','healthy','000007'),
('2017-01-01 02:02:48','healthy','000004')
],['timestamp','state','group_id'])

df = df.withColumn('rownum', row_number().over(Window.partitionBy(df.group_id).orderBy(unix_timestamp(df.timestamp))))

df.registerTempTable("table1")

一旦数据帧被注册为表 (table1)。所需的数据可以使用 spark-sql 计算如下

>>> spark.sql("""
... SELECT t1.group_id,sum((t2.timestamp_value - t1.timestamp_value)) as duration
... FROM
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1
... LEFT JOIN
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2
... ON t1.group_id = t2.group_id
... AND t1.rownum = t2.rownum - 1
... group by t1.group_id
... """).show()
+--------+--------+
|group_id|duration|
+--------+--------+
|  000001|       9|
+--------+--------+

示例日期集仅包含 group_id 00001 的不健康数据。但此解决方案适用于其他 group_id 状态不佳的情况。

【讨论】:

    【解决方案2】:

    一种直接的方法(可能不是最佳的)是:

    1. 映射到[K,V],其中 GROUP_NUMBER 作为键 K
    2. 使用repartitionAndSortWithinPartitions,因此您将拥有同一分区中每个组的所有数据,并按TIMESTAMP 对它们进行排序。详细解释它的工作原理在这个答案中:Pyspark: Using repartitionAndSortWithinPartitions with multiple sort Critiria
    3. 最后使用mapPartitions 对单个分区中的排序数据进行迭代,这样您就可以轻松找到所需的答案。 (mapPartitions的解释:How does the pyspark mapPartitions function work?

    【讨论】:

      猜你喜欢
      • 2021-01-29
      • 1970-01-01
      • 2022-11-04
      • 2017-04-24
      • 2015-09-06
      • 2022-12-22
      • 1970-01-01
      • 2023-02-05
      • 1970-01-01
      相关资源
      最近更新 更多