【问题标题】:NoSuchElementException while using toDF from Spark / Livy从 Spark / Livy 使用 toDF 时出现 NoSuchElementException
【发布时间】:2019-08-15 05:03:03
【问题描述】:

我正在尝试从 Spark 中生成一个 Spark 数据帧,该数据帧已使用 apache Livy 进行了初始化。

我首先在这个更复杂的 hbase 调用中注意到了这个问题:

 import spark.implicits._

 ... 

        spark.sparkContext
          .newAPIHadoopRDD(
            conf,
            classOf[TableInputFormat],
            classOf[ImmutableBytesWritable],
            classOf[Result]
          )
          .toDF()

但我发现我可以在简单的情况下发生同样的事情:

 import spark.implicits._

  ...

  val filtersDf = filters.toDF() 

其中,filtersDf 只是一个案例类序列。

常见的问题是*.toDF(),但它也发生在*.toDS() 上,这让我认为import spark.implicits._ 上的隐式解析不起作用。要转换为数据帧的底层对象确实有数据。

错误堆栈看起来与使用 scala 运行时反射的运行时隐式解析有关。

请注意,我已经检查过,并且 spark 和编译的代码都使用相同版本的 Scala (2.11)。

我得到的例外是:

java.lang.RuntimeException: java.util.NoSuchElementException: head of empty list
scala.collection.immutable.Nil$.head(List.scala:420)
scala.collection.immutable.Nil$.head(List.scala:417)
scala.collection.immutable.List.map(List.scala:277)
scala.reflect.internal.Symbols$Symbol.parentSymbols(Symbols.scala:2117)
scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:301)
scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:341)
scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply$mcV$sp(SymbolLoaders.scala:74)
scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:263)
scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:71)
scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:174)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:174)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.info(SynchronizedSymbols.scala:174)
scala.reflect.internal.Types$TypeRef.thisInfo(Types.scala:2194)
scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2199)
scala.reflect.internal.tpe.FindMembers$FindMemberBase.<init>(FindMembers.scala:17)
scala.reflect.internal.tpe.FindMembers$FindMember.<init>(FindMembers.scala:219)
scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMemberInternal$1(Types.scala:1014)
scala.reflect.internal.Types$Type.findMember(Types.scala:1016)
scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:631)
scala.reflect.internal.Types$Type.member(Types.scala:600)
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
scala.reflect.internal.Mirrors$RootsBase.staticPackage(Mirrors.scala:204)
scala.reflect.runtime.JavaMirrors$JavaMirror.staticPackage(JavaMirrors.scala:82)
scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:263)
scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)

我的工作假设是我缺少依赖项或导入,这是某种 scala-ism。

我还没有找到任何其他关于这个问题的参考资料。最终,我认为这可能取决于导入/依赖项,但到目前为止我还不太清楚它是什么。非常感谢任何帮助。我很想知道解决问题的方法,或者通过比toDf() 更少神奇的方法来创建数据帧。

火花信息:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.2.0-mapr-1901
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_191)

【问题讨论】:

  • 你能在 toDF() 之前检查 rdd 的计数并检查结果吗?比如,spark.sparkContext.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result]).count()
  • 我查过了,rdd有数据。但我不认为它是 RDD 本身,而是与通过隐含的 toDF 调用更相关。如果您仔细查看该错误,则它是 scala 反射错误。

标签: scala apache-spark reflection hbase livy


【解决方案1】:

我在相同版本的 spark 上遇到了这个错误,但是,我在从 hdfs 读取 csv 时看到了这个错误,这是我正在做的一个例子:

val csv: DataFrame = ss
  .read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(filePath)
println(csv.count())

This 是我看到错误源自 spark 的地方。

我制作了这个失败的一个小例子,并试图找出导致问题的原因。我正在使用 livy scala 编程 api 提交作业以激发火花。我发现它失败是因为我通过 livy 作为参数传递给 spark 的类型,这是有道理的,因为这是一个 scala 反射错误。

例如这个失败:

case class FailingJob(someSeq: Seq[String], filePath: String) {
...
def call(scalaJobContext: ScalaJobContext): Unit = {
    // It doesn't really matter what I do here.
// Main this is that the seq is used in some way.
val mappedSeq = someSeq.map(s => s.toUpperCase())

val ss: SparkSession = scalaJobContext.sparkSession

val csv: DataFrame = ss
  .read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(filePath)

println(csv.count())
...
 for {
     _ <- livyClient.submit(FailingJob(someSeq, path).call)
...

虽然这很成功:

case class SuccessfulJob(someArray: Array[String], filePath: String) {
...
def call(scalaJobContext: ScalaJobContext): Unit = {
    // It doesn't really matter what I do here.
// Main this is that the seq is used in some way.
val mappedSeq = someArray.map(s => s.toUpperCase())

val ss: SparkSession = scalaJobContext.sparkSession

val csv: DataFrame = ss
  .read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(filePath)

println(csv.count())
...
 for {
     _ <- livyClient.submit(SuccessfulJob(someArray, path).call)
...

因此,如果我传入 Seq 类型的参数,则会失败,这让我认为这是 kryo 的序列化/反序列化问题。要注意的另一件事是,如果我将值作为对象的属性引用,则 not 会抛出此错误。我试过升级到 spark 2.4 没有运气。我正在使用 livy 版本0.6.0-incubating。我目前的工作是将对象转换为使用Array 类型而不是Seq。我怀疑其他 scala 特定类型也会失败,尽管我还没有尝试过。

Here 是我对这个问题的复制,其中包括解决方法。我很欣赏这不能回答问题,但希望它可以帮助其他人在其他情况下努力调试问题。我还向livy 提交了一个问题,看看他们是否可以更深入地了解正在发生的事情。

【讨论】:

    猜你喜欢
    • 2022-01-17
    • 2019-10-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-04
    • 1970-01-01
    相关资源
    最近更新 更多