【问题标题】:Spark Streaming: Broadcast variables, java.lang.ClassCastExceptionSpark Streaming:广播变量,java.lang.ClassCastException
【发布时间】:2016-08-18 14:57:41
【问题描述】:


我尝试从存储在 HDFS 中的静态文本文件中读取数据,将其内容存储到 ArrayBuffer 中,而后者又应通过 sparkContext.broadcast 作为 BroadcastVariable 进行广播。我正在使用 cloudera 的 spark,spark version 1.6.0-cdh5.7.0 和 spark-streaming_2.10

我使用 spark-submit 在 yarn 上启动应用程序:

spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar

当我这样做时,我得到一个 java.lang.ClassCastException:无法将 scala.Some 的实例分配给 org.apache.spark.Accumulator 实例中 scala.Option 类型的字段 org.apache.spark.Accumulable.name 与硬编码的 ArrayBuffer 一起使用的代码可以完美地工作,所以我认为它与静态文件资源有关...... 有谁知道我可能做错了什么?任何帮助表示赞赏。

这不起作用:

对象广播流测试1 { def main(args: Array[String]) { val sparkConf = 新 SparkConf() val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10)) val 内容 = streamingContext.sparkContext .textFile("hdfs:///data/someTextFile.txt") 。收藏() .toBuffer[字符串] val broadCastVar = streamingContext.sparkContext.broadcast(内容) broadCastVar.value.foreach(line => println(line)) 流上下文.start() streamingContext.awaitTermination() } }

这行得通:

对象广播流测试2 { def main(args: Array[String]) { val sparkConf = 新 SparkConf() val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10)) val content = new mutable.ArrayBuffer[String] (1 到 50).foreach(i => content += "line" + i) val broadCastVar = streamingContext.sparkContext.broadcast(内容) broadCastVar.value.foreach(line => println(line)) 流上下文.start() streamingContext.awaitTermination() } }

堆栈跟踪:

16/04/25 10:09:59 错误 scheduler.TaskSetManager: 阶段 0.0 中的任务 0 失败 4 次;中止工作 线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:阶段 0.0 中的任务 0 失败 4 次,最近一次失败:阶段 0.0 中丢失任务 0.3(TID 6,n525.hadoop.mxint。 net):java.io.IOException:java.lang.ClassCastException:无法将scala.Some实例分配给org.apache.spark.Accumulator实例中scala.Option类型的字段org.apache.spark.Accumulable.name 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208) 在 org.apache.spark.Accumulable.readObject(Accumulators.scala:151) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 在 scala.collection.immutable.$colon$colon.readObject(List.scala:362) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 在 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) 在 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745) 原因:java.lang.ClassCastException:无法将 scala.Some 的实例分配给 org.apache.spark.Accumulator 实例中 scala.Option 类型的字段 org.apache.spark.Accumulable.name 在 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) 在 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) 在 java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) 在 org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152) 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205) ... 30 更多 驱动程序堆栈跟踪: 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 在 scala.Option.foreach(Option.scala:236) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1843) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1856) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1869) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1940) 在 org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 在 org.apache.spark.rdd.RDD.collect(RDD.scala:926) 在 net.metrics.dada.streaming.application.BroadcastStreamTest1$.main(BroadcastStreamTest1.scala:14) 在 net.metrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 原因:java.io.IOException:java.lang.ClassCastException:无法将 scala.Some 的实例分配给 org.apache.spark.Accumulator 实例中 scala.Option 类型的字段 org.apache.spark.Accumulable.name 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208) 在 org.apache.spark.Accumulable.readObject(Accumulators.scala:151) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 在 scala.collection.immutable.$colon$colon.readObject(List.scala:362) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 在 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) 在 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 在 java.lang.Thread.run(Thread.java:745) 原因:java.lang.ClassCastException:无法将 scala.Some 的实例分配给 org.apache.spark.Accumulator 实例中 scala.Option 类型的字段 org.apache.spark.Accumulable.name 在 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) 在 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) 在 java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) 在 org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152) 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205) ... 30 更多

【问题讨论】:

  • 嗯...我认为 scala.collection.mutable.Buffer[A] (非工作示例中的 content 变量的类型)与 scala 不同。 collection.mutable.ArrayBuffer[A] (你的工作示例中的 content 变量的类型),我想如果你希望你的广播变量是 ArrayBuffer[A] 类型,你肯定会得到一个 ClassCastException 时你传给它一个 Buffer[A]。
  • 这似乎不是问题。我没有声明我的广播变量期望一个特定的类型。我也尝试使用 toList 并创建一个新的 ArrayBuffer 并插入收集的数组中的所有元素。我认为可能与静态文件资源有关,但我不确定,它也可以使用提供的 sparkContext 在 spark shell 中工作。
  • 我发现问题与设置spark.executor.userClassPathFirst=true有关,可能是版本问题...找到原因后我会回答我的问题,

标签: scala apache-spark hdfs spark-streaming broadcast


【解决方案1】:

原因是与我提供的 jar 文件存在某种冲突。

没有设置

spark.executor.userClassPathFirst=true

它有效,不幸的是我无法找到问题的确切原因。

【讨论】:

  • 我没有使用这个conf,但是收到同样的错误。我仍在尝试找到正确的答案,但您的代码和使用的 Spark 之间的 Scala 版本似乎不兼容。
猜你喜欢
  • 2015-08-26
  • 2016-11-18
  • 1970-01-01
  • 1970-01-01
  • 2015-04-18
  • 1970-01-01
  • 1970-01-01
  • 2016-05-26
  • 1970-01-01
相关资源
最近更新 更多