【问题标题】:Why is "Unable to find encoder for type stored in a Dataset" when creating a dataset of custom case class?为什么在创建自定义案例类的数据集时“无法找到存储在数据集中的类型的编码器”?
【发布时间】:2016-12-04 12:49:25
【问题描述】:

Spark 2.0(最终版)和 Scala 2.11.8。以下超级简单的代码会产生编译错误Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}

【问题讨论】:

    标签: scala apache-spark apache-spark-dataset apache-spark-encoders


    【解决方案1】:

    Spark Datasets 需要 Encoders 用于即将存储的数据类型。对于常见类型(原子、产品类型),有许多可用的预定义编码器,但您必须先从 SparkSession.implicits 导入这些编码器才能使其工作:

    val sparkSession: SparkSession = ???
    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
    

    或者你可以直接提供一个明确的

    import org.apache.spark.sql.{Encoder, Encoders}
    
    val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
    

    或隐式

    implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
    val dataset = sparkSession.createDataset(dataList)
    

    Encoder 用于存储类型。

    注意Encoders还为原子类型提供了许多预定义的Encoders,为复杂类型提供了Encoders,可以使用ExpressionEncoder派生。

    进一步阅读:

    【讨论】:

    • 也许这对 scala 开发人员来说是显而易见的,但我比 scala 更了解 spark... 为什么使用 = ??? 好,或者比实际声明它更好?
    • @DanCiborowski-MSFT ???NotImplementedError - 换句话说,根据您的要求填写空白。
    【解决方案2】:

    对于其他用户(您的用户是正确的),请注意,在 object 范围之外定义 case class 也很重要。所以:

    失败:

    object DatasetTest {
      case class SimpleTuple(id: Int, desc: String)
    
      val dataList = List(
        SimpleTuple(5, "abc"),
        SimpleTuple(6, "bcd")
      )
    
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder
          .master("local")
          .appName("example")
          .getOrCreate()
        val dataset = sparkSession.createDataset(dataList)
      }
    }
    

    添加隐式,仍然失败并出现同样的错误:

    object DatasetTest {
      case class SimpleTuple(id: Int, desc: String)
    
      val dataList = List(
        SimpleTuple(5, "abc"),
        SimpleTuple(6, "bcd")
      )
    
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder
          .master("local")
          .appName("example")
          .getOrCreate()
    
        import sparkSession.implicits._
        val dataset = sparkSession.createDataset(dataList)
      }
    }
    

    作品:

    case class SimpleTuple(id: Int, desc: String)
    
    object DatasetTest {   
      val dataList = List(
        SimpleTuple(5, "abc"),
        SimpleTuple(6, "bcd")
      )
    
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder
          .master("local")
          .appName("example")
          .getOrCreate()
    
        import sparkSession.implicits._
        val dataset = sparkSession.createDataset(dataList)
      }
    }
    

    这是相关的错误:https://issues.apache.org/jira/browse/SPARK-13540,所以希望它会在 Spark 2 的下一个版本中得到修复。

    (编辑:看起来那个错误修复实际上是在 Spark 2.0.0 中......所以我不确定为什么这仍然失败)。

    【讨论】:

    • 这似乎仍然是 Spark 2.4 中的一个问题
    • 虽然为我工作!
    • 非常重要case class is defined outside of the object scope. 谢谢!
    • spark 3.0.1 还是失败,case 类需要在对象范围之外。
    • 对于那些在使用 Zeppelin 时遇到此错误的人,在一段中声明案例类并在下一段中创建数据集将解决问题。当我在同一段落中进行两次调用时,我遇到了错误。
    【解决方案3】:

    我会用我自己的问题的答案来澄清,如果目标是定义一个简单的文字 SparkData 框架,而不是使用 Scala 元组和隐式转换,更简单的方法是像这样直接使用 Spark API:

      import org.apache.spark.sql._
      import org.apache.spark.sql.types._
      import scala.collection.JavaConverters._
    
      val simpleSchema = StructType(
        StructField("a", StringType) ::
        StructField("b", IntegerType) ::
        StructField("c", IntegerType) ::
        StructField("d", IntegerType) ::
        StructField("e", IntegerType) :: Nil)
    
      val data = List(
        Row("001", 1, 0, 3, 4),
        Row("001", 3, 4, 1, 7),
        Row("001", null, 0, 6, 4),
        Row("003", 1, 4, 5, 7),
        Row("003", 5, 4, null, 2),
        Row("003", 4, null, 9, 2),
        Row("003", 2, 3, 0, 1)
      )
    
      val df = spark.createDataFrame(data.asJava, simpleSchema)
    

    【讨论】:

    • 山羊是创建数据集,而不是数据框。
    猜你喜欢
    • 2016-04-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-13
    • 1970-01-01
    相关资源
    最近更新 更多