【问题标题】:How to define schema for custom type in Spark SQL?如何在 Spark SQL 中为自定义类型定义模式?
【发布时间】:2015-12-03 02:45:59
【问题描述】:

以下示例代码尝试将一些案例对象放入数据框中。代码包括使用此 trait 的案例对象层次结构和案例类的定义:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

在执行代码时,不幸遇到如下异常:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

问题

  • 是否可以为某些类型添加或定义架构(此处键入 Some)?
  • 是否存在另一种表示这种枚举的方法?
    • 我尝试直接使用Enumeration,但也没有成功。 (见下文)

Enumeration 的代码:

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

提前致谢。我希望,最好的方法是不要使用字符串。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql case-class


    【解决方案1】:

    Spark 2.0.0+

    UserDefinedType 已在 Spark 2.0.0 中设为私有,目前还没有 Dataset 友好替换。

    见:SPARK-14155 (Hide UserDefinedType in Spark 2.0)

    大多数时候静态类型的Dataset 可以作为替代 有一个待处理的 Jira SPARK-7768 以使用目标版本 2.4 再次公开 UDT API。

    另见How to store custom objects in Dataset?

    火花

    是否有可能为某些类型添加或定义架构(此处键入 Some)?

    我想答案取决于您对它的需求程度。看起来可以创建一个UserDefinedType,但它需要访问DeveloperApi,而且不是很简单或有据可查。

    import org.apache.spark.sql.types._
    
    @SQLUserDefinedType(udt = classOf[SomeUDT])
    sealed trait Some
    case object AType extends Some
    case object BType extends Some
    
    class SomeUDT extends UserDefinedType[Some] {
      override def sqlType: DataType = IntegerType
    
      override def serialize(obj: Any) = {
        obj match {
          case AType => 0
          case BType => 1
        }
      }
    
      override def deserialize(datum: Any): Some = {
        datum match {
          case 0 => AType
          case 1 => BType
        }
      }
    
      override def userClass: Class[Some] = classOf[Some]
    }
    

    您也应该覆盖 hashCodeequals

    它的 PySpark 对应物可能如下所示:

    from enum import Enum, unique
    from pyspark.sql.types import UserDefinedType, IntegerType
    
    class SomeUDT(UserDefinedType):
        @classmethod
        def sqlType(self):
            return IntegerType()
    
        @classmethod
        def module(cls):
            return cls.__module__
    
        @classmethod 
        def scalaUDT(cls): # Required in Spark < 1.5
            return 'net.zero323.enum.SomeUDT'
    
        def serialize(self, obj):
            return obj.value
    
        def deserialize(self, datum):
            return {x.value: x for x in Some}[datum]
    
    @unique
    class Some(Enum):
        __UDT__ = SomeUDT()
        AType = 0
        BType = 1
    

    在 Spark

    对于像您这样的简单 UDT,您可以使用简单类型(例如 IntegerType 而不是整个 Struct)。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-11
    • 1970-01-01
    • 1970-01-01
    • 2023-03-11
    • 1970-01-01
    相关资源
    最近更新 更多