【发布时间】:2020-09-05 21:50:42
【问题描述】:
我正在尝试使用 KSQLdb 为 Kafka Connect 配置 Confluent Cassandra Sink 连接器。
CREATE SINK CONNECTOR cassandra WITH(
"name" = 'CASSANDRA',
"connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector',
"tasks.max" = '1',
"topics" = 'users',
"cassandra.contact.points" = 'cassandra',
"cassandra.keyspace" = 'test',
"confluent.topic.bootstrap.servers" = 'kafka:29092',
"confluent.topic.replication.factor" = '1',
"key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"transforms" = 'createKey,extractId',
"transforms.createKey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
"transforms.createKey.fields" = 'ID',
"transforms.extractId.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractId.field" = 'ID');
Users 主题是一个 KSQLdb 表。
打印主题;结果是:
密钥格式:KAFKA_STRING
值格式:JSON 或 KAFKA_STRING
行时间:2020/05/19 10:51:35.036 Z,键:P343434,值:{"ID":"P343434"}
例外:
Caused by: org.apache.kafka.connect.errors.DataException: Key must be a struct or map. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
异常提到 key 必须是 struct 或 map !!! 我做了转换给它一个钥匙,但问题仍然存在!
有什么方法可以排除故障或了解此 Cassandra 连接器所需的键/值格式吗?
【问题讨论】:
标签: apache-kafka cassandra apache-kafka-connect ksqldb