【问题标题】:pyspark approxQuantile functionpyspark 近似分位数函数
【发布时间】:2017-07-24 18:43:08
【问题描述】:

我有这些列 idpricetimestamp 的数据框。

我想找到按id 分组的中值。

我正在使用此代码来查找它,但它给了我这个错误。

from pyspark.sql import DataFrameStatFunctions as statFunc
windowSpec = Window.partitionBy("id")
median = statFunc.approxQuantile("price",
                                 [0.5],
                                 0) \
                 .over(windowSpec)

return df.withColumn("Median", median)

是不是不能用DataFrameStatFunctions来填充新列的值?

TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    好吧,确实不可能使用approxQuantile 来填充新数据框列中的值,但这不是您收到此错误的原因。不幸的是,整个背后的故事是一个相当令人沮丧的故事,因为 I have argued 的许多 Spark(尤其是 PySpark)功能和缺乏足够的文档就是这种情况。

    首先,不是一个,而是两个 approxQuantile 方法; first one 是标准 DataFrame 类的一部分,即您不需要导入 DataFrameStatFunctions:

    spark.version
    # u'2.1.1'
    
    sampleData = [("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)]
    
    df = spark.createDataFrame(sampleData, schema=["Name","Role","Salary"])
    df.show()
    # +------+---------+------+ 
    # |  Name|     Role|Salary|
    # +------+---------+------+
    # |   bob|Developer|125000| 
    # |  mark|Developer|108000|
    # |  carl|   Tester| 70000|
    # | peter|Developer|185000|
    # |   jon|   Tester| 65000|
    # | roman|   Tester| 82000|
    # | simon|Developer| 98000|
    # |  eric|Developer|144000|
    # |carlos|   Tester| 75000|
    # | henry|Developer|110000|
    # +------+---------+------+
    
    med = df.approxQuantile("Salary", [0.5], 0.25) # no need to import DataFrameStatFunctions
    med
    # [98000.0]
    

    The second oneDataFrameStatFunctions 的一部分,但是如果你照常使用它,就会得到你报告的错误:

    from pyspark.sql import DataFrameStatFunctions as statFunc
    med2 = statFunc.approxQuantile( "Salary", [0.5], 0.25)
    # TypeError: unbound method approxQuantile() must be called with DataFrameStatFunctions instance as first argument (got str instance instead)
    

    因为正确的用法是

    med2 = statFunc(df).approxQuantile( "Salary", [0.5], 0.25)
    med2
    # [82000.0]
    

    虽然您无法在 PySpark 文档中找到关于此的简单示例(我自己花了一些时间才弄清楚)... 最好的部分?这两个值不相等

    med == med2
    # False
    

    我怀疑这是由于使用了非确定性算法(毕竟,它应该是一个近似中位数),即使您使用相同的玩具数据重新运行命令您可能会得到不同的值(并且与我在此处报告的值不同)-我建议进行一些实验以感受一下...

    但是,正如我已经说过的,这不是您不能使用 approxQuantile 在新数据框列中填充值的原因 - 即使您使用正确的语法,您也会得到不同的错误:

    df2 = df.withColumn('median_salary', statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
    # AssertionError: col should be Column
    

    这里,col 指的是withColumn 操作的第二个参数,即approxQuantile 之一,错误消息说它不是Column 类型——实际上,它是一个列表:

    type(statFunc(df).approxQuantile( "Salary", [0.5], 0.25))
    # list
    

    因此,在填充列值时,Spark 需要Column 类型的参数,并且您不能使用列表;下面是一个使用每个角色的平均值而不是中间值创建一个新列的示例:

    import pyspark.sql.functions as func
    from pyspark.sql import Window
    
    windowSpec = Window.partitionBy(df['Role'])
    df2 = df.withColumn('mean_salary', func.mean(df['Salary']).over(windowSpec))
    df2.show()
    # +------+---------+------+------------------+
    # |  Name|     Role|Salary|       mean_salary| 
    # +------+---------+------+------------------+
    # |  carl|   Tester| 70000|           73000.0| 
    # |   jon|   Tester| 65000|           73000.0|
    # | roman|   Tester| 82000|           73000.0|
    # |carlos|   Tester| 75000|           73000.0|
    # |   bob|Developer|125000|128333.33333333333|
    # |  mark|Developer|108000|128333.33333333333| 
    # | peter|Developer|185000|128333.33333333333| 
    # | simon|Developer| 98000|128333.33333333333| 
    # |  eric|Developer|144000|128333.33333333333|
    # | henry|Developer|110000|128333.33333333333| 
    # +------+---------+------+------------------+
    

    之所以有效,是因为与approxQuantile 不同,mean 返回一个Column

    type(func.mean(df['Salary']).over(windowSpec))
    # pyspark.sql.column.Column
    

    【讨论】:

      【解决方案2】:

      按组计算分位数(聚合)示例

      由于组缺少聚合函数,我添加了一个按名称构造函数调用的示例(本例为percentile_approx):

      from pyspark.sql.column import Column, _to_java_column, _to_seq
      
      def from_name(sc, func_name, *params):
          """
             create call by function name 
          """
          callUDF = sc._jvm.org.apache.spark.sql.functions.callUDF
          func = callUDF(func_name, _to_seq(sc, *params, _to_java_column))
          return Column(func)
      

      在groupBy中应用percentile_approx函数:

      from pyspark.sql import SparkSession
      from pyspark.sql import functions as f
      
      spark = SparkSession.builder.getOrCreate()
      sc = spark.sparkContext
      
      # build percentile_approx function call by name: 
      target = from_name(sc, "percentile_approx", [f.col("salary"), f.lit(0.95)])
      
      
      # load dataframe for persons data 
      # with columns "person_id", "group_id" and "salary"
      persons = spark.read.parquet( ... )
      
      # apply function for each group
      persons.groupBy("group_id").agg(
          target.alias("target")).show()
      

      【讨论】:

        【解决方案3】:

        如果您对聚合而不是窗口函数感到满意,还可以选择使用 pandas_udf。不过,它们不如纯 Spark 快。这是来自docs 的改编示例:

        from pyspark.sql.functions import pandas_udf, PandasUDFType
        
        df = spark.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "price")
        )
        
        @pandas_udf("double", PandasUDFType.GROUPED_AGG)
        def median_udf(v):
            return v.median()
        
        df.groupby("id").agg(median_udf(df["price"])).show()
        

        【讨论】:

        • pd.cut 会给我们提供与 approxQuantile 相同的结果吗?
        • 没有。如果你看一下它的例子,它会做一些完全不同的事情。无论如何,正如 ionathan 指出的那样,没有必要再依赖 pandas udfs。
        【解决方案4】:

        从 PySpark 3.1.0 开始,引入了 percentile_approx 函数来解决这个问题。

        函数percentile_approx返回一个列表,因此您需要对第一个元素进行切片。

        如:

        windowSpec = Window.partitionBy("id")
        df.withColumn("Median", F.percentile_approx("price", [0.5]).over(windowSpec)[0])
        

        【讨论】:

        • 非常感谢您提及 percentile_approx。由于无法在 groupBy 结果中使用 approxQuantiles,因此我围绕它编写了一个 for 循环。整个运行估计需要 55 小时,通过这个 percentile_approx 减少到 15 分钟。我比较了从两者得到的值 - 两者都是一样的!
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-01-23
        • 2023-03-20
        • 2023-03-05
        • 2021-10-09
        • 2018-04-01
        • 2021-10-10
        • 2021-08-17
        相关资源
        最近更新 更多