【发布时间】:2023-03-26 02:15:01
【问题描述】:
我想将 path 传递给在 Spark Streaming 中运行的函数 saveAsTextFile。但是,我得到了java.io.NotSerializableException。通常在类似的情况下,我使用骨架对象,但在这种特殊情况下,我不知道如何解决这个问题。有人可以帮帮我吗?
import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
class KafkaTestConsumer(val zkQuorum: String,
val group: String,
val topicMessages: String,
val path: String) extends Logging
{
// ...
// DStream[String]
dstream.foreachRDD { rdd =>
// rdd -> RDD[String], each String is a JSON
// Parsing each JSON
// splitted -> RDD[Map[String,Any]]
val splitted = rdd.map(line => Utils.parseJSON(line))
// ...
splitted.saveAsTextFile(path)
}
}
object Utils {
def parseJSON[T](json: String): Map[String,Any] = {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.readValue[Map[String,Any]](json)
}
}
整个堆栈跟踪:
16/09/22 17:03:28 错误实用程序:遇到异常 java.io.NotSerializableException:org.consumer.kafka.KafkaTestConsumer 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:180) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175) 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205) 在 org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:175) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) 在 org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) 在 org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:560) 在 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) 在 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) 在 org.consumer.kafka.KafkaDecisionsConsumer.run(KafkaTestConsumer.scala:136) 在 org.consumer.ServiceRunner$.main(QueuingServiceRunner.scala:20) 在 org.consumer.ServiceRunner.main(QueuingServiceRunner.scala)
【问题讨论】:
-
你能发布整个堆栈跟踪吗?
-
@bear911:完成。
-
这个DStream的类型参数是什么?你能发布更多代码吗?
-
@bear911:好的,我用 cmets 发布了更多代码。
-
问题可能出在 JSON 解析上。我会仔细检查您项目中使用的 Scala 版本,如果一切正常 - 也许使用其他一些 JSON 库。也许其他人可以解决这个问题。
标签: scala serialization apache-spark spark-streaming