【发布时间】:2020-04-08 14:17:21
【问题描述】:
在我的 Spark 流式传输作业中,我试图从 Kafka 主题读取 Confluentavro 消息并得到“在记录解析中检测到格式错误的记录”。
我尝试了很多调试,但无法找出格式错误的记录。需要帮助了解如何从格式错误的行中获取记录。有没有办法可以打印 avro 消息以查看消息有什么问题。
我的代码:
object AvroReadMessage extends App {
val spark = SparkSession.builder.master("local[*]").appName("AvroReadMessage")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("/read_message.avsc")))
val readKafkaDF = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic2")
.option("startingOffsets", "latest")
.load()
val jmap = new java.util.HashMap[String, String]()
jmap.put("mode", "PERMISSIVE")
val query = readKafkaDF
.select(from_avro('value, jsonFormatSchema, jmap) as 'value)
.select("value.*")
.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
}
任何帮助将不胜感激。
【问题讨论】:
-
您是否正在使用 Confluent Schema Registry 进行记录?
标签: apache-spark apache-kafka apache-spark-sql spark-streaming avro