【Question title】: Apache Spark: Uncaught exception in thread driver-heartbeater
【Posted】: 2016-02-28 02:53:11
【Question】:

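Below is a simple Spark program I wrote that uses Spark's StreamingContext and SQLContext.

Note: the problem is reproducible even without the streaming context. I have updated the program to reflect this.

Note: downgrading Spark to 1.4.1 (I was using 1.5.2) seems to have solved my problem. The problem is also reproducible with Spark 1.5.1.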

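import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "test")
    val sqc = new SQLContext(sc)

    // "<dir>" is a placeholder for the directory of JSON input files
    val dataFrame = sqc.read.json(sc.textFile("<dir>"))
    // Group by Product.SerialNumber, then count the resulting groups
    println(dataFrame.groupBy("Product.SerialNumber").count().count())
    sc.stop()
}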

This throws the following exception at startup, but the program keeps running and prints the result.

15/11/25 15:48:55 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
        at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        ... 33 more 

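After 2 minutes, the following exception occurs and execution terminates. Until then, execution is flawless and no problems or exceptions are reported.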

15/11/25 15:51:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 179219 ms exceeds timeout 120000 ms
15/11/25 15:51:44 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 179219 ms
15/11/25 15:51:44 WARN TaskSetManager: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 ERROR TaskSetManager: Task 4 in stage 193.0 failed 1 times; aborting job
15/11/25 15:51:44 WARN TaskSetManager: Lost task 7.0 in stage 193.0 (TID 7691, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN TaskSetManager: Lost task 6.0 in stage 193.0 (TID 7690, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN TaskSetManager: Lost task 5.0 in stage 193.0 (TID 7689, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN SparkContext: Killing executors is only supported in coarse-grained mode
15/11/25 15:51:45 ERROR JobScheduler: Error running job streaming job 1448446890000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 193.0 failed 1 times, most recent failure: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402)
        at main$$anonfun$main$1.apply(Main.scala:72)
        at main$$anonfun$main$1.apply(Main.scala:68)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:218)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:217)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 193.0 failed 1 times, most recent failure: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)

【Question comments】:

    Tags: scala apache-spark apache-spark-sql spark-streaming


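    【Solution 1】:

    You may have forgotten to add some dependency jars when submitting the Spark job. Try assembling your project before submitting it to Spark, so that all dependencies are included: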

    sbt assembly
    

    By the way, when I run

    sbt console
    

    and run the commands in the Scala interpreter, I hit the same problem as you. However, if I assemble the project first and run the job with

    spark-submit --class className target/scala-2.10/xxx-assembly-0.1.0.jar someArgs
    

    it works :)

    Reference: Apache Spark 1.5 with Cassandra : Class cast exception
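
    The assembly step above needs the sbt-assembly plugin and a build definition. The following is a minimal sketch, not the answerer's actual build. The project name, the plugin version, the exact Scala patch version, and the choice to mark Spark as "provided" are all assumptions. The Scala 2.10 line and the 0.1.0 version are inferred from the jar path target/scala-2.10/xxx-assembly-0.1.0.jar, and the Spark version is taken from the question.

    ```scala
    // project/plugins.sbt
    // Assumption: use whichever sbt-assembly release matches your sbt version.
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")

    // build.sbt
    name := "xxx"              // hypothetical, taken from the jar name in the answer
    version := "0.1.0"
    scalaVersion := "2.10.5"   // assumption: any 2.10.x matching the Spark build

    // "provided" keeps Spark itself out of the assembly jar, because
    // spark-submit supplies the Spark classes at runtime.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.5.2" % "provided"
    )
    ```

    With "provided" scope, sbt run and sbt console no longer see the Spark classes by default, so this layout only suits jobs launched through spark-submit.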

    【Comments】:

    【Solution 2】:

    Try this:

    val dataFrame = sqc.read.json(sc.textFile("<dir>")).cache()
    

    I ran into the same problem; running .count() multiple times on the same DataFrame caused this error.

    If that does not help, try this:

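    val dataFrame = sqc.read.json(sc.textFile("<dir>"))
    // groupBy alone returns GroupedData, which cannot be cached; cache the aggregated DataFrame instead
    val serialNumberDF = dataFrame.groupBy("Product.SerialNumber").count().cache()
    println(serialNumberDF.count())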
    

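    My guess is that having to re-evaluate the DataFrame over and over (DataFrames are lazily evaluated) causes the error somewhere. Also, with large amounts of data, using a DataFrame in several places without caching can carry a considerable performance cost.

    【Comments】:

      【Solution 3】:

      In our case (Spark 1.6.1), these same errors appeared randomly when running tests through sbt. The problem actually appears to be an sbt issue. The workaround (mentioned in that issue) is to run the tests in a forked JVM: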

      fork in test := true
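
      As a sketch of where this setting lives, the fragment below shows a build.sbt that forks the test JVM. It uses the Test configuration scope described in the sbt forking documentation, rather than the lowercase test task scope written above. The "fork in run" line is an assumption that goes beyond this answer: it applies the same workaround to sbt run, which is how the program in the question appears to have been launched (its log shows run-main-0). Nothing in this thread confirms that it resolves the error there.

      ```scala
      // build.sbt (sbt 0.13 syntax)

      // Run tests in a separate JVM instead of inside sbt's own JVM and class loaders.
      fork in Test := true

      // Assumption: apply the same isolation when launching the program with `sbt run`.
      fork in run := true
      ```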

      【Comments】:
