【问题标题】:Using Scala classes as UDF with pyspark使用 Scala 类作为带有 pyspark 的 UDF
【发布时间】:2018-09-12 21:36:45
【问题描述】:

在使用 Apache Spark 时,我正在尝试将一些计算从 Python 卸载到 Scala。我想使用 Java 的类接口来使用持久变量,就像这样(这是基于我更复杂的用例的无意义的 MWE):

package mwe

import org.apache.spark.sql.api.java.UDF1

class SomeFun extends UDF1[Int, Int] {
  private var prop: Int = 0

  override def call(input: Int): Int = {
    if (prop == 0) {
      prop = input
    }
    prop + input
  }
}

现在我尝试在 pyspark 中使用这个类:

import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext

conf = pyspark.SparkConf()
conf.set("spark.jars", "mwe.jar")
sc = SparkContext.getOrCreate(conf)

sqlContext = SQLContext.getOrCreate(sc)
sqlContext.registerJavaFunction("fun", "mwe.SomeFun")

df0 = sc.parallelize((i,) for i in range(6)).toDF(["num"])
df1 = df0.selectExpr("fun(num) + 3 as new_num")
df1.show()

并得到以下异常:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(UDF:fun(num) + 3)' due to data type mismatch: differing types in '(UDF:fun(num) + 3)' (struct<> and int).; line 1 pos 0;\n'Project [(UDF:fun(num#0L) + 3) AS new_num#2]\n+- AnalysisBarrier\n      +- LogicalRDD [num#0L], false\n"

实现这一点的正确方法是什么?我必须求助于Java本身来上课吗?非常感谢您的提示!

【问题讨论】:

    标签: scala apache-spark pyspark apache-spark-sql user-defined-functions


    【解决方案1】:

    异常的来源是使用了不兼容的类型:

    • 首先,o.a.s.sql.api.java.UDF* 对象需要外部 Java(不是 Scala 类型),因此预期整数的 UDF 应该采用装箱的 Integer (java.lang.Integer) 而不是 Int

      class SomeFun extends UDF1[Integer, Integer] {
        ...
        override def call(input: Integer): Integer = {
          ...
      
    • 除非您使用旧版 Python num 列使用 LongType 而不是 IntegerType

      df0.printSchema()
      root
       |-- num: long (nullable = true)
      

      所以实际的签名应该是

      class SomeFun extends UDF1[java.lang.Long, java.lang.Long] {
        ...
        override def call(input: java.lang.Long): java.lang.Long = {
          ...
      

      或者在应用UDF之前应该转换数据

      df0.selectExpr("fun(cast(num as integer)) + 3 as new_num")
      

    最后,在 UDF 中不允许使用可变状态。它不会导致异常,但整体行为将是不确定的。

    【讨论】:

    • 谢谢!我用 MWE 复制了这个,现在必须调整我的其他代码,以及看看是否有办法绕过可变状态(确定性不是关键)。
    猜你喜欢
    • 2017-04-03
    • 1970-01-01
    • 2017-06-06
    • 2020-12-20
    • 2018-07-22
    • 1970-01-01
    • 2021-08-18
    • 1970-01-01
    相关资源
    最近更新 更多