【问题标题】:Pyspark: reduceByKey multiple columns but independentlyPyspark:reduceByKey 多列但独立
【发布时间】:2018-01-28 02:51:06
【问题描述】:

我的数据由多列组成,看起来像这样:

我想分别对每一列的数据进行分组并计算每个元素的出现次数,我可以这样做:

df.groupBy("Col-1").count() 

df.groupBy("Col-2").count()

df.groupBy("Col-n").count()

但是,如果有 1000 列,我会很耗时。所以我试图找到另一种方法:

目前我所做的事情:

def mapFxn1(x):
    vals=[1] * len(x)
    c=tuple(zip(list(x), vals))
    return c

df_map=df.rdd.map(lambda x: mapFxn1(x))

mapFxn1 获取每一行并将其转换为元组的元组:所以基本上第一行看起来像这样:((10, 1), (2, 1), (x, 1))

我只是想知道如何在 df_map 上使用 reduceByKey 和 lambda x,y: x + y 来实现对每一列的分组并在单个步骤中计算每一列中元素的出现次数。

提前谢谢你

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    cube:

    df = spark.createDataFrame(
        [(3, 2), (2, 1), (3, 8), (3, 9), (4, 1)]
    ).toDF("col1", "col2")
    df.createOrReplaceTempView("df")
    
    spark.sql("""SELECT col1, col2, COUNT(*) 
                 FROM df GROUP BY col1, col2 GROUPING SETS(col1, col2)"""
    ).show()
    
    # +----+----+--------+
    # |col1|col2|count(1)|
    # +----+----+--------+
    # |null|   9|       1|
    # |   3|null|       3|
    # |null|   1|       2|
    # |null|   2|       1|
    # |   2|null|       1|
    # |null|   8|       1|
    # |   4|null|       1|
    # +----+----+--------+
    

    melt:

    melt(df, [], df.columns).groupBy("variable", "value").count().show()
    
    # +--------+-----+-----+
    # |variable|value|count|
    # +--------+-----+-----+
    # |    col2|    8|    1|
    # |    col1|    3|    3|
    # |    col2|    2|    1|
    # |    col1|    2|    1|
    # |    col2|    9|    1|
    # |    col1|    4|    1|
    # |    col2|    1|    2|
    # +--------+-----+-----+
    

    reduceByKey

    from operator import add
    
    counts = (df.rdd
       .flatMap(lambda x: x.asDict().items())
       .map(lambda x: (x, 1))
       .reduceByKey(add))
    
    counts.toLocalIterator():
        print(x)
    #     
    # (('col1', 2), 1)
    # (('col2', 8), 1)
    # (('col2', 1), 2)
    # (('col2', 9), 1)
    # (('col1', 4), 1)
    # (('col1', 3), 3)
    # (('col2', 2), 1)
    

    【讨论】:

    • 非常感谢您的快速回复。它似乎工作。我对 reduceByKey 方法特别感兴趣。虽然它按预期工作。性能是问题。我有 50M 行和 60 列。在我的 8 核机器上用了差不多 10 多分钟。但是,如果我使用 python 和 numpy 执行相同的任务(只需一次通过文件 10K 行并应用 numpy hist 函数,然后更新计数)在单核上花费的时间不到 3 分钟。这是正常行为吗?
    猜你喜欢
    • 1970-01-01
    • 2022-06-09
    • 1970-01-01
    • 2013-10-21
    • 2021-01-21
    • 1970-01-01
    • 2015-05-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多