【问题标题】:Scala Reflection exception during creation of DataSet in Spark在 Spark 中创建数据集期间的 Scala 反射异常
【发布时间】:2018-09-05 16:18:26
【问题描述】:

我想在 Spark Jobserver 上运行 Spark Job。 在执行过程中,我遇到了一个异常:

堆栈

java.lang.RuntimeException: scala.ScalaReflectionException: 类 JavaMirror 中的 com.some.example.instrument.data.SQLMapping 与 org.apache.spark.util.MutableURLClassLoader@55b699ef 类型类 org.apache.spark.util.MutableURLClassLoader 与类路径 [file:/app/spark-job-server.jar] 和父母是 类型类的 sun.misc.Launcher$AppClassLoader@2e817b38 sun.misc.Launcher$AppClassLoader 与类路径 [.../classpath jars/] 未找到。

在 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) 在 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) 在 com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1$$typecreator15$1.apply(DataRetriever.scala:136) 在 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) 在 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49) 在 org.apache.spark.sql.Encoders$.product(Encoders.scala:27​​5) 在 org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233) 在 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33) 在 com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:136) 在 com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:135) 在 scala.util.Success$$anonfun$map$1.apply(Try.scala:237) 在 scala.util.Try$.apply(Try.scala:192) 在 scala.util.Success.map(Try.scala:237) 在 scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 在 scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) 在 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 在 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 在 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 在 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

DataRetriever 中,我将简单案例类转换为DataSet。

案例类定义

case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)

导致问题的代码:

import session.implicits._
def someFunc(future: Future[ID]): Dataset[SQLMappins] = {
future.map {f =>
val seq: Seq[SQLMapping] = getFromEndpoint(f)
val ds: Dataset[SQLMapping] = seq.toDS()
...
 }
}

作业有时有效,但如果我重新运行作业,它会抛出异常。

28.03.2018 更新 我忘了提到一个细节,事实证明这很重要。 数据集是在 Future 内部构建的。

【问题讨论】:

    标签: scala apache-spark apache-spark-dataset scala-reflect


    【解决方案1】:

    在未来调用 toDS() 导致 ScalaReflectionException。

    我决定在future.map之外构建DataSet。

    您可以使用此示例作业验证不能在future.map 中构造数据集。

    package com.example.sparkapplications
    
    import com.typesafe.config.Config
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    import scala.concurrent.Await
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import spark.jobserver.SparkJob
    import spark.jobserver.SparkJobValid
    import spark.jobserver.SparkJobValidation
    
    object FutureJob extends SparkJob{
      override def runJob(sc: SparkContext,
                          jobConfig: Config): Any = {
        val session = SparkSession.builder().config(sc.getConf).getOrCreate()
        import session.implicits._
        val f = Future{
          val seq = Seq(
            Dummy("1", 1),
            Dummy("2", 2),
            Dummy("3", 3),
            Dummy("4", 4),
            Dummy("5", 5)
          )
    
          val ds = seq.toDS
    
          ds.collect()
        }
    
        Await.result(f, 10 seconds)
      }
    
      case class Dummy(id: String, value: Long)
      override def validate(sc: SparkContext,
                            config: Config): SparkJobValidation = SparkJobValid
    }
    

    稍后我会提供信息,如果使用 spark 2.3.0 时问题仍然存在,以及当您直接通过 spark-submit 传递 jar 时。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-09-01
      • 2019-10-16
      • 2021-03-11
      • 2021-08-03
      • 1970-01-01
      • 1970-01-01
      • 2017-08-04
      相关资源
      最近更新 更多