【发布时间】:2021-11-15 23:39:20
【问题描述】:
我的目标是从 HTTP 源获取 JSON 数据并使用 AVRO 序列化将其存储在 Kafka 主题中。
使用 Kafka Connect 和 HTTP source connector 以及一堆 SMT,我设法创建了一个 Connect 数据结构,当使用 StringConverter 写入主题时如下所示:
Struct{base=stations,cod=200,coord=Struct{lat=54.0,lon=9.0},dt=1632150605}
因此,JSON 被成功解析为 STRUCT,我可以使用 SMT 操作单个元素。接下来,我在 Confluent Schema Registry 中创建了一个具有相应模式的新主题,并将连接器的值转换器切换到 Confluent AVRO 转换器,并使用"value.converter": "io.confluent.connect.avro.AvroConverter"。
我收到一条错误消息,而不是预期的序列化:
org.apache.kafka.common.errors.SerializationException:序列化 Avro 消息时出错 引起:org.apache.avro.SchemaParseException:无法重新定义:io.confluent.connect.avro.ConnectDefault
只要我使用 ReplaceField 删除嵌套的 STRUCT 或使用 Flatten 简化结构,AVRO 序列化就像一个魅力。所以看起来转换器无法处理嵌套结构。
当您有嵌套元素并希望它们被序列化而不是将 JSON 存储为字符串并尝试在消费者或其他地方处理对象创建时,正确的方法是什么?这在 Kafka Connect 中是否可行?
【问题讨论】:
-
Avro 可以很好地处理嵌套记录。您遇到的问题是反序列化器无法处理具有不同字段的重复/重复命名空间记录...有一些“设置架构元数据”转换,您可能想查看
-
这会将异常转换为
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema。所以看起来这解决了最初的问题,但我仍然不完全在那里。它在消息中说明的模式与模式注册表中的模式相同,字符对字符。它还寻找什么来匹配架构? -
按 id 检索?模式文本不应该太重要,但我个人并没有使用我提到的转换。此外,该异常似乎在反序列化器之前,而不是像您的其他错误一样
-
只能按名称和版本检索,但只有在您在架构定义中包含附加条目
connect.name和connect.version时才有效。我在任何地方都没有看到记录的东西。不幸的是,我有点回到原点。内部 STRUCTcoord仍然被io.confluent.kafka.serializers.AbstractKafkaAvroSerializer解析为io.confluent.connect.avro.ConnectDefault,所以只要我添加第二个不同类型的内部 STRUCT,我就会得到原始错误。因此,我需要能够为每个嵌套元素设置元数据......但是如何?
标签: apache-kafka avro apache-kafka-connect