【发布时间】:2020-03-13 07:16:54
【问题描述】:
我正在尝试通过 SparkSession 从表中读取数据,并将其发布到 Kafka 主题。使用以下代码:
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.io._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import java.io.{ByteArrayOutputStream, StringWriter}
object Producer extends Serializable {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
val lines= Source.fromFile("file")
val schema = new Schema.Parser().parse(lines)
val spark = new SparkSession.Builder().enableHiveSupport() getOrCreate()
import spark.implicits._
val df = spark.sql("select * from table")
df.rdd.map{
value => {
val prod = new KafkaProducer[String, Array[Byte]](props)
val records = new GenericData.Record(schema)
records.put("col1",value.getString(1))
records.put("col2",value.getString(2))
records.put("col3",value.getString(3))
records.put("col4",value.getString(4))
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(records, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
val record = new ProducerRecord("topic",col1.toString , serializedBytes)
val data = prod.send(record)
prod.flush()
prod.close() }
}
spark.close()
}
}
并且,当我执行它时会引发以下错误:
引起:java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema 序列化堆栈: - 对象不可序列化(类:org.apache.avro.Schema$RecordSchema,值: {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string" },{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type" :"字符串"}]})
字段(类:scala.runtime.ObjectRef,名称:elem,类型:类 java.lang.Object) 对象(类 scala.runtime.ObjectRef, {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string" },{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type" :“细绳”}]}) - 字段(类:com.kafka.driver.KafkaProducer.Producer$$anonfun$main$1,名称:schema$1,类型:class scala.runtime.ObjectRef)
但是,当我尝试使用 df.rdd.collect.foreach 将数据集传递给驱动程序时,它运行良好。相反,我需要在集群级别发布消息,因此使用 rdd.map 。不知道我到底错过了什么导致这个错误。任何帮助解决这个问题将不胜感激,谢谢!
【问题讨论】:
-
为什么需要映射RDD?您需要使用 Avro 吗?您只是在 Producer 上调用 send,而不是更改 RDD 内容。此外,Hive 表通常是 Kafka 数据的目的地,而不是源
-
@cricket_007 ,如果我不将数据集更改为 RDD,它会抛出:无法找到存储在数据集中的类型的编码器异常。是的,我需要使用 AVRO 格式。此外,在我的用例中,源是一个配置单元表,我从中读取数据并推送到 Kafka 主题。
-
我想我的意思是你可以只使用 JDBC 从 Hive 读取,然后使用标准的 Kafka 生产者。 Spark 可能包含为此的库,但对于这样一个简单的用例来说有点矫枉过正,而且不需要分发
-
Lunatech 最近发布了一篇博文,相信可以帮助您解决问题:lunatech.com/blog/Xc51ORQAACEAev0k/…
-
@cricket_007,由于源是一个具有 1000 多列和大量记录的 hive 表(每天需要处理约 2M),因此希望在传输时实现尽可能多的并行性网络上的数据。因此,为此使用火花。不过问题已经解决了。
标签: scala apache-spark apache-kafka avro kafka-producer-api