【问题标题】:pyspark equivalence of `df.loc`?`df.loc`的pyspark等价?
【发布时间】:2018-10-23 00:24:01
【问题描述】:

我正在寻找 pandas 数据框的 pyspark 等效项。 特别想对pyspark dataframe做如下操作

# in pandas dataframe, I can do the following operation
# assuming df = pandas dataframe
index = df['column_A'] > 0.0
amount = sum(df.loc[index, 'column_B'] * df.loc[index, 'column_C']) 
        / sum(df.loc[index, 'column_C'])

我想知道对 pyspark 数据框执行此操作的 pyspark 等效项是什么?

【问题讨论】:

    标签: python pandas apache-spark dataframe pyspark


    【解决方案1】:

    这对RDD 来说很简单(我对spark.sql.DataFrame 不太熟悉):

    x, y = (df.rdd
            .filter(lambda x: x.column_A > 0.0)
            .map(lambda x: (x.column_B*x.column_C, x.column_C))
            .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1])))
    amount = x / y
    

    或者filterDataFrame然后跳转到RDD:

    x, y = (df
            .filter(df.column_A > 0.0)
            .rdd
            .map(lambda x: (x.column_B*x.column_C, x.column_C))
            .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1])))
    amount = x / y
    

    经过一番挖掘,不确定这是不是最有效的方法,但没有进入RDD

    x, y = (df
            .filter(df.column_A > 0.0)
            .select((df.column_B * df.column_C).alias("product"), df.column_C)
            .agg({'product': 'sum', 'column_C':'sum'})).first()
    amount = x / y
    

    【讨论】:

      【解决方案2】:

      Spark DataFrame 没有严格的顺序,因此索引没有意义。相反,我们使用类似 SQL 的 DSL。在这里,您将使用 where (filter) 和 select。如果数据如下所示:

      import pandas as pd
      import numpy as np
      from pyspark.sql.functions import col, sum as sum_
      
      np.random.seed(1)
      
      df = pd.DataFrame({
         c: np.random.randn(1000) for c in ["column_A", "column_B", "column_C"]
      })
      

      amount 会是

      amount
      # 0.9334143225687774
      

      与 Spark 等效的是:

      sdf = spark.createDataFrame(df)
      
      (amount_, ) = (sdf
          .where(sdf.column_A > 0.0)
          .select(sum_(sdf.column_B * sdf.column_C) / sum_(sdf.column_C))
          .first())
      

      结果在数值上是等价的:

      abs(amount - amount_)
      # 1.1102230246251565e-16
      

      你也可以使用条件句:

      from pyspark.sql.functions import when
      
      pred = col("column_A") > 0.0
      
      amount_expr = sum_(
        when(pred, col("column_B")) * when(pred, col("column_C"))
      ) / sum_(when(pred, col("column_C")))
      
      sdf.select(amount_expr).first()[0]
      # 0.9334143225687773
      

      看起来更像 Pandas,但更冗长。

      【讨论】:

        【解决方案3】:

        更多快速的 Pysparky 答案

        import pyspark.sql.functions as f
        sdf=sdf.withColumn('sump',f.when(f.col('colA')>0,f.col('colB')*f.col('colC')).otherwise(0))
        z=sdf.select(f.sum(f.col('sump'))/f.sum(f.col('colA'))).collect()
        print(z[0])
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2018-05-12
          • 2016-01-27
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-11-01
          相关资源
          最近更新 更多