【问题标题】:DataException: Key must be a struct or mapDataException:键必须是结构或映射
【发布时间】:2020-09-05 21:50:42
【问题描述】:

我正在尝试使用 KSQLdb 为 Kafka Connect 配置 Confluent Cassandra Sink 连接器

  CREATE SINK CONNECTOR cassandra WITH(
    "name" = 'CASSANDRA',
    "connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector',
    "tasks.max" = '1',
    "topics" = 'users',
    "cassandra.contact.points" = 'cassandra',
    "cassandra.keyspace" = 'test',
    "confluent.topic.bootstrap.servers" = 'kafka:29092',
    "confluent.topic.replication.factor" = '1',
    "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "transforms" = 'createKey,extractId',
    "transforms.createKey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createKey.fields" = 'ID',
    "transforms.extractId.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
     "transforms.extractId.field" = 'ID');

Users 主题是一个 KSQLdb 表。

打印主题;结果是:

密钥格式:KAFKA_STRING

值格式:JSON 或 KAFKA_STRING

行时间:2020/05/19 10:51:35.036 Z,键:P343434,值:{"ID":"P343434"}

例外:

Caused by: org.apache.kafka.connect.errors.DataException: Key must be a struct or map. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

异常提到 key 必须是 struct 或 map !!! 我做了转换给它一个钥匙,但问题仍然存在!

有什么方法可以排除故障或了解此 Cassandra 连接器所需的键/值格式吗?

【问题讨论】:

    标签: apache-kafka cassandra apache-kafka-connect ksqldb


    【解决方案1】:

    Confluent 的 Cassandra 连接器有以下限制(在我看来非常大):

    1. 主题的键必须与 Cassandra 表的主键直接匹配。因此,在您的情况下,您的 Cassandra 表应该具有由一列类型为text 的主键。如果您有复合主键,那么您需要将您的主题转换为另一个主题,其中结构或映射匹配 Cassandra 中的主键
    2. 主题的值应与表的“常规”列匹配(除主键外的所有内容)

    这种限制导致中间主题等的扩散。

    更灵活的解决方案可能是使用Kafka Sink Connector from DataStax

    1. 它没有这样的限制 - 您可以定义如何将您的主题映射到表格的字段中
    2. 重量轻,性能高
    3. 同时使用 DSE 和 Cassandra
    4. 支持从单个主题写入多个表,无需创建中间主题(根据 Confluent 版本的要求)
    5. ...

    【讨论】:

    • 我假设连接器将使用 Ksqldb 的模式自己创建 Cassandra 表。这可行吗?
    • 不,因为这是个坏主意——Cassandra 还没有(还)防止并发模式更改的保护,而且我已经看到很多由于模式更改不正确而引起的集群问题...
    猜你喜欢
    • 2013-03-16
    • 2015-05-15
    • 1970-01-01
    • 1970-01-01
    • 2017-09-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-04
    相关资源
    最近更新 更多