【问题标题】:groupby and join vs window in pyspark(在 pyspark 中使用 groupby 加 join 与使用窗口函数的对比)
【发布时间】:2019-02-04 14:53:45
【问题描述】:

我在 pyspark 中有一个数据框,它有数亿行(这里是它的一个虚拟样本):

import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window,Row
from pyspark.sql.functions import col
from pyspark.sql.functions import month, mean,sum,year,avg
from pyspark.sql.functions import concat_ws,to_date,unix_timestamp,datediff,lit
from pyspark.sql.functions import when,min,max,desc,row_number,col

# Build a small dummy DataFrame `dg` from literal Row objects.
# Columns: cycle_dt (datetime), network_id, norm_strength, spend_active_ind,
# net_spending_amt, cust_xref_id.
# In this sample spend_active_ind only takes the values 0 and 1, and
# cust_xref_id=7 is the only customer whose rows are all 0, so it is the one
# customer a "sum of spend_active_ind > 0" filter removes.
# NOTE(review): `sqlContext` and `sc` are not defined in this snippet —
# presumably they are provided by the pyspark shell/notebook; confirm.
dg = sqlContext.createDataFrame(sc.parallelize([
Row(cycle_dt=datetime.datetime(1984, 5, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=0,cust_xref_id=10),
Row(cycle_dt=datetime.datetime(1984, 6, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=11),
Row(cycle_dt=datetime.datetime(1984, 7, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=12),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=13),
Row(cycle_dt=datetime.datetime(1983,11, 5, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=8,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1983,12, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=1,net_spending_amt=15,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=7,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=1,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 5, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=1,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984,10, 6, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=1,net_spending_amt=10,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=0,net_spending_amt=8,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=0,net_spending_amt=3,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=1,net_spending_amt=5,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=8,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=0,net_spending_amt=2,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=1,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=9,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=3,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=5,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=1,net_spending_amt=6,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 5, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=9,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 6, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=1,net_spending_amt=1,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=0,net_spending_amt=7,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=0,net_spending_amt=8,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=1,net_spending_amt=3,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=6,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=0,net_spending_amt=9,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=4,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=6,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 5, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 6, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=0,net_spending_amt=3,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=0,net_spending_amt=2,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=1,net_spending_amt=8,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=1,net_spending_amt=4,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=0,net_spending_amt=1,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=1,net_spending_amt=9,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=0,net_spending_amt=3,cust_xref_id=9 )
]))

我想按每个 cust_xref_id 对 spend_active_ind 求和,并只保留总和大于零的那些客户的记录。一种方法是使用 groupby 和 join:

# Approach 1: aggregate per customer, then join back to the detail rows.
# F.sum is called explicitly instead of the bare `sum`, which the imports
# above rebind from the Python builtin to the Spark aggregate function.
dg1 = (
    dg.groupby("cust_xref_id")
    .agg(F.sum("spend_active_ind").alias("sum_spend_active_ind"))
    # Keep customers whose total is strictly positive ("> 0" rather than
    # "!= 0"), so a negative total would not slip through.
    .filter(F.col("sum_spend_active_ind") > 0)
    .select("cust_xref_id")
)
# dg1 holds one row per qualifying customer, so the inner join only filters
# dg's rows; selecting t1.* drops the duplicated key column from dg1.
dg = (
    dg.alias("t1")
    .join(dg1.alias("t2"), col("t1.cust_xref_id") == col("t2.cust_xref_id"))
    .select(col("t1.*"))
)

我能想到的另一种方式是使用窗口:

# Approach 2: compute the per-customer total with a window function so no
# join back is needed; the total stays on every row as an extra column.
w = Window.partitionBy("cust_xref_id")
# F.sum is called explicitly instead of the bare `sum`, which the imports
# above rebind from the Python builtin to the Spark aggregate function.
dg = dg.withColumn("sum_spend_active_ind", F.sum("spend_active_ind").over(w))
# Keep customers whose total is strictly positive ("> 0" rather than "!= 0"),
# so a negative total would not slip through.
dg = dg.filter(F.col("sum_spend_active_ind") > 0)

对于我想做的事情,这些方法中的哪一种(或者其他任何方法)效率更高?谢谢。

【问题讨论】:

    标签: group-by pyspark window partition-by


    【解决方案1】:

    您可以在 localhost:4040 打开 Spark UI,或者使用 explain 方法查看查询计划:

    # Print the physical plan for the aggregate-then-filter query so it can
    # be compared against the plan of the alternative approach.
    dg.groupby('cust_xref_id').agg(
        F.sum('spend_active_ind').alias('sum_spend_active_ind')
    ).where(
        F.col('sum_spend_active_ind') > 0  # where() is an alias of filter()
    ).explain()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-06-13
      • 1970-01-01
      • 2022-11-18
      • 1970-01-01
      • 1970-01-01
      • 2016-02-19
      • 1970-01-01
      • 2021-06-22
      相关资源
      最近更新 更多