【问题标题】:How do you handle nested source data with AVRO serialization in Apache Kafka?如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?
【发布时间】:2021-11-15 23:39:20
【问题描述】:

我的目标是从 HTTP 源获取 JSON 数据并使用 AVRO 序列化将其存储在 Kafka 主题中。

使用 Kafka Connect 和 HTTP source connector 以及一堆 SMT,我设法创建了一个 Connect 数据结构,当使用 StringConverter 写入主题时如下所示:

Struct{base=stations,cod=200,coord=Struct{lat=54.0,lon=9.0},dt=1632150605}

因此,JSON 被成功解析为 STRUCT,我可以使用 SMT 操作单个元素。接下来,我在 Confluent Schema Registry 中创建了一个具有相应模式的新主题,并将连接器的值转换器切换到 Confluent AVRO 转换器,并使用"value.converter": "io.confluent.connect.avro.AvroConverter"

我收到一条错误消息,而不是预期的序列化:

org.apache.kafka.common.errors.SerializationException:序列化 Avro 消息时出错 引起:org.apache.avro.SchemaParseException:无法重新定义:io.confluent.connect.avro.ConnectDefault

只要我使用 ReplaceField 删除嵌套的 STRUCT 或使用 Flatten 简化结构,AVRO 序列化就像一个魅力。所以看起来转换器无法处理嵌套结构。

当您有嵌套元素并希望它们被序列化而不是将 JSON 存储为字符串并尝试在消费者或其他地方处理对象创建时,正确的方法是什么?这在 Kafka Connect 中是否可行?

【问题讨论】:

  • Avro 可以很好地处理嵌套记录。您遇到的问题是反序列化器无法处理具有不同字段的重复/重复命名空间记录...有一些“设置架构元数据”转换,您可能想查看
  • 这会将异常转换为org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema。所以看起来这解决了最初的问题,但我仍然不完全在那里。它在消息中说明的模式与模式注册表中的模式相同,字符对字符。它还寻找什么来匹配架构?
  • 按 id 检索?模式文本不应该太重要,但我个人并没有使用我提到的转换。此外,该异常似乎在反序列化器之前,而不是像您的其他错误一样
  • 只能按名称和版本检索,但只有在您在架构定义中包含附加条目connect.nameconnect.version 时才有效。我在任何地方都没有看到记录的东西。不幸的是,我有点回到原点。内部 STRUCT coord 仍然被 io.confluent.kafka.serializers.AbstractKafkaAvroSerializer 解析为 io.confluent.connect.avro.ConnectDefault,所以只要我添加第二个不同类型的内部 STRUCT,我就会得到原始错误。因此,我需要能够为每个嵌套元素设置元数据......但是如何?

标签: apache-kafka avro apache-kafka-connect


【解决方案1】:

可以通过不同的方式从 JSON 字符串创建 STRUCT 元素。最初,使用 SMT ExpandJson 是为了简单。但是,它没有创建足够命名的 STRUCT,因为它没有可供使用的模式。这就是导致初始错误消息的原因,因为 AVRO 序列化程序对这些 STRUCT 使用泛型类 io.confluent.connect.avro.ConnectDefault,如果存在多个 STRUCT,则会出现歧义,从而引发异常。

另一个看似相同的 SMT 是 Json Schema,它有一个记录在案的 FromJson 转换。它确实接受模式,从而解决了 ExpandJson 将嵌套元素解析为泛型类型的问题。但是,接受的是 JSON 模式,并且通过将单词“properties”作为命名空间并复制字段名称来映射到 AVRO 全名。在此示例中,您最终将使用 properties.coord 作为内部元素的全名。

例如,当将以下 JSON Schema 传递给 SMT 时:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    "coord": {
      "type": "object",
      "properties": {
        "lon": {
          "type": "number"
        },
        "lat": {
          "type": "number"
        }
      },
      "required": [
        "lon",
        "lat"
      ]
    },
    ...
}

它产生的 AVRO 模式(并因此在模式注册表中查找)是:

{
    "type": "record",
    "fields": [
        ...
        {
            "name": "coord",
            "type": {
                "type": "record",
                "name": "coord",
                "namespace": "properties",
                "fields": [
                    {
                        "name": "lat",
                        "type": "double"
                    },
                    {
                        "name": "lon",
                        "type": "double"
                    }
                ],
                "connect.name": "properties.coord"
            }
        },
    ...
}

理论上,如果您在第二级有另一个带有coord 元素的架构,它将获得相同的全名,但由于这些不是架构注册表中需要引用的单独条目,这不会导致碰撞。无法从 JSON Schema 控制 AVRO 记录的命名空间有点遗憾,因为感觉就像你就在那里,但我无法深入挖掘以提供解决方案。

建议的 SMT SetSchemaMetadata(请参阅问题的第一个回复)在此过程中可能很有用,但 it's documentation 与 AVRO 命名约定有些冲突,因为它在示例中显示了 order-value。它将尝试查找包含以此名称作为根元素的 AVRO 记录的模式,并且由于“-”是 AVRO 名称中的非法字符,因此您会收到错误消息。但是,如果您使用正确的根元素名称,SMT 会做一些非常有用的事情:它的 RestService 类会查询架构注册表以查找匹配的架构,但会失败并显示一条消息,打印出需要被创建,所以你不必记住所有的转换规则。

因此,原始问题的答案是:是的,可以使用 Kafka Connect 来完成。如果您这样做,这也是最好的选择

  • 不想编写自己的生产者/连接器
  • 希望以类型化的方式存储 JSON Blob,而不是在遇到初始主题后对其进行转换

如果数据摄取后转换是一个选项,de-, re- and serialization capabilities of ksqlDB 似乎非常强大。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-07-12
    • 1970-01-01
    • 2019-11-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-30
    相关资源
    最近更新 更多