【问题标题】:Performing aggregation after binning a dataframe to a specific size将数据帧合并为特定大小后执行聚合
【发布时间】:2021-06-14 01:39:51
【问题描述】:

我有一个像这样的 pyspark 数据框:(在这个例子中我有 20 条记录)

+-----------------------+---------+
|TIME_STAMP             |RESULT   |
+-----------------------+---------+
|2020-08-31 00:00:08.395|80.0     |
|2020-08-31 00:03:50.422|27939.368|
|2020-08-31 00:04:27.586|80.0     |
|2020-08-31 00:06:01.476|27956.04 |
|2020-08-31 00:06:12.883|27958.179|
|2020-08-31 00:06:14.082|27939.168|
|2020-08-31 00:08:46.169|80.0     |
|2020-08-31 00:11:18.627|27940.127|
|2020-08-31 00:13:04.91 |80.0     |
|2020-08-31 00:13:18.746|27954.786|
|2020-08-31 00:13:38.569|27958.417|
|2020-08-31 00:13:51.633|27939.395|
|2020-08-31 00:17:23.901|80.0     |
|2020-08-31 00:18:47.043|27940.273|
|2020-08-31 00:20:36.029|27956.06 |
|2020-08-31 00:21:03.403|27958.464|
|2020-08-31 00:21:19.796|27939.9  |
|2020-08-31 00:21:42.546|80.0     |
|2020-08-31 00:26:01.334|80.0     |
|2020-08-31 00:27:53.582|27955.768|
+-----------------------+---------+

我已按 TIME_STAMP 对其进行了排序,并希望将数据框分成 5 个一组。并对每个组的 RESULT 列执行聚合 (mean)。所以前 5 条记录会组成一个组,接下来的 5 条记录会导致 4 个组。

预期输出:

bin     mean
5   16802.7174
10  16798.8162
15  22374.829
20  16802.8264

这里,bin 列来自记录 1-5mean 列是这 5 条记录的平均值,依此类推。

在我的研究中,似乎我可能不得不使用 monotonically_increasing_id() pyspark 函数,因为我有非常大的数据集并且可能导致 OOM,因此我试图避免使用该函数。

有没有办法实现这一点,而不必将整个数据集collect 发送给驱动程序?

作为一个附加的问题,在上面的例子中,记录总数(20)可以被 5 整除。但是说我有 19 条记录,有没有办法让 3 组 5 条记录和 4 条记录在最后一组?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql binning


    【解决方案1】:
    1. 首先使用 row_number() 为每一行分配一个行号 (按时间戳排序)。无需分区。
    2. 接下来,通过取地板 ((row_number - 1)/5) 对行号进行分类。
    3. 终于变成了一个微不足道的群了

    您可以按原样运行并轻松适应您的数据的示例 SQL:

    SELECT floor((id - 1)/5), avg(value)
    FROM   (SELECT row_number() OVER (ORDER BY value) AS id,
                   value
            FROM   (SELECT Explode(Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210)) AS value) a)
    GROUP  BY 1 
    

    【讨论】:

    • 我在一个小数据集上试了一下,效果很好。我还没有在我的大型数据集上尝试它。我想知道row_number() 函数将如何影响火花并行。但我想我现在可以处理这个了。谢谢!
    猜你喜欢
    • 2018-03-04
    • 1970-01-01
    • 1970-01-01
    • 2021-06-22
    • 1970-01-01
    • 2015-08-12
    • 1970-01-01
    • 1970-01-01
    • 2021-08-07
    相关资源
    最近更新 更多