Posted: 2017-03-22 21:55:42
Question:
I ran into this exception:
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
    at com.Boot$.test(Boot.scala:60)
    at com.Boot$.main(Boot.scala:36)
    at com.Boot.main(Boot.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@77624599)
    - field (class: com.Boot$$anonfun$test$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer)
    - object (class com.Boot$$anonfun$test$1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
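The serialization stack names the culprit: the closure class `com.Boot$$anonfun$test$1` holds the driver-side `KafkaProducer` in its field `producer$1`, and `KafkaProducer` is not `java.io.Serializable`. The trace also shows the closure being written by `JavaSerializer`, so setting `spark.serializer` to Kryo does not affect this check. The sketch below reproduces the effect without Spark or Kafka, assuming Scala 2.12 or later (where function literals are serializable lambdas). `FakeProducer` is a hypothetical non-serializable stand-in for `KafkaProducer`, and `isJavaSerializable` only approximates what `ClosureCleaner.ensureSerializable` does.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for KafkaProducer: it does NOT extend Serializable.
class FakeProducer {
  def send(line: String): Unit = ()
}

object ClosureSerializationDemo {
  // Rough approximation of Spark's closure check: write the object with
  // plain Java serialization and report whether that fails.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like the question's code: the producer is created outside the closure,
  // so the closure captures it and has to serialize it as well.
  def capturing(): String => Unit = {
    val producer = new FakeProducer
    (line: String) => producer.send(line)
  }

  // The producer is created inside the closure, so nothing
  // non-serializable is captured.
  def selfContained(): String => Unit =
    (line: String) => new FakeProducer().send(line)

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(capturing()))     // false
    println(isJavaSerializable(selfContained())) // true
  }
}
```

Creating a producer per record, as `selfContained` does, would be wasteful in a real job; the point here is only what the closure captures.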
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// NearLineConfig and log are defined elsewhere in the original program (not shown).

// @transient
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// @transient
val sc = new SparkContext(sparkConf)
val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")
// @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// "producer.type" belongs to the old Scala producer; KafkaProducer ignores it (its send() is always asynchronous)
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")
// @transient
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
requestSet.foreachPartition((partition: Iterator[String]) => {
  partition.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception => {
        log.warn(ex.getMessage, ex)
      }
    }
  })
})
producer.close()
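A common way to avoid capturing the producer is to ship only its configuration into the closure and build the producer inside `foreachPartition`, once per partition, closing it when the partition is done. The sketch below simulates that without Spark or Kafka, assuming Scala 2.12 or later. `FakeKafkaProducer`, `DemoLog`, `PerPartitionWriter`, and the broker address are hypothetical, and the two `Iterator`s stand in for RDD partitions. With the real API, the function returned by `writer` is what you would pass to `requestSet.foreachPartition`, with `new KafkaProducer[String, String](props)` in place of the stub.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{HashMap => JHashMap}

// Demo-only counters. A top-level object is accessed statically,
// so closures never capture it.
object DemoLog {
  val opened = new AtomicInteger(0)
  val sent = new AtomicInteger(0)
  val closed = new AtomicInteger(0)
}

// Hypothetical stand-in for KafkaProducer: built from a config map,
// NOT Serializable, and must be closed after use.
class FakeKafkaProducer(config: JHashMap[String, Object]) {
  DemoLog.opened.incrementAndGet()
  def send(topic: String, value: String): Unit = { DemoLog.sent.incrementAndGet(); () }
  def close(): Unit = { DemoLog.closed.incrementAndGet(); () }
}

object PerPartitionWriter {
  // The function to hand to rdd.foreachPartition. It captures only `props`
  // and `topic`; java.util.HashMap is Serializable when its contents are.
  def writer(props: JHashMap[String, Object], topic: String): Iterator[String] => Unit =
    (partition: Iterator[String]) => {
      // Built on whichever JVM runs this partition, so it is never serialized.
      val producer = new FakeKafkaProducer(props)
      try partition.foreach(line => producer.send(topic, line))
      finally producer.close()
    }

  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val props = new JHashMap[String, Object]()
    props.put("bootstrap.servers", "broker1:9092") // hypothetical address
    val f = writer(props, "testtopic")
    println(isJavaSerializable(f)) // true
    // Two simulated partitions: each gets its own producer, closed afterwards.
    Seq(Iterator("a", "b"), Iterator("c")).foreach(f)
    // expected (2,3,2): two producers opened and closed, three records sent
    println((DemoLog.opened.get(), DemoLog.sent.get(), DemoLog.closed.get()))
  }
}
```

Because the producer is created on the JVM that processes the partition, it never has to be serialized; the trade-off is one producer per partition.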
In this program I read records from an HDFS path and try to write them to Kafka. When I remove the code that sends the records to Kafka, the job runs fine. What am I missing?
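One way to keep the producer out of the serialized closure while reusing it across partitions is to hold it in a lazily initialised `val` inside a top-level Scala `object`. Code that refers to a top-level object compiles to a static access, so the object is not captured, and the `lazy val` is built on first use in each JVM (roughly once per executor). The `KafkaProducer` Javadoc describes the producer as thread safe, which is what makes sharing one instance reasonable. This is a sketch assuming Scala 2.12 or later; `SharedFakeProducer`, `ProducerHolder`, and `SharedProducerDemo` are hypothetical names, and the stub replaces the real producer.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for KafkaProducer (not Serializable). It counts sends
// with an AtomicInteger because several tasks may share it concurrently.
class SharedFakeProducer {
  val sent = new AtomicInteger(0)
  def send(topic: String, value: String): Unit = { sent.incrementAndGet(); () }
}

// Hypothetical holder: referenced statically, never serialized with a closure.
// The lazy val is created on first access in each JVM.
object ProducerHolder {
  lazy val producer: SharedFakeProducer = new SharedFakeProducer
}

object SharedProducerDemo {
  // The function to hand to rdd.foreachPartition; it captures only `topic`.
  def writer(topic: String): Iterator[String] => Unit =
    (partition: Iterator[String]) =>
      partition.foreach(line => ProducerHolder.producer.send(topic, line))

  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val f = writer("testtopic")
    println(isJavaSerializable(f)) // true
    // Both simulated partitions go through the same shared producer.
    Seq(Iterator("a", "b"), Iterator("c")).foreach(f)
    println(ProducerHolder.producer.sent.get()) // 3
  }
}
```

Nothing in this sketch closes the shared producer. One option is to register `producer.close()` in a JVM shutdown hook (for example with `sys.addShutdownHook`), since `close()` waits for previously sent records to complete.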
Tags: scala apache-spark kafka-producer-api