【问题标题】:Consuming Avro events from Kafka in Spark structured streaming在 Spark 结构化流中使用来自 Kafka 的 Avro 事件
【发布时间】:2019-11-28 06:37:57
【问题描述】:

我设计了一个 Nifi 流,以将 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流中使用它。

虽然 Kafka 部分工作正常,但 Spark 结构化流无法读取 Avro 事件。它失败并出现以下错误。

[Stage 0:>                                                          (0 + 1) / 1]2019-07-19 16:56:57 ERROR Utils:91 - Aborting task
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

火花码

import org.apache.spark.sql.types.{ StructField, StructType }
import org.apache.spark.sql.types.{ DecimalType, LongType, ByteType, StringType }
import org.apache.spark.sql.types.DataType._
import scala.collection.Seq
import org.apache.spark._
import spark.implicits._
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import java.nio.file.{Files, Path, Paths}

val spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic_name").load()
val df1 = df.select(from_avro(col("value"),jsonFormatSchema).as("data")).select("data.*")
df1.writeStream.format("console").option("truncate","false").start()
))

Spark 中使用的架构

{
 "type": "record",
 "name": "kafka_demo_new",
 "fields": [
  {
   "name": "host",
   "type": "string"
  },
  {
   "name": "event",
   "type": "string"
  },
  {
   "name": "connectiontype",
   "type": "string"
  },
  {
   "name": "user",
   "type": "string"
  },
  {
   "name": "eventtimestamp",
   "type": "string"
  }
 ]
}

Kafka 中的示例主题数据

{"host":"localhost","event":"Qradar_Demo","connectiontype":"tcp/ip","user":"user","eventtimestamp":"2018-05-24 23:15:07"}

以下是版本信息

HDP - 3.1.0
Kafka - 2.0.0
Spark - 2.4.0

感谢任何帮助。

【问题讨论】:

  • 问题可能是 Nifi 中的架构以及您使用的 Kafka 序列化程序。例如,您是否添加了 Schema Registry 提供程序?如果是这样,Spark 无法原生读取它
  • 您是否单独尝试了示例 avro 文件和 Spark SQL?没有 Kafka,没有 NIFI,没有结构化流,只有一个示例 avro 文件和 Spark SQL。这会给你一个更小的环境来隔离问题。我会试一试您的问题,但活动部件太多。
  • @Jacek Laskowski 我会单独尝试一下,avro 文件和 Spark SQL

标签: apache-spark avro spark-structured-streaming


【解决方案1】:

遇到了类似的问题,发现 Kafka / KSQL 有不同版本的 AVRO,导致其他组件抱怨。

这也可能是您的情况: 看看:https://github.com/confluentinc/ksql/issues/1742

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-30
    • 2017-04-04
    • 2019-07-29
    • 1970-01-01
    • 2021-11-23
    • 1970-01-01
    • 2020-01-31
    • 2019-09-13
    相关资源
    最近更新 更多