【发布时间】:2017-05-15 16:35:10
【问题描述】:
当从检查点恢复失败的作业时,应用程序逻辑被正确调用并重新实例化 RDD,但是对 RDD.map 的调用会导致 NullPointerException。
lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
private def createStreamingContext: StreamingContext = {
val ssc = new StreamingContext(spark.sparkContext, batchInterval)
ssc.checkpoint(checkpointDir)
consumeStreamingContext(ssc)
ssc
}
def consumeStreamingContext(ssc: StreamingContext) = {
//... create dstreams
val dstream = KinesisUtil.createStream(....
...
dstream.checkpoint(batchInterval)
dstream
.foreachRDD(process)
}
def process(events: RDD[Event]) = {
if (!events.isEmpty()) {
logger.info("Transforming events for processing")
//rdd seems to support some operations?
logger.info(s"RDD LENGTH: ${events.count}")
//nullpointer exception on call to .map
val df = events.map(e => {
...
}
}
}
编辑:更新通知我正在使用 Kinesis 并且 WAL 已启用。 S3 是否支持 WAL 检查点?我正在其他地方阅读没有得到很好的支持。 https://issues.apache.org/jira/browse/SPARK-9215
编辑:我在使用 HDFS 时遇到了类似的结果。
【问题讨论】:
标签: apache-spark spark-streaming checkpointing