【问题标题】:Create a KPI with a timestamp and a groupby in pyspark在 pyspark 中创建带有时间戳和 groupby 的 KPI
【发布时间】:2021-11-20 13:45:17
【问题描述】:

我有一个包含日志的数据框,就像这个例子一样:

+------------+--------------------------+--------------------+-------------------+
|Source      |Error                     |          @timestamp| timestamp_rounded |
+------------+--------------------------+--------------------+-------------------+
|      A     |             No           |2021-09-12T14:07:...|2021-09-12 16:10:00|
|      B     |             No           |2021-09-12T12:49:...|2021-09-12 14:50:00|
|      C     |             No           |2021-09-12T12:59:...|2021-09-12 15:00:00|
|      C     |             No           |2021-09-12T12:58:...|2021-09-12 15:00:00|
|      B     |             No           |2021-09-12T14:22:...|2021-09-12 16:20:00|
|      A     |             Yes          |2021-09-12T14:22:...|2021-09-12 16:25:00|
|      B     |             No           |2021-09-12T13:00:...|2021-09-12 15:00:00|
|      B     |             No           |2021-09-12T12:57:...|2021-09-12 14:55:00|
|      B     |             No           |2021-09-12T12:57:...|2021-09-12 15:00:00|
|      B     |             No           |2021-09-12T12:58:...|2021-09-12 15:00:00|
|      C     |             No           |2021-09-12T12:54:...|2021-09-12 14:55:00|
|      A     |             Yes          |2021-09-12T14:17:...|2021-09-12 16:15:00|
|      B     |             No           |2021-09-12T12:43:...|2021-09-12 14:45:00|
|      A     |             No           |2021-09-12T12:45:...|2021-09-12 14:45:00|
|      D     |             No           |2021-09-12T12:57:...|2021-09-12 14:55:00|
|      A     |             No           |2021-09-12T13:00:...|2021-09-12 15:00:00|
|      C     |             No           |2021-09-12T12:47:...|2021-09-12 14:45:00|
|      A     |             No           |2021-09-12T12:57:...|2021-09-12 15:00:00|
|      A     |             No           |2021-09-12T13:00:...|2021-09-12 15:00:00|
|      A     |             No           |2021-09-12T14:23:...|2021-09-12 16:25:00|
+------------+--------------------------+--------------------+-------------------+
only showing top 20 rows

我的数据框有数百万条日志,这并不重要。

我想计算每个来源的错误率,每 5 分钟。我已经搜索过有关此类转换的文档(groupby with partition ? double groupby ?...),但没有找到很多信息。

我可以用 Yes ==> 1 和 No ==> 0 获得一个新列,然后用 gorupby{avg: foo} 获得每个来源的平均值,以获得每个来源的错误率,但我想要它每 5 分钟一次(参见 col 'timestamp_rounded')

结果会是这样的:

+-------------------+------------+--------------+-------------+------------+
|timestamp_rounded  |Error_rate_A| Error_rate_B | Error_rate_C|Error_rate_D|
+-------------------+------------+--------------+-------------+------------+
|2021-09-12 16:10:00|       0    |       0.2    |       0     |       0.2  |
|2021-09-12 16:15:00|       0.1  |       0.3    |       0     |       0    |
|2021-09-12 16:20:00|       0    |       0.2    |       0     |       0    |
|2021-09-12 16:25:00|       0    |       0.2    |       0     |       0    |
|2021-09-12 16:30:00|       0    |       0.2    |       0     |       0    |
|2021-09-12 16:35:00|       0.2  |       0.2    |       0     |       0    |
|2021-09-12 16:40:00|       0.3  |       0.2    |       0     |       0.2  |
|2021-09-12 16:45:00|       0.4  |       0.3    |       0     |       0    |

etc...



来源可能非常多(我的示例有 4 个,但可能有数千个来源)

如果您需要更多信息,请告诉我。 非常感谢!

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql kpi


    【解决方案1】:

    假设您的数据可以在名为logs 的数据框中访问,您可以通过timestamp_rounded 上的初始组然后以source 为中心来实现此目的,以将您的汇总错误率转换为每个source 的列的行每个timestamp_rounded 的错误率。最后,您可以将缺失的错误率值替换为0.0

    在执行这些转换之前,我们可以将您的 Yes/No 值转换为 1/0 以简化聚合/均值并将 source 列值重命名为前缀 Error_rate_ 以实现透视后所需的列名。

    注意。我在问题的示例数据中更改了您的 1 条记录

    |      A     |             No           |2021-09-12T12:57:...|2021-09-12 15:00:00|
    

    |      A     |             Yes           |2021-09-12T12:57:...|2021-09-12 15:00:00|
    

    接收更多变化的数据。因此,您的数据框在初始聚合后将如下所示。

    您可以使用以下方法实现此目的:

    output_df =(
        logs.withColumn("Error",F.when(F.col("Error")=="Yes",1).otherwise(0))
            .withColumn("Source",F.concat(F.lit("Error_rate_"),F.col("Source")))
            .groupBy("timestamp_rounded")
            .pivot("Source")
            .agg(
                F.round(F.mean("Error"),2).alias("Error_rate")
            )
            .na.fill(0.0)
    )
    

    输出

    +-------------------+------------+------------+------------+------------+
    |timestamp_rounded  |Error_rate_A|Error_rate_B|Error_rate_C|Error_rate_D|
    +-------------------+------------+------------+------------+------------+
    |2021-09-12 14:50:00|0.0         |0.0         |0.0         |0.0         |
    |2021-09-12 16:15:00|1.0         |0.0         |0.0         |0.0         |
    |2021-09-12 16:20:00|0.0         |0.0         |0.0         |0.0         |
    |2021-09-12 16:25:00|0.5         |0.0         |0.0         |0.0         |
    |2021-09-12 14:55:00|0.0         |0.0         |0.0         |0.0         |
    |2021-09-12 14:45:00|0.0         |0.0         |0.0         |0.0         |
    |2021-09-12 16:10:00|0.0         |0.0         |0.0         |0.0         |
    |2021-09-12 15:00:00|0.33        |0.0         |0.0         |0.0         |
    +-------------------+------------+------------+------------+------------+
    

    注意。上面的输出没有排序,可以使用.orderBy轻松排序

    让我知道这是否适合你。

    【讨论】:

    • 像魅力一样工作!我找到了一种解决方法,但它对我来说太“熊猫”了(可能会在评论中添加它以供人们查看)。这更“pysparky”并且可能更有效。非常感谢
    猜你喜欢
    • 2023-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-31
    • 1970-01-01
    相关资源
    最近更新 更多