【问题标题】:Checkpoint data corruption in Spark StreamingSpark Streaming 中的检查点数据损坏
【发布时间】:2017-02-10 21:28:46
【问题描述】:

我正在使用下面的基本 Spark 流代码测试检查点并提前写入日志。我正在检查本地目录。在启动和停止应用程序几次后(使用 Ctrl-C) - 它会拒绝启动,因为检查点目录中的某些数据损坏。我得到:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 17, localhost): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)

完整代码:

import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.streaming._

object ProtoDemo {
  def createContext(dirName: String) = {
    val conf = new SparkConf().setAppName("mything")
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(dirName)
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    val runningCounts = wordCounts.updateStateByKey[Int] {
      (values: Seq[Int], oldValue: Option[Int]) =>
        val s = values.sum
        Some(oldValue.fold(s)(_ + s))
      }

  // Print the first ten elements of each RDD generated in this DStream to the console
    runningCounts.print()
    ssc
  }

  def main(args: Array[String]) = {
    val hadoopConf = new Configuration()
    val dirName = "/tmp/chkp"
    val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
    ssc.start()
    ssc.awaitTermination()
  }

}

【问题讨论】:

  • 尝试使用可靠的文件系统,如hdfs,看看是否有任何错误。
  • 用 S3 尝试过,它仍然会发生。我相信损坏发生在预写日志中。
  • 您使用的是哪个版本的 spark ? 1.6 还是 2.0 ?
  • 你是怎么运行这个的?单机模式,YARN,Mesos,本地?
  • Spark 2.0。我可以在独立模式下重现该问题,也可以在从 sbt 启动本地集群时重现该问题。示例项目:github.com/thesamet/spark-issue

标签: scala apache-spark spark-streaming


【解决方案1】:

基本上,您要尝试做的是驱动程序故障场景,要使其工作,根据您正在运行的集群,您必须按照以下说明监控驱动程序进程并在失败时重新启动驱动程序

配置应用程序驱动程序的自动重新启动 - 要从驱动程序故障中自动恢复,用于运行流式应用程序的部署基础架构必须监视驱动程序进程并在它失败时重新启动驱动程序。不同的cluster managers 有不同的工具来实现这一点。

  1. Spark Standalone - 可以将 Spark 应用程序驱动程序提交到 在 Spark Standalone 集群中运行(参见cluster deploy mode),也就是说,应用程序驱动程序本身运行在其中一个 工作节点。此外,独立集群管理器可以 指示监督司机,如果司机重新启动它 由于非零退出代码或由于 运行驱动程序的节点。在 Spark 中查看集群模式和监督 独立guide 了解更多详情。

  2. YARN - Yarn 支持自动重启应用程序的类似机制。请参阅 YARN 文档 更多细节。

  3. Mesos - Marathon 已用于通过 Mesos 实现这一目标。

您需要如下配置预写日志,您需要遵循 S3 的特殊说明。

在使用 S3(或任何不支持刷新的文件系统)进行预写日志时,请记住启用

spark.streaming.driver.writeAheadLog.closeFileAfterWrite spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.

有关更多详细信息,请参阅 Spark Streaming Configuration

【讨论】:

  • 我描述的问题发生在失败后手动重启时。当我第二次或第三次重新启动时,我会得到问题中提到的异常。自动重启会有什么不同吗?
  • 您是否将这些设置为 true ? spark.streaming.driver.writeAheadLog.closeFileAfterWrite spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.
  • 是的,在使用 S3 进行测试时仍然存在此问题。本地文件系统需要它们吗?
【解决方案2】:

这个问题看起来更像是 Kryo Serializer 问题而不是检查点损坏。 在代码示例(包括 GitHub 项目)中,未配置 Kryo 序列化。 由于未配置 KryoException 异常无法发生。

当使用“预写日志”并从目录恢复时,所有 Spark 配置都从那里获取。 在您的示例中,createContext 方法在从检查点开始时不会调用。

我认为问题是另一个应用程序之前使用相同的检查点目录进行了测试,其中配置了 Kryo Serializer。 并且当前应用程序无法从该检查点恢复。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-09
    • 2021-06-03
    • 2017-12-23
    • 1970-01-01
    • 2020-10-29
    • 2017-06-22
    相关资源
    最近更新 更多