【问题标题】:Scala UDF with multiple parameters used in PysparkPyspark 中使用多个参数的 Scala UDF
【发布时间】:2018-07-22 05:22:09
【问题描述】:

我有一个用 Scala 编写的 UDF,我希望能够通过 Pyspark 会话调用它。 UDF 采用两个参数,字符串列值和第二个字符串参数。如果它只需要一个参数(列值),我已经能够成功调用 UDF。如果需要多个参数,我正在努力调用 UDF。以下是我迄今为止在 Scala 和 Pyspark 中能够做到的事情:

Scala UDF:

class SparkUDFTest() extends Serializable {
  def stringLength(columnValue: String, columnName: String): Int =
      LOG.info("Column name is: " + columnName)
      return columnValue.length
}

在 Scala 中使用它时,我已经能够注册和使用这个 UDF:

Scala 主类:

val udfInstance = new SparkUDFTest()
val stringLength = spark.sqlContext.udf.register("stringlength", udfInstance.stringLength _)
val newDF = df.withColumn("name", stringLength(col("email"), lit("email")))

以上工作成功。这是通过 Pyspark 进行的尝试:

def testStringLength(colValue, colName):
  package = "com.test.example.udf.SparkUDFTest"
udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().stringLength().apply
  return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column), colName))

在 Pyspark 中调用 UDF:

df.withColumn("email", testStringLength("email", lit("email")))

执行上述操作并在 Pyspark 中进行一些调整会给我以下错误:

py4j.Py4JException: Method getStringLength([]) does not exist
or
java.lang.ClassCastException: com.test.example.udf.SparkUDFTest$$anonfun$stringLength$1 cannot be cast to scala.Function1
or
TypeError: 'Column' object is not callable

我能够将 UDF 修改为仅采用一个参数(列值),并且能够成功调用它并取回一个新的 Dataframe。

Scala UDF 类

class SparkUDFTest() extends Serializable {
  def testStringLength(): UserDefinedFunction = udf(stringLength _)
  def stringLength(columnValue: String): Int =
      LOG.info("Column name is: " + columnName)
      return columnValue.length
}

更新 Python 代码:

def testStringLength(colValue, colName):
  package = "com.test.example.udf.SparkUDFTest"
  udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength().apply
  return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column)))

以上工作成功。如果 UDF 需要一个额外的参数,我仍然在努力调用 UDF。如何通过 Pyspark 将第二个参数传递给 UDF?

【问题讨论】:

  • 为什么不在 pyspark 中实现相同的 udf?那么您就不必经历如此复杂的过程
  • @RameshMaharjan 有几个原因: 性能因为 Python UDF 由于调用 Python 解释器而具有相当大的开销。另外,如果可以通过 Pyspark 会话调用,我想限制代码重复,并且没有用 Python 编写的完全相同的函数
  • 不希望代码重复,您正在引发性能问题。代码序列化和反序列化以及代码编译和反编译。
  • @RameshMaharjan 是的,这就是计划,但只是想看看在朝着这个方向前进之前是否可以通过 Pyspark 调用 Scala UDF 而没有任何复杂性。

标签: scala apache-spark pyspark user-defined-functions


【解决方案1】:

我能够通过使用柯里化来解决这个问题。首先将UDF注册为

def testStringLength(columnName): UserDefinedFunction = udf((colValue: String) => stringLength(colValue, colName)

称为 UDF

udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength("email").apply
df.withColumn("email", Column(udfInstance(_to_seq(sc, [col("email")], _to_java_column))))

这可以再清理一下,但这是我让它工作的方式。

编辑:我使用 currying 的原因是,即使我在第二个参数上使用 'lit' 我想作为字符串传递给 UDF,我仍然遇到“TypeError: 'Column' 对象是不可调用”错误。在 Scala 中我没有遇到这个问题。我不确定为什么在 Pyspark 会发生这种情况。这可能是由于 Python 解释器和 Scala 代码之间可能发生的一些复杂性。仍然不清楚,但柯里化对我有用。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-06
    • 1970-01-01
    • 2019-01-09
    • 2018-09-12
    • 2020-12-20
    • 2019-01-25
    相关资源
    最近更新 更多