【问题标题】:Spark Streaming losing SparkContextSpark Streaming 丢失 SparkContext
【发布时间】:2016-08-06 17:20:35
【问题描述】:

我有一些非典型问题。当我尝试处理从 kafka 收到的 rdd 时,当我尝试访问 sparkContext 时出现异常(java.lang.NullPointerException)。 RDDProcessor 是可序列化的

def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
val stringFromByte = b2s(byteArray)
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema)
dateframe
}

问题开始于:

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
  log.info("Received RDD attempt")
  if (!rdd.isEmpty()) {
      rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
  }

但是当我只处理第一个rdd时,问题不会发生

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
  log.info("Received RDD attempt")
  if (!rdd.isEmpty()) {
     rddProcessor.processingRDD(rdd.first(), sqlContext)
  }

我真的不知道为什么会有这么大的问题。如果有人有提示,我将不胜感激

@编辑 我定义 StreamingContext

val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))

【问题讨论】:

  • 您能否提供代码,在哪里定义您的 ssc : SparkContext?
  • 好的,我把这个加到帖子里了

标签: scala apache-kafka spark-streaming


【解决方案1】:

嗯,SparkContext 是不可序列化的,它可以通过SparkSessionSqlContext 中使用,它被标记为@transient。所以如果你不能写出processingRDD,使其从不使用SparkContext,你就不能在需要序列化的lambda中使用它,比如foreach's 或map's论点(但不是foreachRDD的!)。

【讨论】:

  • 我明白,但为什么它对一个 rdd 有效?当对象序列化时,所有函数也将被序列化?
  • foreachRDD 不将其参数发送到其他节点,它完全在驱动节点上运行(并且rdd.first() 将元素从其他节点发送到驱动节点)。 rdd.foreach 确实需要将其参数发送到每个节点以在那里执行它。
  • 当我必须用 sparkContext 处理这个 rdd 时,我怎么能跳过这个问题? rdd.colect() 有效,但方法不正确
  • 这取决于processingRDD 应该做什么。
  • 我在 foreachRDD 范围内创建了数据框并且它工作正常,所以我的每个问题都得到了解决。感谢 Alexey 的帮助。
猜你喜欢
  • 2017-10-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-03-19
  • 1970-01-01
  • 2015-11-06
  • 2020-12-24
相关资源
最近更新 更多