【问题标题】:Dataframe pyspark to dictionary after groupby operations在 groupby 操作后将 Dataframe pyspark 转换为字典
【发布时间】:2020-04-09 10:18:05
【问题描述】:

我希望在 pyspark 中解决以下问题。

我有一个庞大的数据集,我想在其上计算必须存储在字典中的基本统计数据。

下表大致描述了数据集,尽管实际上它要大得多。

我感兴趣的目标数据的数据类型要么是字符串要么是双精度。

|  key1  | ... | key2   | key3   | value1 | ... | value2 | 
|--------|-----|--------|--------|--------|-----|--------| 
| string | ... | string | string | double | ... | double | 

我必须对 [key1, key2, key3] 执行 groupBy 并计算 value1 和 value2 的均值和标准差。

然后,我想将所有结果存储在具有以下结构的字典中:

dict {[key1, key2, key3]  :  [avg1, sd1, avg2, sd2]}

我最初的方法是先执行计算,例如:

sparkdf.groupBy(keys).agg(avg(v1), stddev(v1), avg(v2), stddev(v2)) 

然后将生成的 sparkdf 转换为 pandasdf 并遍历行以构建字典。但对于大型数据集,我非常怀疑这种方法是否合理。

我了解到一种方法是使用地图和 UDF。

例如:

sparkdf.groupBy(keys)
       .agg(collect_list(create_map([avg1, sd1, avg2, sd2]).alias('map')))

map = udf(lambda maps: {key:r[key] for r in map for key in r}, 
                 MapType( StringType(), DoubleType())

dict = do something with the map

这种解决方案就是本主题 (Dataframe pyspark to dict) 中讨论的内容。

但是,就我而言,我没有需要使用 groupBy 操作计算的值 [avg1, sd1, avg2, sd2]。由于使用了 collect_list 和 create_map 之类的方法,因此我不能将 avg(v1) 放入其中,否则会返回错误。

谁能建议你如何构建我需要的字典?

非常感谢!

【问题讨论】:

  • 方法 toLocalIterator() 并使用 row.asDict() 将 Row 对象转换为 dict 将对您的情况有所帮助。

标签: python dictionary pyspark


【解决方案1】:

假设您的输出字典足够小以适合您的主节点内存,并且假设 keys 是关键字段名称的列表,这应该可以工作(虽然我没有运行它,所以可能有错别字):

aggregatedSparkDf = sparkdf.groupBy(keys).agg(avg(v1), stddev(v1), avg(v2), stddev(v2))
aggregatedPandasDf = aggregatedSparkDf.toPandas().set_index(keys)
aggregatedPandasSeriesOfLists = aggregatedPandasDf.apply(list, result_type='reduce', axis=1)
aggregatedDict = aggregatedPandasSeriesOfLists.to_dict()

这应该会给你一个 dict {(key1,key2,key3}:[avg1,std1,avg2,std2])。

但是你为什么要转换成字典呢?很可能您最好使用 pandas DF。

【讨论】:

    猜你喜欢
    • 2022-12-01
    • 2018-01-01
    • 2019-02-06
    • 2016-10-16
    • 2018-03-31
    • 2019-01-21
    • 2017-12-26
    • 2014-12-30
    相关资源
    最近更新 更多