【问题标题】:Auto-Table generation in Cassandra with kafka connect cassandra sink使用 kafka connect cassandra sink 在 Cassandra 中自动生成表
【发布时间】:2019-05-01 11:43:43
【问题描述】:

我是using confluent.connect.cassandra.CassandraSinkConnector,为kafka连接cassandra sink。

我想知道是否可以使用 io.confluent.connect.cassandra.CassandraSinkConnector 作为连接器从 kafka 主题自动生成 cassandra 表。

如果可能,您能否建议设置什么配置来启用此功能。我已经尝试了文档中提到的所有配置,但我没有成功创建表。

这是我正在使用的配置:

{
  "name": "cassandra-test4",

  "config": {

    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",

    "tasks.max": "3",

    "topics": "orders-topic2",

    "cassandra.contact.points": "my_ip",

    "cassandra.keyspace": "test_cas",

    "cassandra.write.mode": "Insert",

    "cassandra.table.manage.enabled": "true",

    "cassandra.sink.route": "test_cas.orders",

    "key.converter.schema.registry.url": "http://localhost:8081",

    "value.converter.schema.registry.url": "http://localhost:8081",

    "value.converter": "io.confluent.connect.avro.AvroConverter",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",

    "flush.size": "1",

    "cassandra.keyspace.create.enabled": "true",

    "name": "cassandra-test4"

  },

  "tasks": [

    {

      "connector": "cassandra-test4",

      "task": 0

    },

    {

      "connector": "cassandra-test4",

      "task": 1

    },

    {

      "connector": "cassandra-test4",

      "task": 2

    }

  ],

  "type": null

}

【问题讨论】:

  • 你能显示你的主题数据,包括键吗?

标签: cassandra apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

这应该通过将cassandra.keyspace.create.enabledcassandra.table.manage.enabled 属性设置为true 来完成。见documentation

但请务必小心 - 在您的集群中很容易出现架构分歧,然后您需要执行额外的步骤才能从中恢复。最好在启动连接器之前预先创建表...

【讨论】:

  • 我已尝试使用上述属性,正在创建键空间,但无法创建表。
  • 也许您可以启用调试或跟踪日志记录?
  • @Yana 这些看起来不像 Kafka Connect 进程的日志,而是组协调员的日志
  • @cricket_007 感谢您指出我的错误,我检查了连接器日志并收到此错误:Caused by: org.apache.kafka.connect.errors.DataException: Key must be a struct or map. This connector requires that records from Kafka contain the keys for the Cass andra table. Please use a transformation like org.apache.kafka.connect.transforms. ValueToKey to create a key with the proper fields.
  • 能够通过在 kafka 主题的数据中使用结构键将我的数据加载到 cassandra。
猜你喜欢
  • 2020-04-14
  • 2021-10-12
  • 2019-04-16
  • 2019-08-16
  • 2018-06-13
  • 2018-03-24
  • 2020-05-28
  • 2017-11-08
  • 2020-02-27
相关资源
最近更新 更多