【问题标题】:Unable to solve the error: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema无法解决错误:java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
【发布时间】:2020-03-13 07:16:54
【问题描述】:

我正在尝试通过 SparkSession 从表中读取数据,并将其发布到 Kafka 主题。使用以下代码:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
    import org.apache.avro.specific.SpecificDatumWriter
    import org.apache.avro.io._
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.producer._
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.kafka.common.serialization.ByteArraySerializer
    import java.io.{ByteArrayOutputStream, StringWriter} 

object Producer extends Serializable {

  def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

        val lines= Source.fromFile("file")
        val schema = new Schema.Parser().parse(lines)

        val spark = new SparkSession.Builder().enableHiveSupport() getOrCreate()

        import spark.implicits._
        val df = spark.sql("select * from table")

        df.rdd.map{
            value => {
              val prod = new KafkaProducer[String, Array[Byte]](props)

        val records = new GenericData.Record(schema)
              records.put("col1",value.getString(1))
              records.put("col2",value.getString(2))
              records.put("col3",value.getString(3))
              records.put("col4",value.getString(4))

        val writer = new SpecificDatumWriter[GenericRecord](schema)
              val out = new ByteArrayOutputStream()
              val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
              writer.write(records, encoder)
              encoder.flush()
              out.close()

        val serializedBytes: Array[Byte] = out.toByteArray()
        val record = new ProducerRecord("topic",col1.toString , serializedBytes)
        val data = prod.send(record)

        prod.flush()
        prod.close() }  
                  }

        spark.close()
     }
}

并且,当我执行它时会引发以下错误:

引起:java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema 序列化堆栈: - 对象不可序列化(类:org.apache.avro.Schema$RecordSchema,值: {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string" },{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type" :"字符串"}]})

字段(类:scala.runtime.ObjectRef,名称:elem,类型:类 java.lang.Object) 对象(类 scala.runtime.ObjectRef, {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string" },{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type" :“细绳”}]}) - 字段(类:com.kafka.driver.KafkaProducer.Producer$$anonfun$main$1,名称:schema$1,类型:class scala.runtime.ObjectRef)

但是,当我尝试使用 df.rdd.collect.foreach 将数据集传递给驱动程序时,它运行良好。相反,我需要在集群级别发布消息,因此使用 rdd.map 。不知道我到底错过了什么导致这个错误。任何帮助解决这个问题将不胜感激,谢谢!

【问题讨论】:

  • 为什么需要映射RDD?您需要使用 Avro 吗?您只是在 Producer 上调用 send,而不是更改 RDD 内容。此外,Hive 表通常是 Kafka 数据的目的地,而不是源
  • @cricket_007 ,如果我不将数据集更改为 RDD,它会抛出:无法找到存储在数据集中的类型的编码器异常。是的,我需要使用 AVRO 格式。此外,在我的用例中,源是一个配置单元表,我从中读取数据并推送到 Kafka 主题。
  • 我想我的意思是你可以只使用 JDBC 从 Hive 读取,然后使用标准的 Kafka 生产者。 Spark 可能包含为此的库,但对于这样一个简单的用例来说有点矫枉过正,而且不需要分发
  • Lunatech 最近发布了一篇博文,相信可以帮助您解决问题:lunatech.com/blog/Xc51ORQAACEAev0k/…
  • @cricket_007,由于源是一个具有 1000 多列和大量记录的 hive 表(每天需要处理约 2M),因此希望在传输时实现尽可能多的并行性网络上的数据。因此,为此使用火花。不过问题已经解决了。

标签: scala apache-spark apache-kafka avro kafka-producer-api


【解决方案1】:

发现对象、Schema 和 Kafka Producer 需要暴露给 executor。将上面的代码修改为:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
    import org.apache.avro.specific.SpecificDatumWriter
    import org.apache.avro.io._
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.producer._
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.kafka.common.serialization.ByteArraySerializer
    import java.io.{ByteArrayOutputStream, StringWriter} 

object Producer extends Serializable {

  def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

        val spark = new SparkSession.Builder().enableHiveSupport() getOrCreate()

        import spark.implicits._
        val df = spark.sql("select * from table")

        df.foreachPartition{
            rows => {
              val prod = new KafkaProducer[String, Array[Byte]](props)
              val lines= Source.fromFile("file")
              val schema = new Schema.Parser().parse(lines)
            rows.foreach{
                  value =>
                      val records = new GenericData.Record(schema)
                      records.put("col1",value.getString(1))
                      records.put("col2",value.getString(2))
                      records.put("col3",value.getString(3))
                      records.put("col4",value.getString(4))

                      val writer = new SpecificDatumWriter[GenericRecord](schema)
                      val out = new ByteArrayOutputStream()
                      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
                      writer.write(records, encoder)
                      encoder.flush()
                      out.close()

                      val serializedBytes: Array[Byte] = out.toByteArray()
                      val record = new ProducerRecord("topic",col1.toString , serializedBytes)
                      val data = prod.send(record)
                     }
        prod.flush()
        prod.close() 
                 }  
               }

        spark.close()
     }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-01-05
    • 2018-06-05
    • 2015-04-08
    • 2018-03-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多