【问题标题】:Spark: How to map Python with Scala or Java User Defined Functions?Spark:如何将 Python 与 Scala 或 Java 用户定义函数映射?
【发布时间】:2016-01-18 22:34:27
【问题描述】:

例如,假设我的团队选择 Python 作为使用 Spark 开发的参考语言。但后来出于性能原因,我们希望开发特定的 Scala 或 Java 特定库,以便将它们与我们的 Python 代码映射(类似于具有 Scala 或 Java 骨架的 Python 存根)。

您不认为有可能将新的自定义 Python 方法与一些 Scala 或 Java 用户定义函数进行接口吗?

【问题讨论】:

    标签: java python scala apache-spark pyspark


    【解决方案1】:

    Spark 2.1+

    你可以使用SQLContext.registerJavaFunction:

    注册一个 java UDF,以便它可以在 SQL 语句中使用。

    这需要name、Java 类的完全限定名和可选的返回类型。不幸的是,目前它只能在 SQL 语句中使用(或与 expr / selectExpr 一起使用)并且需要 Java org.apache.spark.sql.api.java.UDF*

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.1.0"
    )
    
    package com.example.spark.udfs
    
    import org.apache.spark.sql.api.java.UDF1
    
    class addOne extends UDF1[Integer, Integer] {
      def call(x: Integer) = x + 1
    } 
    
    sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
    sqlContext.sql("SELECT add_one(1)").show()
    
    ## +------+
    ## |UDF(1)|
    ## +------+
    ## |     2|
    ## +------+
    

    版本独立

    我不会说它受支持,但它肯定是可能的。目前在 PySpark 中可用的所有 SQL 函数都只是 Scala API 的包装器。

    假设我想重用GroupConcat UDAF,我创建了作为SPARK SQL replacement for mysql GROUP_CONCAT aggregate function 的答案,它位于包com.example.udaf 中:

    from pyspark.sql.column import Column, _to_java_column, _to_seq
    from pyspark.sql import Row
    
    row = Row("k", "v")
    df = sc.parallelize([
        row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()
    
    def groupConcat(col):
        """Group and concatenate values for a given column
    
        >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
        >>> df.select(groupConcat("v").alias("vs"))
        [Row(vs=u'foo,bar')]
        """
        sc = SparkContext._active_spark_context
        # It is possible to use java_import to avoid full package path
        _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
        # Converting to Seq to match apply(exprs: Column*)
        return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))
    
    df.groupBy("k").agg(groupConcat("v").alias("vs")).show()
    
    ## +---+---------+
    ## |  k|       vs|
    ## +---+---------+
    ## |  1|foo1,foo2|
    ## |  2|bar1,bar2|
    ## +---+---------+
    

    根据我的喜好,前导下划线太多了,但正如您所见,这是可以做到的。

    相关:

    【讨论】:

    • 我在做如下操作,但是每次遇到“py4j.protocol.Py4JError”:com.example.udf.GroupConcat.apply does not exist in the JVM.我的包是“com.example.udf”
    • 我有一个包含枚举常量和 UDF 的 jar。如何修改此代码以使用它?
    • 我错过了一些关于registerJavaFunction 知道在哪里可以找到您的 UDF 的内容...您能详细说明这里的目录结构吗?你有 sbt clean assembly 你的 scalaVersion :=... (build.sbt??) 和 package com.example.spark.udfs... (src/main/scala??) 来自另一个目录的文件吗?其他地方?
    • 值得注意的是,您应该首先检查repo1.maven.org/maven2/org/apache/spark 以确保您的Scala 和Spark 版本首先兼容...我只花了一整天(我第一天使用@ 987654344@ ?) 试图使 scalaVersion := "2.12.7"sparkVersion = "2.3.1" 一起工作,但 Scala 2.12+ 仅与 Spark 2.4+ 兼容(或者我认为)
    猜你喜欢
    • 1970-01-01
    • 2016-06-06
    • 1970-01-01
    • 2020-01-05
    • 2023-03-19
    • 2015-01-05
    • 2021-12-11
    • 1970-01-01
    • 2016-11-18
    相关资源
    最近更新 更多