【问题标题】:The usage of serializable object: Caused by: java.io.NotSerializableException可序列化对象的使用: 引起:java.io.NotSerializableException
【发布时间】:2016-11-27 19:43:21
【问题描述】:

我关注this tutorial 和其他关于任务序列化的类似教程,但我的代码因Task serialization 错误而失败。我不明白为什么会这样。我在foreachRDD 之外设置变量topicOutputMessages,然后在foreachPartition 中读取它。我还创建了KafkaProducer INSIDE foreachPartition。那么,这里有什么问题呢?实在看不懂。

al topicsSet = topicInputMessages.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)


messages.foreachRDD(rdd => {
    rdd.foreachPartition{iter =>
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
              val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
              producer.send(record)
        }
        producer.close()
    }
})

编辑:

object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var producer: KafkaProducer[String, String] = null

  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

 def createProducer: KafkaProducer[String, String] = {

    val  kafkaProps = new Properties()

    kafkaProps.put("bootstrap.servers", metadataBrokerList)

    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")

    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes)
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")

    new KafkaProducer[String, String](kafkaProps)
  }

}

完整的堆栈跟踪:

16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more

【问题讨论】:

  • 上面代码中的KafkaDecisionsConsumer是什么?我认为原因不在提供的代码示例上。应该有一些函数run 导致了这个问题。
  • @maasg:请看我的回答。有函数runKafkaDecisionsConsumer改名为TestRunner)。基本上run 是放置整个代码的地方。但是,我实际上能够通过使用broadcast 变量和foreachPartition 来解决这个问题。我会尽快更新我的答案。

标签: scala apache-spark


【解决方案1】:

序列化问题在于Spark如何处理闭包序列化(你可以在这个答案中详细阅读:How spark handles object

在失败的代码中,在这里引用metadataBrokerListtopicOutputMessages

rdd.foreachPartition{iter =>
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)

创建对创建这些变量的外部对象的引用,并强制 Spark 中的闭包清理器包含在“清理的”闭包中。 outer 然后在闭包中包含 sparkContextstreamingContext,它们不可序列化,因此会出现序列化异常。

在第二次尝试中(在作为答案发布的解决方法中),这些链接被破坏,因为变量现在包含在帮助对象中,并且闭包可以从 outer 上下文中“彻底清除”。

我认为在UtilsDM 对象中没有必要将@transient 添加到这些变量中,因为这些值是可序列化的。请注意,在每个执行程序中都会重新创建单例对象。因此驱动中改变的可变变量的值在执行器中是不可用的,如果处理不当,往往会导致 NullPointerException。

有一个序列化技巧可以帮助原始场景:

在闭包中复制引用的变量。例如

rdd.foreachPartition{iter =>
    val innerMDBL  = metadataBrokerList
    val innerTOM = topicOutputMessages
    UtilsDM.setMetadataBrokerList(innerMDBL)
    UtilsDM.setOutputTopic(innerTOM)

这样,值在编译时被复制,也没有与外部链接。

为了处理执行程序绑定的对象(例如不可序列化的连接甚至本地缓存),我更喜欢使用实例工厂方法,就像在这个答案中解释的那样:Redis on Spark:Task not serializable

【讨论】:

  • 好详细的答案!
【解决方案2】:

我认为问题在于您的UtilsDM 课程。它被闭包捕获,Spark 尝试序列化代码以将其发送给执行程序。

尝试使 UtilsDM 可序列化或在 foreachRDD 函数中创建它。

【讨论】:

  • 感谢您的反馈。我刚刚发布了UtilsDM 的代码。确实它扩展了Serializable,所以我希望它是可序列化的......
  • 如何在foreachRDD 中创建UtilsDM?您能否举个例子,以便我进行测试。非常感谢。
  • 能否提供完整的堆栈跟踪?您使用的是哪个版本的 Spark?
  • 我使用的是 Spark 1.6.2 和 Scala 2.10.6。好的,请稍等。
  • 我发布了完整的堆栈跟踪。另请参阅编辑以查看相应版本的代码。谢谢。
【解决方案3】:

这不是我问题的答案,但它是可行的选项。也许有人可以在最终答案中详细说明?这个解决方案的缺点是metadataBrokerListtopicOutputMessages应该使用@transient lazy val topicOutputMessages@transient lazy val topicOutputMessages@transient lazy val metadataBrokerListUtilsTest的代码中修复,但理想情况下我希望能够将这些参数作为输入参数传递:

object TestRunner {

  var zkQuorum: String = ""
  var metadataBrokerList: String = ""
  var group: String = ""
  val topicInputMessages: String = ""

  def main(args: Array[String]) {

    if (args.length < 14) {
      System.err.println("Usage: TestRunner <zkQuorum> <metadataBrokerList> " +
                          "<group> <topicInputMessages>")
      System.exit(1)
    }

    val Array(zkQuorum,metadataBrokerList,group,topicInputMessages) = args

    setParameters(zkQuorum,metadataBrokerList,group,topicInputMessages)

    run(kafka_num_threads.toInt)

  }


  def setParameters(mi: String,
                    mo: String,
                    g: String,t: String) {
    zkQuorum = mi
    metadataBrokerList = mo
    group = g
    topicInputMessages = t
  }


def run(kafkaNumThreads: Int) = {
    val conf = new SparkConf()
      .setAppName("TEST")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("~/checkpointDir")

val ssc = new StreamingContext(sc, Seconds(5)) 

val topicMessagesMap = topicInputMessages.split(",").map((_, 1)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)

    messages.foreachRDD(rdd => {
      rdd.foreachPartition{iter =>
        val producer = UtilsTest.createProducer
        iter.foreach { msg =>
          val record = new ProducerRecord[String, String](UtilsTest.getOutputTopic(), msg)
          producer.send(record)
        }
        producer.close()
      }

    })

    ssc.start()
    ssc.awaitTermination()

  }

}



object UtilsDM extends Serializable {

  @transient lazy val topicOutputMessages: String = "myTestTopic"
  @transient lazy val metadataBrokerList: String = "172.12.34.233:9092"

  var producer: KafkaProducer[String, String] = null

 def createProducer: KafkaProducer[String, String] = {

    val  kafkaProps = new Properties()

    kafkaProps.put("bootstrap.servers", metadataBrokerList)

    // This is mandatory, even though we don't send key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")

    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3")
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes)
    kafkaProps.put("batch.size", "5")
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch
    kafkaProps.put("linger.ms", "5")

    new KafkaProducer[String, String](kafkaProps)
  }

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-07-19
    • 2012-09-08
    • 2021-08-12
    • 2015-10-15
    • 1970-01-01
    • 2014-02-10
    • 2014-02-05
    • 1970-01-01
    相关资源
    最近更新 更多