【问题标题】:Sort the resulting DataFrame in pyspark using UDF使用 UDF 在 pyspark 中对生成的 DataFrame 进行排序
【发布时间】:2021-08-13 15:46:37
【问题描述】:

我正在使用 diamonds 数据框在 SPARK 中工作。数据如下:

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+

和架构:

root
|-- carat: double (nullable = true)
|-- cut: string (nullable = true)
|-- color: string (nullable = true)
|-- clarity: string (nullable = true)
|-- depth: double (nullable = true)
|-- table: double (nullable = true)
|-- price: integer (nullable = true)
|-- x: double (nullable = true)
|-- y: double (nullable = true)

我创建了一个自定义函数并注册为 UDF:

def rank_cut(cut):
    cut_class_dict = {"Fair": 1, "Good": 2, "Very Good": 3, "Premium": 4, "Ideal": 5}
    for cut, v in cut_class_dict():
        x['cut'] = v
 
    return v
spark.udf.register('rank_cut', rank_cut)

我想使用这个自定义函数对我的数据框进行如下排序:

( 
diamonds
.groupBy('cut')
.agg(
    expr('COUNT(*) AS n_diamonds'),
    expr('ROUND(AVG(price)) AS avg_price'),
    expr('ROUND(AVG(carat),2) AS avg_carat'),
    expr('ROUND(AVG(depth),2) AS avg_depth'),
    expr('ROUND(AVG(table),2) AS avg_table'),
  
  
)
.rank_cut('cut')
.show()
)

但它不起作用。我有什么遗漏吗?

【问题讨论】:

  • “不工作”是什么意思?您需要向我们展示错误消息(如果有),以及您的输出(如果存在)与您想要的输出有何不同。
  • 你不觉得UDF函数rank_cut是正确的吗?
  • 现在我看了一下,仅rank_cut 的定义就有几个错误,与 Spark 无关。这就是为什么拥有minimal reproducible example 很重要:
  • @jjramsey thx 提示。我收到的消息是 AttributeError: 'DataFrame' object has no attribute 'rank_cut'。实际上我得到的是上面已经显示的代码。我需要按字典中表示的剪切质量从 1-5 升序对 expr 转换生成的数据帧进行排序
  • @SuyogShimpi 我不确定。我收到的错误消息让我觉得我在这方面出了问题

标签: python dataframe sorting pyspark user-defined-functions


【解决方案1】:

问题解决了。

我将我的 udf 更改为:

cut_class_dict = {"Fair": 1, "Good": 2, "Very Good": 3, "Premium": 4, "Ideal": 5}
rank_cut =  udf(lambda cut: cut_class_dict.get(cut))
spark.udf.register('rank_cut', rank_cut)

并将其映射到剪切列

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-11-24
    • 2019-02-02
    • 2013-11-03
    • 2022-01-21
    • 1970-01-01
    • 1970-01-01
    • 2013-05-22
    • 2022-10-17
    相关资源
    最近更新 更多