【问题标题】:How can I get same result in both Pandas and Pyspark during groupby().agg() operation for categorical columns?在对分类列进行 groupby().agg() 操作期间,如何在 Pandas 和 Pyspark 中获得相同的结果?
【发布时间】:2020-09-08 11:51:46
【问题描述】:

我同时使用 groupby() 和 agg() 函数来查找组结果。我同时使用了 pandas 和 pyspark。 Pandas 结果和 pyspark 结果不同。似乎 pyspark 结果不正确。我还注意到,如果我不将列更改为类别类型 pandas 和 pyspark 结果是相似的。如何修复 pyspark 代码,以便在两种情况下得到相同的结果?

这是我尝试过的代码:

import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


pivot_agg_func= {'Max_Speed': 'min', 'Delay': 'sum'}
cat_col_all = ['Date', 'TrainID', 'Traintype']

df = pd.read_csv('groupby_example.csv', index_col=0)

print('Original DF')
print(df)

for col in cat_col_all:
    df[col] = df[col].astype('category')

df_groupby = df.groupby(cat_col_all, as_index=True).agg(pivot_agg_func).reset_index()
print('Pandas group by result')
print(df_groupby)

# reading spark dataframe
df_sp = spark.read.option("header", True).csv('groupby_example.csv').drop('_c0')
df = df_sp.groupby(*cat_col_all).agg(pivot_agg_func)
print('Pyspark group by result')
df.show()

这是我的控制台输出:

Original DF

      TrainID Traintype  Max_Speed  Delay  Date
2017   1000.0        IC      280.0   11.0  2017
2017   1000.0       ICE      280.0    2.0  2017
2018   1002.0        IC        NaN    NaN  2018
2019   1002.0        IC      260.0    3.0  2019
2019   1002.0        IC      220.0    9.0  2019
2019   1000.0       ICE      270.0    NaN  2019
2020   1000.0        IC      280.0    3.0  2020
2020   1000.0        IC      260.0   55.0  2020
2020   1002.0       ICE      280.0    NaN  2020
2020   1000.0        IC      220.0    4.0  2020

Pandas group by and agg result:

    Date TrainID Traintype  Max_Speed  Delay
0   2017  1000.0        IC      280.0   11.0
1   2017  1000.0       ICE      280.0    2.0
2   2017  1002.0        IC        NaN    NaN
3   2017  1002.0       ICE        NaN    NaN
4   2018  1000.0        IC        NaN    NaN
5   2018  1000.0       ICE        NaN    NaN
6   2018  1002.0        IC        NaN    0.0
7   2018  1002.0       ICE        NaN    NaN
8   2019  1000.0        IC        NaN    NaN
9   2019  1000.0       ICE      270.0    0.0
10  2019  1002.0        IC      220.0   12.0
11  2019  1002.0       ICE        NaN    NaN
12  2020  1000.0        IC      220.0   62.0
13  2020  1000.0       ICE        NaN    NaN
14  2020  1002.0        IC        NaN    NaN
15  2020  1002.0       ICE      280.0    0.0

Pyspark group by and agg result:

+----+-------+---------+--------------+----------+
|Date|TrainID|Traintype|min(Max_Speed)|sum(Delay)|
+----+-------+---------+--------------+----------+
|2019| 1000.0|      ICE|         270.0|      null|
|2017| 1000.0|      ICE|         280.0|       2.0|
|2020| 1000.0|       IC|         220.0|      62.0|
|2018| 1002.0|       IC|          null|      null|
|2017| 1000.0|       IC|         280.0|      11.0|
|2020| 1002.0|      ICE|         280.0|      null|
|2019| 1002.0|       IC|         220.0|      12.0|
+----+-------+---------+--------------+----------+

【问题讨论】:

  • 我对熊猫的行为与您的行为不同。我的熊猫输出看起来像你的火花输出(我的火花就是你的火花)
  • 如果不将列转换为类别类型,那么pandas和spark结果是一样的。
  • 即使有类别的东西......完全尝试过你的代码,复制粘贴它
  • 阅读您的评论后,我在另一台笔记本电脑上再次检查了上面的代码,我也得到了不同的输出。我不知道为什么你没有像我一样得到输出。我正在使用熊猫版本 1.0.3。

标签: pandas dataframe pyspark group-by aggregate


【解决方案1】:

我有一个肮脏的解决方案,但它可能无法正常工作,具体取决于您拥有的数据量。

from functools import reduce

from pyspark.sql import functions as F


df_sp.join(
    reduce(
        lambda a, b: a.crossJoin(b),
        [F.broadcast(df_sp.select(col).distinct()) for col in cat_col_all],
    ),
    how="full",
    on=cat_col_all,
).groupby(cat_col_all).agg(pivot_agg_func).show()

+----+-------+---------+--------------+----------+
|Date|TrainID|Traintype|min(Max_Speed)|sum(Delay)|
+----+-------+---------+--------------+----------+
|2019| 1000.0|       IC|          null|      null|
|2018| 1000.0|      ICE|          null|      null|
|2018| 1002.0|       IC|          null|      null|
|2017| 1002.0|      ICE|          null|      null|
|2020| 1002.0|       IC|          null|      null|
|2017| 1000.0|      ICE|         280.0|       2.0|
|2017| 1000.0|       IC|         280.0|      11.0|
|2017| 1002.0|       IC|          null|      null|
|2019| 1000.0|      ICE|         270.0|      null|
|2020| 1002.0|      ICE|         280.0|      null|
|2018| 1002.0|      ICE|          null|      null|
|2020| 1000.0|      ICE|          null|      null|
|2019| 1002.0|      ICE|          null|      null|
|2020| 1000.0|       IC|         220.0|      62.0|
|2019| 1002.0|       IC|         220.0|      12.0|
|2018| 1000.0|       IC|          null|      null|
+----+-------+---------+--------------+----------+

【讨论】:

  • 我会用大数据来测试一下。感谢您的解决方案。
猜你喜欢
  • 2019-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-01
  • 2015-09-09
  • 2018-02-24
  • 1970-01-01
  • 2021-06-21
相关资源
最近更新 更多