【问题标题】:How to use java.time.LocalDate in Datasets (fails with java.lang.UnsupportedOperationException: No Encoder found)? [duplicate]如何在数据集中使用 java.time.LocalDate(因 java.lang.UnsupportedOperationException 失败:未找到编码器)? [复制]
【发布时间】:2017-12-24 20:40:14
【问题描述】:
  • Spark 2.1.1
  • Scala 2.11.8
  • Java 8
  • Linux Ubuntu 16.04 LTS

我想将我的 RDD 转换为数据集。为此,我使用了implicits 方法toDS(),它给了我以下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "date")
- root class: "observatory.TemperatureRow"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)

就我而言,我必须使用类型java.time.LocalDate,我不能使用java.sql.data。我读过我需要告知 Spark 如何将 Java 类型转换为 Sql 类型,我这个方向,我构建了下面的 2 个隐式函数:

implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature)
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

下面,一些关于我的应用程序的代码:

case class Location(lat: Double, lon: Double)

case class TemperatureRow(
                             date: LocalDate,
                             location: Location,
                             temperature: Double
                         )

case class EncodedTemperatureRow(
                             date: String,
                             location: Location,
                             temperature: Double

val s = Seq[TemperatureRow](
                    TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9),
                    TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5)
                )

import spark.implicits._
val temps: RDD[TemperatureRow] = sc.parallelize(s)
val tempsDS = temps.toDS

我不知道为什么 Spark 在编码器中搜索 java.time.LocalDate,我提供了 TemperatureRowEncodedTemperatureRow 的隐式转换...

【问题讨论】:

  • 您提供的隐式转换对 Spark 根本没有用 - 框架如何“知道”将对象转换为 EncodedTemperatureRow? Spark 需要org.apache.spark.sql.Encoder[T] 类型的隐式值来编码T 类型的值,因此您需要提供一个隐式Encoder[TemperatureRow]。创建这样的编码器并非易事,请参阅stackoverflow.com/a/39442829/5344058

标签: scala apache-spark apache-spark-sql


【解决方案1】:

直到 Spark 2.2 才支持java.time.LocalDate(我一直在尝试为该类型编写 Encoderfailed)。

您必须将java.time.LocalDate 转换为其他支持的类型(例如java.sql.Timestampjava.sql.Date),或字符串中的纪元或日期时间。

【讨论】:

猜你喜欢
  • 2016-12-08
  • 1970-01-01
  • 1970-01-01
  • 2019-09-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-11-11
  • 1970-01-01
相关资源
最近更新 更多