【发布时间】:2017-11-22 19:29:24
【问题描述】:
只能使用 Confluent Kafka Connect 将简单对象插入数据库。不知道如何使它支持复杂的 json/schema 结构。我不确定此功能是否可用。大约一年前有人问过类似的问题here,但直到现在还没有回答。请帮忙。
【问题讨论】:
标签: apache-kafka apache-kafka-connect
只能使用 Confluent Kafka Connect 将简单对象插入数据库。不知道如何使它支持复杂的 json/schema 结构。我不确定此功能是否可用。大约一年前有人问过类似的问题here,但直到现在还没有回答。请帮忙。
【问题讨论】:
标签: apache-kafka apache-kafka-connect
Kafka Connect 确实支持复杂的结构,包括 Struct、Map 和 Array。通常只有源连接器需要这样做,因为接收器连接器被传递了值并且只需要使用它们。 This documentation 描述了构建一个描述Struct 的Schema 对象的基础知识,然后创建一个遵循该模式的Struct 实例。在这种情况下,示例结构只是一个平面结构。
但是,您可以轻松添加使用另一个 Schema 实例定义的 Struct 类型的字段。实际上,它只是将这个简单的模式分层到结构中的多个级别:
Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
.field("number", Schema.INT16_SCHEMA)
.field("street", Schema.STRING_SCHEMA)
.field("city", Schema.STRING_SCHEMA)
.build();
Schema personSchema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT8_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.field("address", addressSchema)
.build();
Struct addressStruct = new Struct(addressSchema)
.put("number", 100)
.put("street", "Main Street")
.put("city", "Springfield")
.build();
Struct personStruct = new Struct(personSchema)
.put("name", "Barbara Liskov")
.put("age", 75)
.put("address", addressStruct)
.build();
因为SchemaBuilder 是一个流畅的API,您实际上可以像自定义admin 布尔模式构建器一样嵌入它。但这有点困难,因为您需要引用 Schema 来创建 addressStruct。
通常您只需要在编写源连接器时担心如何执行此操作。如果您尝试使用现有的源连接器,您可能对键和值的结构几乎没有控制权。例如,Confluent's JDBC source connector 使用单独的 Schema 对每个表进行建模,并将该表中的每一行建模为使用该模式的单独 Struct。但由于行是扁平的,Schema 和 Struct 将只包含原始类型的字段。
Debezium'sMySQL 和 PostgreSQL 的 CDC 连接器还使用 Schema 为关系表建模,并为每一行对应 Struct 对象,但 CDC 会捕获有关该行的更多信息,例如更改之前和/或之后的行。因此,这些连接器对每个涉及嵌套 Struct 对象的表使用 a more complex Schema。
请注意,虽然每个源连接器都有自己的消息结构风格,但 Kafka Connect 的 Single Message Transforms (SMTs) 可以很容易地在源连接器生成的消息被写入之前对其进行过滤、重命名和轻微修改Kafka,或从 Kafka 读取的消息,然后再发送到接收器连接器。
【讨论】: