【问题标题】:Define UDF in Spark Scala在 Spark Scala 中定义 UDF
【发布时间】:2016-12-19 08:25:26
【问题描述】:

我需要在 Spark 中使用一个 UDF,它接收一个时间戳、一个整数和另一个数据帧,并返回一个包含 3 个值的元组。

我不断地遇到一个又一个错误,我不确定我是否正在尝试修复它。

函数如下:

def determine_price (view_date: org.apache.spark.sql.types.TimestampType , product_id: Int, price_df: org.apache.spark.sql.DataFrame) : (Double, java.sql.Timestamp, Double) = {
    var price_df_filtered = price_df.filter($"mkt_product_id" === product_id && $"created"<= view_date)
    var price_df_joined = price_df_filtered.groupBy("mkt_product_id").agg("view_price" -> "min", "created" -> "max").withColumn("last_view_price_change", lit(1))
    var price_df_final = price_df_joined.join(price_df_filtered, price_df_joined("max(created)") === price_df_filtered("created")).filter($"last_view_price_change" === 1)
    var result = (price_df_final.select("view_price").head().getDouble(0), price_df_final.select("created").head().getTimestamp(0), price_df_final.select("min(view_price)").head().getDouble(0))
    return result
}
val det_price_udf = udf(determine_price)

它给我的错误是:

error: missing argument list for method determine_price
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `determine_price _` or `determine_price(_,_,_)` instead of `determine_price`.

如果我开始添加参数,我会在其他错误中继续运行,例如 Int expected Int.type found 或 object DataFrame is not a member of package org.apache.spark.sql

给出一些上下文:

我的想法是我有一个价格数据框、一个产品 ID 和一个创建日期,以及另一个包含产品 ID 和查看日期的数据框。

我需要根据上次创建的比查看日期更早的价格条目来确定价格。

由于每个产品 ID 在第二个数据框中都有多个查看日期。我认为 UDF 比交叉连接更快。如果有人有不同的想法,我将不胜感激。

【问题讨论】:

    标签: apache-spark spark-dataframe


    【解决方案1】:

    您不能在 UDF 中传递 Dataframe,因为 UDF 将在特定分区上的 Worker 上运行。由于你不能在 Worker 上使用 RDD(Is it possible to create nested RDDs in Apache Spark?),同样你也不能在 Worker 上使用 DataFrame。!

    你需要解决这个问题!

    【讨论】:

    • 好的,我从 de UDF 参数中删除了数据框。数据帧被缓存和广播,它应该可以从函数中访问我仍然得到错误:error: type mismatch; found : Int.type required: Int val det_price_udf = udf(determine_price(org.apache.spark.sql.types.TimestampType, Int))
    • 好像dataframe不在UDF中,是不能使用的。它不是我在 Python 中习惯的“全局变量”。不知道如何解决这个问题。
    • 你的用例是什么?
    • 我有一个包含大量产品页面浏览量(product_id、查看日期)的数据框和另一个跟踪产品价格变化的数据框(product_id、change_date、价格)。对于第一个数据框中的每个视图,我需要确定在查看页面之前发生的价格变化。因此,如果产品 X 的价格在 18 日变为 500 和 20 日变为 600,我需要确定对于 19 日记录的视图,价格为 500,而对于 21 日的视图,价格为 600。
    • 我认为如果您可以将此作为单独的问题提出会更好,以便更多人可以帮助您解决此用例!直到那时我才想到这个问题!因为它解决了这个问题,所以接受这个答案!
    猜你喜欢
    • 2017-11-01
    • 1970-01-01
    • 2018-11-22
    • 1970-01-01
    • 1970-01-01
    • 2016-12-02
    • 2021-12-07
    • 1970-01-01
    • 2021-02-23
    相关资源
    最近更新 更多