【问题标题】:Spark Get the udf name from column and execute itSpark 从列中获取 udf 名称并执行
【发布时间】:2020-02-05 15:44:55
【问题描述】:

我注册了一些udfs,都具有相同的输入参数类型和相同的输出类型(字符串)。 假设 udf1、udf2、udf3。都有不同的功能。

在我的数据集中,我有多个列,在一列中,我有要在这一行数据上执行的 udf 的名称。

数据集示例:

+---+-------+-------+
|A  |   B   |udf    |
+---+-------+-------+
|1  |   a   |udf1   |
|2  |   b   |udf2   |
|3  |   c   |udf3   |
+---+-------+-------+

我想做这样的事情:

ds.withColumn("TEST", functions.callUDF(<name of right udf>, col("A"), col("B"))

我怎样才能做到这一点?有没有可能,如果没有,有什么可能的解决方法?

背景:我的 Spark 作业有一组 UDF,我想为行动态执行正确的 udf。

【问题讨论】:

    标签: java apache-spark apache-spark-sql


    【解决方案1】:

    试试这个 ::

    def func1(y: Int, z: String): String = y+z
    def func2(y: Int, z: String): String = y+","+z
    def default(y: Int, z: String): String = y
    
    val udfName = udf({ (x: String, y: Int, z: String) => x match {
    case "func1" => func1(y,z)
    case "func2" => func2(y,z)
    case _ => default(y,z)
    }})
    
    val data = Seq((1,"a","func1"),
    (2,"b","func2")
    ).toDF("A", "B", "udf")
    
    data.withColumn("TEST", udfName(col("udf"), col("A"), col("B")))
    

    您还可以使用源代码库来获得更高级的处理方式:

    scala get function name that was sent as param

    【讨论】:

    • 不完全是我的想法,但它非常接近。谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-02
    • 1970-01-01
    • 2023-01-15
    • 1970-01-01
    • 2018-01-25
    • 2017-07-29
    相关资源
    最近更新 更多