我在这个上花了几个小时
其实和 Abris 依赖无关(行为与原生 spark-avro apis 相同)
可能有几个根本原因,但就我而言……使用 Spark 3.0.1、Scala 和 Dataset:它与编码器和案例类处理数据中的错误类型有关。
简而言之,用 "type": ["null","int"] 定义的 avro 字段不能映射到 scala Int,它需要 Option[Int]
使用以下代码:
test("Avro Nullable field") {
val schema: String =
"""
|{
| "namespace": "com.mberchon.monitor.dto.avro",
| "type": "record",
| "name": "TestAvro",
| "fields": [
| {"name": "strVal", "type": ["null", "string"]},
| {"name": "longVal", "type": ["null", "long"]}
| ]
|}
""".stripMargin
val topicName = "TestNullableAvro"
val testInstance = TestAvro("foo",Some(Random.nextInt()))
import sparkSession.implicits._
val dsWrite:Dataset[TestAvro] = Seq(testInstance).toDS
val allColumns = struct(dsWrite.columns.head, dsWrite.columns.tail: _*)
dsWrite
.select(to_avro(allColumns,schema) as 'value)
.write
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap)
.option("topic", topicName)
.save()
val dsRead:Dataset[TestAvro] = sparkSession.read
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap)
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.load()
.select(from_avro(col("value"), schema) as 'Metric)
.select("Metric.*")
.as[TestAvro]
assert(dsRead.collect().contains(testInstance))
}
如果case类定义如下:
case class TestAvro(strVal:String,longVal:Long)
无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。
org.apache.spark.sql.avro.IncompatibleSchemaException:无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。
在 org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
在 org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
它适用于:
case class TestAvro(strVal:String,longVal:Option[Long])
顺便说一句,在 Spark 编码器中支持 SpecificRecord 会非常好(您可以使用 Kryo,但效率较低)
因为,为了在我的 avro 数据中使用有效类型的数据集……我需要创建额外的案例类(与我的 SpecificRecords 重复)。