【问题标题】:Calculate rolling sum of array in PySpark and save as dict?计算 PySpark 中数组的滚动总和并保存为 dict?
【发布时间】:2020-06-09 19:54:03
【问题描述】:

给定这样的输入:

timestamp     vars 
2             [1,2,3]
2             [1,2,4]
3             [1,2]
4             [1,3]
5             [1,3]

我需要对每个索引进行滚动计数。尝试将数组扩展为一个热编码([1,2,3,5] -> [0,1,1,1,0,1])并添加,但这可能会变得任意大(> 100万),所以我想把它作为一个字典来维护。像下面的东西。任何指针将不胜感激。

timestamp     vars 
2             {1:1, 2:1, 3:1}
2             {1:2, 2:2, 3:1, 4:1}
3             {1:3, 2:3, 3:1, 4:1}
4             {1:4, 2:3, 3:2, 4:1}
5             {1:5, 2:3, 3:3, 4:1}

谢谢!

【问题讨论】:

  • 我建议您维护一个哈希图并在那里增加计数并在创建字典时获取它。

标签: pyspark


【解决方案1】:

示例数据框:

+---+------------+
| ID|         arr|
+---+------------+
|  1|         [0]|
|  2|      [0, 1]|
|  3|   [0, 1, 2]|
|  4|[0, 1, 2, 3]|
|  1|         [0]|
|  1|         [0]|
|  3|   [0, 1, 2]|
|  0|          []|
+---+------------+

使用以下使用收集计数器的函数:

def arr_operation(arr):
   from collections import Counter
   return dict(Counter(arr))

以下列方式为arr_operation 函数创建 UDF:

udf_dist_count =  udf(arr_operation,MapType(IntegerType(), IntegerType()))

并调用创建新列:

final_df = df.withColumn("Dict",udf_dist_count("arr"))

结果会是这样的:

+---+------------+--------------------------------+
|ID |arr         |Dict                            |
+---+------------+--------------------------------+
|1  |[0]         |[0 -> 1]                        |
|2  |[0, 1]      |[0 -> 1, 1 -> 1]                |
|3  |[0, 1, 2]   |[0 -> 1, 1 -> 1, 2 -> 1]        |
|4  |[0, 1, 2, 3]|[0 -> 1, 1 -> 1, 2 -> 1, 3 -> 1]|
|1  |[0]         |[0 -> 1]                        |
|1  |[0]         |[0 -> 1]                        |
|3  |[0, 1, 2]   |[0 -> 1, 1 -> 1, 2 -> 1]        |
|0  |[]          |[]                              |
+---+------------+--------------------------------+

关于collection Counter在分布式环境中速度慢的说法已经在Why is Collections.counter so slow?这个问题的答案中得到了很好的解释

【讨论】:

    【解决方案2】:

    我会建议 Counter 来自 collections

    In [1]: from collections import Counter                                                                                                                             
    
    In [2]: count = Counter()                                                                                                                                           
    
    In [3]: count.update([1,2,4])                                                                                                                                       
    
    In [4]: count                                                                                                                                                       
    Out[4]: Counter({1: 1, 2: 1, 4: 1})
    
    In [5]: count.update([1,2,3])                                                                                                                                       
    
    In [6]: count                                                                                                                                                       
    Out[6]: Counter({1: 2, 2: 2, 4: 1, 3: 1})
    
    In [7]: count.update([2,3,5])                                                                                                                                       
    
    In [8]: count                                                                                                                                                       
    Out[8]: Counter({1: 2, 2: 3, 4: 1, 3: 2, 5: 1})
    

    【讨论】:

    • 这不适用于 pyspark。
    • @salt-die,你对 PySpark 有什么建议吗?
    • 我的建议是使用计数器。 PySpark 不包含 python 的标准库吗?
    • @salt-die 是的,它确实有库,但它不能以分布式方式工作。您的建议将在 UDF 中使用,它们对于大数据任务非常慢
    • 如果您不想弄乱可变对象,可以通过添加计数来创建新计数器。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-09
    • 2018-01-27
    • 1970-01-01
    • 2019-11-15
    • 1970-01-01
    相关资源
    最近更新 更多