【问题标题】:How to use Scala UDF in PySpark?如何在 PySpark 中使用 Scala UDF?
【发布时间】:2017-06-06 09:58:47
【问题描述】:

我希望能够在 PySpark 中将 Scala 函数用作 UDF

package com.test

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
} 

我可以在 PySpark 中访问 testFunction1 并让它返回值:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)

我希望能够将此函数用作 UDF,最好是在 withColumn 调用中:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))

我认为这里有一个很有前途的方法: Spark: How to map Python with Scala or Java User Defined Functions?

但是,当对那里找到的代码进行更改时,请改用testUDFFunction1

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))

我明白了:

 AttributeError: 'JavaMember' object has no attribute 'apply' 

我不明白这一点,因为我相信testUDFFunction1 确实有一个 apply 方法?

我不想使用此处找到的类型的表达式: Register UDF to SqlContext from Scala to use in PySpark

任何关于如何完成这项工作的建议都将不胜感激!

【问题讨论】:

    标签: python scala apache-spark pyspark apache-spark-sql


    【解决方案1】:

    同意@user6910411,你必须直接在函数上调用apply方法。 所以,你的代码将是。

    Scala 中的 UDF:

    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions._
    
    
    object ScalaPySparkUDFs {
    
        def testFunction1(x: Int): Int = { x * 2 }
    
        def getFun(): UserDefinedFunction = udf(testFunction1 _ )
    }
    

    PySpark 代码:

    def test_udf(col):
        sc = spark.sparkContext
        _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
        return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))
    
    
    row = Row("Value")
    numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
    numbers.withColumn("Result", test_udf(numbers['Value']))
    

    【讨论】:

    • 我想传递多个列,有什么办法可以做到吗?
    【解决方案2】:

    您链接的问题是使用 Scala object。 Scala object 是一个单例,你可以直接使用apply 方法。

    这里你使用了一个返回UserDefinedFunction类对象的空函数,你必须先调用这个函数:

    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
    Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
    

    【讨论】:

    • 如果我想传递多列是否有类似的东西。
    猜你喜欢
    • 2019-01-25
    • 2018-07-22
    • 1970-01-01
    • 1970-01-01
    • 2016-07-28
    • 2018-09-12
    • 2019-02-23
    • 1970-01-01
    相关资源
    最近更新 更多