【问题标题】:pyspark udf that counts when condition is satified满足条件时计数的pyspark udf
【发布时间】:2020-05-01 16:34:42
【问题描述】:

问题:我有一个 pyspark 数据框,我想按列汇总并计算每个 ID 的计数,满足特定条件。我的数据集如下所示:

my_dict = {'ID': {0: u'00319383',
  1: u'00337642',
  2: u'0346945',
  3: u'00400193',
  4: u'00405079',
  5: u'0426407',
  6: u'00445573',
  7: u'00485834',
  8: u'0493307',
  9: u'00501281'},
 'type_A': {0: u'A',
  1: u'A',
  2: u'A',
  3: u'A',
  4: u'A',
  5: u'A',
  6: u'A',
  7: u'A',
  8: u'A',
  9: u'A'},
 'type_B': {0: u'None',
  1: u'B',
  2: u'None',
  3: u'None',
  4: u'None',
  5: u'None',
  6: u'None',
  7: u'None',
  8: u'B',
  9: u'None'},
 'type_C': {0: u'C',
  1: u'C',
  2: u'C',
  3: u'C',
  4: u'C',
  5: u'C',
  6: u'C',
  7: u'C',
  8: u'C',
  9: u'C'},
 'type_D': {0: u'None',
  1: u'None',
  2: u'None',
  3: u'None',
  4: u'None',
  5: u'None',
  6: u'None',
  7: u'D',
  8: u'None',
  9: u'None'}}

目标是按 ID 计算产品的出现次数。我用 SQL 开发了一个解决方案,它可以满足我的需求:

spark.sql('''
            select total, count(contract_id) as freq
            from 
            (
                select id, (typeA + typeB + typeC + typeD) as total
                from
                    (
                        select id
                        , case when type_A = 'A' then 1 else 0 end as typeA
                        , case when type_B = 'B' then 1 else 0 end as typeB 
                        , case when type_C = 'C' then 1 else 0 end as typeC  
                        , case when type_D = 'D' then 1 else 0 end as typeD  
                        from df 
                    ) a
            ) b

            group by total

         ''').toPandas()

我怎么能用 python/pyspark 函数做到这一点?寻找解决此类问题的想法?

【问题讨论】:

  • 您心中的条件是什么? df.groupBy("total").count() 看起来确实很简单?
  • 像 type_A = 'A' 这样的条件。我基本上是想看看有人如何将该 sql 查询实现为 python/pyspark 函数。

标签: python-3.x pandas dataframe apache-spark pyspark


【解决方案1】:

好的,那么这应该可以解决问题:

from pyspark.sql.functions import *
df.select(
    (when(col("type_A") == lit("A"), lit(1)).otherwise(lit(0))+
    when(col("type_B") == lit("B"), lit(1)).otherwise(lit(0))+
    when(col("type_C") == lit("C"), lit(1)).otherwise(lit(0))+
    when(col("type_D") == lit("D"), lit(1)).otherwise(lit(0))).alias("total"), col("id")
).groupBy("total").agg(count(col("id")).alias("freq"))

【讨论】:

  • 那么如果你有10列,你必须重复聚合十次?每列一次?
  • 不,您可以根据需要在.agg(...) 中放置任意数量的聚合函数。 .groupBy(...)(例如df.groupBy("total", "someOtherCol",...)...)也是如此,请参阅我的答案 - 我在.agg(...)中添加了更多字母
  • 好吧,我试过你的想法,但我没有得到任何接近 SQL 所做的事情。
  • 对,因为您只得到最内部的查询。你想翻译整个事情吗?给我10分钟
  • 立即尝试。 IIUC 这就是全部(您在查询中混合 id-s - idcontract_id
猜你喜欢
  • 1970-01-01
  • 2019-07-15
  • 1970-01-01
  • 2023-03-31
  • 2019-04-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-06
相关资源
最近更新 更多