【问题标题】:Spark Kinesis Streaming Checkpoint Recovery: RDD nullpointer exceptionSpark Kinesis Streaming 检查点恢复:RDD 空指针异常
【发布时间】:2017-05-15 16:35:10
【问题描述】:

当从检查点恢复失败的作业时,应用程序逻辑被正确调用并重新实例化 RDD,但是对 RDD.map 的调用会导致 NullPointerException。

lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}

def consumeStreamingContext(ssc: StreamingContext) = {
  //... create dstreams
  val dstream = KinesisUtil.createStream(....
  ...

  dstream.checkpoint(batchInterval)

  dstream
    .foreachRDD(process)
}

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    //rdd seems to support some operations? 
    logger.info(s"RDD LENGTH: ${events.count}")
    //nullpointer exception on call to .map
    val df = events.map(e => {
      ...
    }

  }
}

编辑:更新通知我正在使用 Kinesis 并且 WAL 已启用。 S3 是否支持 WAL 检查点?我正在其他地方阅读没有得到很好的支持。 https://issues.apache.org/jira/browse/SPARK-9215

编辑:我在使用 HDFS 时遇到了类似的结果。

【问题讨论】:

    标签: apache-spark spark-streaming checkpointing


    【解决方案1】:

    我遇到了类似的问题 - 让我先解释一下我的问题,然后再解释一下我是如何解决它的。

    问题陈述:使用 Spark 流处理 Kinesis 数据。当 spark 流在 kinesis 之上工作时,我们仍然会得到一个非结构化流 (DStream),而不是我们在听 Kafka 时得到的结构化流。

    问题:RDD转DF或DataSet时出现空指针异常。下面是有问题的代码:

    def processData(spark: SparkSession, jobArgs: JobArgs, kinesisStream:ReceiverInputDStream[Array[Byte]]):Unit={
        val filenamesRDD = kinesisStream.map(decodeKinesisMessage)
        // Import spark implicits which add encoders for case classes.
        import spark.implicits._
        val events = filenamesRDD.flatMap(filenameToEvents(new AmazonS3Client))
        events.foreachRDD(rdd => {
          spark.createDataset(rdd)
            .write
            .partitionBy("date") // TODO add hour
            .mode(SaveMode.Append.name())
            .parquet(jobArgs.outputPath)
        })
    }
    

    问题出在哪里:此代码在检查点目录不存在时有效,但在检查点目录存在时失败并出现空指针异常。

    为什么:我的理论是它试图通过反序列化获取 SQLContext 和其他对象,但它们不可用。

    我是如何解决这个问题的:在将 rdd 转换为数据集之前再次构建 SQLContext。见以下代码:

    def processData(spark: SparkSession, kinesisStream: ReceiverInputDStream[Array[Byte]]): Unit = {
        val filenamesRDD = kinesisStream.map(decodeKinesisMessage)
        // Import spark implicits which add encoders for case classes.
        import spark.implicits._
        val events = filenamesRDD.flatMap(filenameToEvents(new AmazonS3Client))
    
        events.foreachRDD(rdd => {
          val sqlContext = SparkSession.builder().getOrCreate().sqlContext
          import sqlContext.implicits._
    
          val outputPath: String = sqlContext.sparkSession.conf.get("output.path")
          sqlContext.createDataset(rdd)
            .write
            .partitionBy("date") // TODO add hour
            .mode(SaveMode.Append.name())
            .parquet(outputPath)
        })
    }
    

    如果有帮助,请告诉我。

    谢谢, 侯赛因·博拉

    【讨论】:

      【解决方案2】:

      解决方案是在 foreach 中的每次转换后调用 rdd.checkpoint。每个 RDD 转换都必须设置检查点。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-12-23
        • 2017-06-22
        • 1970-01-01
        • 2019-10-16
        • 1970-01-01
        • 2017-07-02
        相关资源
        最近更新 更多