【问题标题】:Spark: java.io.NotSerializableException火花:java.io.NotSerializableException
【发布时间】:2023-03-26 02:15:01
【问题描述】:

我想将 path 传递给在 Spark Streaming 中运行的函数 saveAsTextFile。但是,我得到了java.io.NotSerializableException。通常在类似的情况下,我使用骨架对象,但在这种特殊情况下,我不知道如何解决这个问题。有人可以帮帮我吗?

import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging
{

// ...
// DStream[String]
dstream.foreachRDD { rdd =>
   // rdd -> RDD[String], each String is a JSON
   // Parsing each JSON
   // splitted -> RDD[Map[String,Any]]
   val splitted = rdd.map(line => Utils.parseJSON(line)) 
   // ...
   splitted.saveAsTextFile(path)
}

}

object Utils {

  def parseJSON[T](json: String): Map[String,Any] = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.readValue[Map[String,Any]](json)
  }
}

整个堆栈跟踪:

16/09/22 17:03:28 错误实用程序:遇到异常 java.io.NotSerializableException:org.consumer.kafka.KafkaTestConsumer 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:180) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175) 在 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175) 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205) 在 org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:175) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206) 在 org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108) 在 org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67) 在 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) 在 org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:560) 在 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) 在 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) 在 org.consumer.kafka.KafkaDecisionsConsumer.run(KafkaTestConsumer.scala:136) 在 org.consumer.ServiceRunner$.main(QueuingServiceRunner.scala:20) 在 org.consumer.ServiceRunner.main(QueuingServiceRunner.scala)

【问题讨论】:

  • 你能发布整个堆栈跟踪吗?
  • @bear911:完成。
  • 这个DStream的类型参数是什么?你能发布更多代码吗?
  • @bear911:好的,我用 cmets 发布了更多代码。
  • 问题可能出在 JSON 解析上。我会仔细检查您项目中使用的 Scala 版本,如果一切正常 - 也许使用其他一些 JSON 库。也许其他人可以解决这个问题。

标签: scala serialization apache-spark spark-streaming


【解决方案1】:

问题是您在 dstream 操作 forEach 中使用 rdd 操作 saveAsText 文件,该操作在工作人员上运行,这就是为什么当您在代码工作人员上方运行时尝试执行 splitted.saveAsTextFile(path) 时它会给出可序列化错误示例 这是 rdd 动作,这就是它给出序列化错误的原因,所以你可以像下面这样

dstream.foreachRDD { rdd =>
   // rdd -> RDD[String], each String is a JSON
   // Parsing each JSON
   // splitted -> RDD[Map[String,Any]]
   val splitted = rdd.map(line => Utils.parseJSON(line)) 
   // ...
}.saveAsTextFile(path)

【讨论】:

【解决方案2】:

在使用 Spark 2.3.0 版本时,我遇到了同样的问题,我没有删除检查点语句。 我通过做两件事解决了这个问题:"

运行命令:

chmod 777 检查点目录

并且 为抛出错误的类实现 Serializable 接口。

在您的情况下,您需要为下面的类实现 Serializable。希望能解决。

org.consumer.kafka.KafkaTestConsumer

【讨论】:

    猜你喜欢
    • 2015-07-17
    • 2011-01-22
    • 1970-01-01
    • 2017-04-27
    • 1970-01-01
    • 2016-10-20
    • 2016-06-07
    • 2016-12-18
    • 1970-01-01
    相关资源
    最近更新 更多