【问题标题】:Spark avro getting org.apache.spark.SparkException: Malformed records are detected in record parsing [duplicate]Spark avro 获取 org.apache.spark.SparkException:在记录解析中检测到格式错误的记录 [重复]
【发布时间】:2020-04-08 14:17:21
【问题描述】:

在我的 Spark 流式传输作业中,我试图从 Kafka 主题读取 Confluentavro 消息并得到“在记录解析中检测到格式错误的记录”。

我尝试了很多调试,但无法找出格式错误的记录。需要帮助了解如何从格式错误的行中获取记录。有没有办法可以打印 avro 消息以查看消息有什么问题。

我的代码:

object AvroReadMessage extends App {
val spark = SparkSession.builder.master("local[*]").appName("AvroReadMessage")
    .getOrCreate()
  spark.sparkContext.setLogLevel("WARN")

 val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("/read_message.avsc")))
 val readKafkaDF = spark.readStream.format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")
   .option("subscribe", "topic2")
   .option("startingOffsets", "latest")
   .load()
val jmap = new java.util.HashMap[String, String]()
 jmap.put("mode", "PERMISSIVE")

  val query = readKafkaDF
    .select(from_avro('value, jsonFormatSchema, jmap) as 'value)
    .select("value.*")
    .writeStream.outputMode("append").format("console").start()

query.awaitTermination()
}

任何帮助将不胜感激。

【问题讨论】:

  • 您是否正在使用 Confluent Schema Registry 进行记录?

标签: apache-spark apache-kafka apache-spark-sql spark-streaming avro


【解决方案1】:

spark-avro 无法读取 Confluent Schema Registry 格式的数据。

请参考Integrating Spark Structured Streaming with the Confluent Schema Registry

【讨论】:

    猜你喜欢
    • 2023-03-03
    • 1970-01-01
    • 1970-01-01
    • 2022-11-24
    • 2023-01-09
    • 1970-01-01
    • 2020-01-20
    • 2011-02-15
    • 1970-01-01
    相关资源
    最近更新 更多