【问题标题】:Creating Total and percentage of total columns in Pyspark在 Pyspark 中创建总计和总列的百分比
【发布时间】:2021-08-19 21:22:52
【问题描述】:

这是我的测试数据

test = spark.createDataFrame([
    ("2018-06-03",2, 4, 4 ),
    ("2018-06-04",4, 3, 3 ),
    ( "2018-06-03",8, 1, 1),
    ("2018-06-01",3, 1, 1),
    ( "2018-06-05", 3, 2, 0),
])\
  .toDF( "transactiondate", "SalesA", "SalesB","SalesC")
test.show()

我想添加一个按行总计列和对应于每个销售类别(A、B 和 C)的总计列的百分比

期望的输出:

+---------------+------+------+------+----------+------+------+------+
|transactiondate|SalesA|SalesB|SalesC|TotalSales|Perc_A|Perc_B|Perc_C|
+---------------+------+------+------+----------+------+------+------+
|     2018-06-03|     2|     4|     4|        10|   0.2|   0.4|   0.4|
|     2018-06-04|     4|     3|     3|        10|   0.4|   0.3|   0.3|
|     2018-06-03|     8|     1|     1|        10|   0.8|   0.1|   0.1|
|     2018-06-01|     3|     1|     1|         5|   0.6|   0.2|   0.2|
|     2018-06-05|     3|     2|     0|         5|   0.6|   0.4|   0.0|
+---------------+------+------+------+----------+------+------+------+

如何在 pyspark 中做到这一点?

编辑:即使我添加更多项目,我也希望代码具有适应性,即如果我还有一个列 salesD,代码应该创建总计和百分比列。 (即不应该对列进行硬编码)

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以使用selectExpr 并为每个添加的列执行简单的算术 SQL 操作

    test = test.selectExpr("*", 
                           "SalesA+SalesB+SalesC as TotalSales",
                           "SalesA/(SalesA+SalesB+SalesC) as Perc_A",
                           "SalesB/(SalesA+SalesB+SalesC) as Perc_B",
                           "SalesC/(SalesA+SalesB+SalesC) as Perc_C"
                           )
    

    或使用更灵活的解决方案

    from pyspark.sql.functions import col, expr
    
    # columns to be included in TotalSales calculation
    cols = ['SalesA', 'SalesB', 'SalesC']
    
    test = (test
            .withColumn('TotalSales', expr('+'.join(cols)))
            .select(col('*'), 
                    *[expr('{0}/TotalSales {1}'.format(c,'Perc_'+c)) for c in cols]))
    

    【讨论】:

    • 感谢您的回复。我已经编辑了这个问题。我正在寻找一种更具可扩展性的解决方案来添加更多项目并仍然获得所需的输出。很抱歉造成混乱
    • 尽管如此,您仍然需要明确选择哪些列应包含在计算中,例如将它们放在一个列表中(或者如果这些列都不是第一列,则使用test.columns[1:])。看看修改后的答案是否OK。
    • 这行得通。我可以使用 cols=test.columns[1:] 仅选择计算中包含的那些。感谢您的回答。
    【解决方案2】:

    一种选择是使用多个withColumn 语句

    import pyspark.sql.functions as F
    
    test\
      .withColumn('TotalSales', F.col('SalesA') + F.col('SalesB') + F.col('SalesC'))\
      .withColumn('Perc_A', F.col('SalesA') / F.col('TotalSales'))\
      .withColumn('Perc_B', F.col('SalesB') / F.col('TotalSales'))\
      .withColumn('Perc_C', F.col('SalesC') / F.col('TotalSales'))
    

    【讨论】:

    • @Ric S 感谢您的回答。我编辑了这个问题。我希望解决方案更具适应性(没有硬编码)。很抱歉造成混乱。
    【解决方案3】:

    试试这个 spark-sql 解决方案

    test.createOrReplaceTempView("sales_table")
    
    sales=[ x for x in test.columns if x.upper().startswith("SALES") ]
    sales2="+".join(sales)
    print(str(sales)) # ['SalesA', 'SalesB', 'SalesC']
    
    per_sales=[ x +"/TotalSales as " + "Perc_" +x  for x in sales ]
    per_sales2=",".join(per_sales)
    print(str(per_sales)) # ['SalesA/TotalSales as Perc_SalesA', 'SalesB/TotalSales as Perc_SalesB', 'SalesC/TotalSales as Perc_SalesC']
    
    spark.sql(f"""
    with t1 ( select *, {sales2} TotalSales from sales_table )
    select *, {per_sales2} from t1 
    """).show(truncate=False)
    
    +---------------+------+------+------+----------+-----------+-----------+-----------+
    |transactiondate|SalesA|SalesB|SalesC|TotalSales|Perc_SalesA|Perc_SalesB|Perc_SalesC|
    +---------------+------+------+------+----------+-----------+-----------+-----------+
    |2018-06-03     |2     |4     |4     |10        |0.2        |0.4        |0.4        |
    |2018-06-04     |4     |3     |3     |10        |0.4        |0.3        |0.3        |
    |2018-06-03     |8     |1     |1     |10        |0.8        |0.1        |0.1        |
    |2018-06-01     |3     |1     |1     |5         |0.6        |0.2        |0.2        |
    |2018-06-05     |3     |2     |0     |5         |0.6        |0.4        |0.0        |
    +---------------+------+------+------+----------+-----------+-----------+-----------+
    

    您还可以使用 aggregate() 高阶函数对 sales* 列求和。但为此,列必须是整数/双精度类型,不能长。

    test2=test.withColumn("SalesA",expr("cast(salesa as int)"))\
          .withColumn("SalesB",expr("cast(salesb as int)"))\
          .withColumn("SalesC",expr("cast(salesc as int)"))
    test2.createOrReplaceTempView("sales_table2")
    
    sales3=",".join(sales)  # just join the sales columns with comma
    
    spark.sql(f"""
    with t1 ( select *,  aggregate(array({sales3}),0,(acc,x) -> acc+x) TotalSales from sales_table2 )
    select *, {per_sales2} from t1 
    """).show(truncate=False)
    
    +---------------+------+------+------+----------+-----------+-----------+-----------+
    |transactiondate|SalesA|SalesB|SalesC|TotalSales|Perc_SalesA|Perc_SalesB|Perc_SalesC|
    +---------------+------+------+------+----------+-----------+-----------+-----------+
    |2018-06-03     |2     |4     |4     |10        |0.2        |0.4        |0.4        |
    |2018-06-04     |4     |3     |3     |10        |0.4        |0.3        |0.3        |
    |2018-06-03     |8     |1     |1     |10        |0.8        |0.1        |0.1        |
    |2018-06-01     |3     |1     |1     |5         |0.6        |0.2        |0.2        |
    |2018-06-05     |3     |2     |0     |5         |0.6        |0.4        |0.0        |
    +---------------+------+------+------+----------+-----------+-----------+-----------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-01-09
      • 2021-03-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多