【发布时间】:2021-09-25 00:32:18
【问题描述】:
我想使用带 JSON 且不带架构的 JDBC 接收器连接器。 他们写道(source):
如果您需要在没有 Schema Registry 的情况下使用 JSON 来连接数据,您可以 可以使用 Kafka 支持的 JsonConverter。下面的例子 显示了添加到 配置:
key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false
当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 true,键或值是 不被视为纯 JSON,而是作为复合 JSON 对象 包含内部架构和数据。当这些是 为源连接器启用,架构和数据都在 复合 JSON 对象。当这些为接收器连接器启用时, 架构和数据是从复合 JSON 对象中提取的。 请注意,此实现从不使用架构注册表。
当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 false(默认),仅 数据在没有模式的情况下传递。这减少了有效载荷 不需要架构的应用程序的开销。
我配置了连接器:
{
"name": "noschemajustjson",
"config": {
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"schemas.enable": "false",
"name": "noschemajustjson",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "testconnect2",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "utp",
"auto.create": "false",
"auto.evolve": "false"
}
}
但我仍然收到错误:
原因:org.apache.kafka.connect.errors.ConnectException: Sink 连接器“noschemajustjson2”配置为 'delete.enabled=false' 和 'pk.mode=none' 因此需要 具有非空结构值和非空结构模式的记录,但是 发现记录在 (topic='testconnect2',partition=0,offset=0,timestamp=1626416739697) 具有 HashMap 值和空值模式。
那么我应该怎么做才能强制 Connect 在没有架构的情况下工作(只有纯 JSON)?
【问题讨论】: