【发布时间】:2017-02-10 21:28:46
【问题描述】:
我正在使用下面的基本 Spark 流代码测试检查点并提前写入日志。我正在检查本地目录。在启动和停止应用程序几次后(使用 Ctrl-C) - 它会拒绝启动,因为检查点目录中的某些数据损坏。我得到:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 17, localhost): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
完整代码:
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.streaming._
object ProtoDemo {
def createContext(dirName: String) = {
val conf = new SparkConf().setAppName("mything")
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint(dirName)
val lines = ssc.socketTextStream("127.0.0.1", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
val runningCounts = wordCounts.updateStateByKey[Int] {
(values: Seq[Int], oldValue: Option[Int]) =>
val s = values.sum
Some(oldValue.fold(s)(_ + s))
}
// Print the first ten elements of each RDD generated in this DStream to the console
runningCounts.print()
ssc
}
def main(args: Array[String]) = {
val hadoopConf = new Configuration()
val dirName = "/tmp/chkp"
val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
ssc.start()
ssc.awaitTermination()
}
}
【问题讨论】:
-
尝试使用可靠的文件系统,如hdfs,看看是否有任何错误。
-
用 S3 尝试过,它仍然会发生。我相信损坏发生在预写日志中。
-
您使用的是哪个版本的 spark ? 1.6 还是 2.0 ?
-
你是怎么运行这个的?单机模式,YARN,Mesos,本地?
-
Spark 2.0。我可以在独立模式下重现该问题,也可以在从 sbt 启动本地集群时重现该问题。示例项目:github.com/thesamet/spark-issue
标签: scala apache-spark spark-streaming