【问题标题】:Cannot convert Catalyst type IntegerType to Avro type ["null","int"]无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]
【发布时间】:2020-11-18 09:22:06
【问题描述】:

我使用 Pyspark 构建了 Spark Structured Streaming 流程,它从 kafka 主题中读取 avro 消息,进行一些转换并将数据作为 avro 加载到目标主题中。

我使用 ABRIS 包 (https://github.com/AbsaOSS/ABRiS) 序列化/反序列化来自 Confluent 的 Avro,并与 Schema Registry 集成。

架构包含如下整数列:

{
  "name": "total_images",
  "type": [
    "null",
    "int"
  ],
  "default": null
},
{
  "name": "total_videos",
  "type": [
    "null",
    "int"
  ],
  "default": null
},

该过程引发以下错误:Cannot convert Catalyst type IntegerType to Avro type ["null","int"].

我尝试将列转换为可为空,但错误仍然存​​在。

如果有人有建议,我将不胜感激

【问题讨论】:

    标签: apache-spark pyspark apache-kafka avro


    【解决方案1】:

    我在这个上花了几个小时

    其实和 Abris 依赖无关(行为与原生 spark-avro apis 相同)

    可能有几个根本原因,但就我而言……使用 Spark 3.0.1、Scala 和 Dataset:它与编码器和案例类处理数据中的错误类型有关。

    简而言之,用 "type": ["null","int"] 定义的 avro 字段不能映射到 scala Int,它需要 Option[Int]

    使用以下代码:

    test("Avro Nullable field") {
    val schema: String =
      """
        |{
        | "namespace": "com.mberchon.monitor.dto.avro",
        | "type": "record",
        | "name": "TestAvro",
        | "fields": [
        |  {"name": "strVal", "type": ["null", "string"]},
        |  {"name": "longVal",  "type": ["null", "long"]}
        |  ]
        |}
      """.stripMargin
    val topicName = "TestNullableAvro"
    val testInstance = TestAvro("foo",Some(Random.nextInt()))
    
    import sparkSession.implicits._
    
    val dsWrite:Dataset[TestAvro] = Seq(testInstance).toDS
    val allColumns = struct(dsWrite.columns.head, dsWrite.columns.tail: _*)
    
    dsWrite
      .select(to_avro(allColumns,schema) as 'value)
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("topic", topicName)
      .save()
    
    val dsRead:Dataset[TestAvro] = sparkSession.read
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .load()
      .select(from_avro(col("value"), schema) as 'Metric)
      .select("Metric.*")
      .as[TestAvro]
    
    assert(dsRead.collect().contains(testInstance))
    

    }

    如果case类定义如下:

    case class TestAvro(strVal:String,longVal:Long)
    

    无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。 org.apache.spark.sql.avro.IncompatibleSchemaException:无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。 在 org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219) 在 org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)

    它适用于:

    case class TestAvro(strVal:String,longVal:Option[Long])
    

    顺便说一句,在 Spark 编码器中支持 SpecificRecord 会非常好(您可以使用 Kryo,但效率较低) 因为,为了在我的 avro 数据中使用有效类型的数据集……我需要创建额外的案例类(与我的 SpecificRecords 重复)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-25
      相关资源
      最近更新 更多