【问题标题】:Reversing Group By in PySpark在 PySpark 中反转 Group By
【发布时间】:2020-09-21 17:19:44
【问题描述】:

我不确定问题本身的正确性。我为 SQL 找到的解决方案不适用于 Hive SQL 或禁止递归。 因此,我想在 Pyspark 中解决这个问题,并且需要一个解决方案,或者至少是想法,如何解决这个问题。

我有一个如下所示的原始表格:

+--------+----------+
|customer|nr_tickets|
+--------+----------+
|       A|         3|
|       B|         1|
|       C|         2|
+--------+----------+

这就是我想要的表格:

+--------+
|customer|
+--------+
|       A|
|       A|
|       A|
|       B|
|       C|
|       C|
+--------+

你有什么建议吗?

非常感谢您!

【问题讨论】:

    标签: python apache-spark pyspark pivot unpivot


    【解决方案1】:

    对于 Spark2.4+,请使用 array_repeatexplode

    from pyspark.sql import functions as F
    
    df.selectExpr("""explode(array_repeat(customer,cast(nr_tickets as int))) as customer""").show()
    
    #+--------+
    #|customer|
    #+--------+
    #|       A|
    #|       A|
    #|       A|
    #|       B|
    #|       C|
    #|       C|
    #+--------+
    

    【讨论】:

    • 您好,感谢您的解决方案。我已经尝试过了,但是它说“array_repeat”未知:未定义的函数:“array_repeat”。该函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数。不幸的是,检查我的 Sparkversion 也不起作用。但它应该在 2 以上。
    • 你的 spark 版本小于 2.4。这种方法对你不起作用
    • 愚蠢的问题,但我如何找出我的火花版本? 'sc.version' 不起作用
    【解决方案2】:

    您可以通过遍历行(组)来创建一个新的数据框。

    使用range(int(a["nr_tickets"])) 为该客户重复nr_ticketscustomer (Row(customer=a["customer"])) 的行列表

    df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
    

    您可以将它们存储并附加到一个列表中,然后用它创建一个数据框。

     df= spark.createDataFrame(df_list)
    

    总体而言,

    from pyspark.sql import Row
    
    df_list = []
    for a in df.select(["customer","nr_tickets"]).collect():
      df_list = df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
    df= spark.createDataFrame(df_list)
    df.show()
    

    你也可以用列表理解来做到这一点

    from pyspark.sql import Row
    from functools import reduce #python 3
    
    df_list = [
    [Row(customer=a["customer"])]*int(a["nr_tickets"]) 
    for a in df.select(["customer","nr_tickets"]).collect() 
     ]
    
    df= spark.createDataFrame(reduce(lambda x,y: x+y,df_list))
    df.show()
    

    生产

    +--------+
    |customer|
    +--------+
    |       A|
    |       A|
    |       A|
    |       B|
    |       C|
    |       C|
    +--------+
    

    【讨论】:

    • 您好,感谢您的帮助。您的代码的“整体”版本生成的列表确实是正确的。但是转换为 df 不知何故对我不起作用,并在我 df.show(): 时抛出此错误
    • Py4JJavaError:调用 o153.showString 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 6.0 中的任务 0 失败 4 次,最近一次失败:阶段 6.0 中丢失任务 0.3(TID 12,sdeb-hdpdn-q3014a.sys.schwarz,执行程序2):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后):文件“/hadoop/disk10/hadoop/yarn/local/usercache/bnem2103/appcache/application_1598371445148_44504/container_e247_1598371445148_44504_01_000003/pyspark.zip/ pyspark/worker.py",第 125 行,在 main ("%d.%d" % sys.version_info[:2], version))
    • 异常:worker 中的 Python 3.6 版本与驱动程序 3.8 中的版本不同,PySpark 无法使用不同的次要版本运行。请检查环境变量 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON 是否设置正确。
    • 我在使用“列表理解”版本时遇到相同/类似的错误。
    • 你在哪里运行 spark?
    【解决方案3】:

    同时我自己也找到了解决办法:

    for i in range(1, max_nr_of_tickets):
        table = table.filter(F.col('nr_tickets') >= 1).union(test)
        table = table.withColumn('nr_tickets', F.col('nr_tickets') - 1)
    

    解释:DF的“table”和“test”开头是一样的。 所以“max_nr_of_tickets”只是最高的“nr_tickets”。有用。 我只是在为最大数字的格式苦苦挣扎:

    max_nr_of_tickets = df.select(F.max('nr_tickets')).collect()
    

    我不能在 for 循环的范围内使用结果,因为它是一个列表。所以我手动输入最高的数字。 有什么想法可以让 max_nr_of_tickets 转换为正确的格式,以便循环范围接受它?

    谢谢

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-10-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多