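【Posted】: 2017-08-03 16:26:52
【Question】:
I am using the open-source Kafka Cassandra connector from here: https://github.com/tuplejump/kafka-connect-cassandra
I followed the setup instructions in the tutorial. However, the connector does not insert any data into my database. Here are the contents of my sink.properties file: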
name=cassandra-sink-connector
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
tasks.max=1
topics=hello-mqtt-kafka
cassandra.sink.route.hello-mqtt-kafka=devices_data.messages
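One quick sanity check on this file, sketched in Python: confirm that every topic listed under `topics` has a matching `cassandra.sink.route.<topic>` entry. The helper names `parse_properties` and `missing_routes` are hypothetical and written only for this illustration; the key layout is taken from the file above, and nothing here calls the connector itself.

```python
ROUTE_PREFIX = "cassandra.sink.route."

# Contents of sink.properties, copied from the post.
SINK_PROPERTIES = """\
name=cassandra-sink-connector
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
tasks.max=1
topics=hello-mqtt-kafka
cassandra.sink.route.hello-mqtt-kafka=devices_data.messages
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_routes(props):
    """Return the topics from `topics` that have no route entry."""
    topics = [t.strip() for t in props.get("topics", "").split(",") if t.strip()]
    return [t for t in topics if ROUTE_PREFIX + t not in props]


props = parse_properties(SINK_PROPERTIES)
print(missing_routes(props))  # prints []
```

An empty list means the topic and route keys agree with each other. It does not show that the producer is writing to that same topic, so the topic name used on the producer side is worth comparing by hand.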
Kafka, Cassandra and Zookeeper are all running and working. I sent some messages to the "hello-kafka" topic. For testing purposes I ran the console consumer, and it sees all the messages:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":75679795,"text":"example5"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"text"}],"optional":false,"name":"devices.schema"},"payload":{"id":86874233,"text":"example6"}}
Here is the schema of my Cassandra table:
CREATE TABLE IF NOT EXISTS devices_data.messages (
id int,
created text,
message text,
PRIMARY KEY (id, created))
WITH ID = 2de24390-03d5-11e7-a32a-ed242ef1cc00
AND CLUSTERING ORDER BY (created ASC)
AND bloom_filter_fp_chance = 0.01
AND dclocal_read_repair_chance = 0.1
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND min_index_interval = 128
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE'
AND comment = ''
AND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }
AND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' }
AND compression = { 'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }
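Another thing that can be compared mechanically is the set of field names in the message schema against the column names in the table. The Python sketch below does only that comparison. Whether this connector maps struct fields to columns by name is an assumption here and is not confirmed anywhere in this post; `message_fields` and `table_columns` are hypothetical helpers, and the DDL parser only handles a simple one-column-per-line layout like the one above.

```python
import json
import re

# One of the messages seen by the console consumer, copied from the post.
MESSAGE = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"int32","optional":false,"field":"id"},'
    '{"type":"string","optional":false,"field":"text"}],'
    '"optional":false,"name":"devices.schema"},'
    '"payload":{"id":75679795,"text":"example5"}}'
)

# Column list of the table, copied from the post (table options omitted).
TABLE_DDL = """
CREATE TABLE IF NOT EXISTS devices_data.messages (
id int,
created text,
message text,
PRIMARY KEY (id, created))
"""


def message_fields(raw):
    """Return the field names declared in the message's schema section."""
    doc = json.loads(raw)
    return {f["field"] for f in doc["schema"]["fields"]}


def table_columns(ddl):
    """Return column names from a simple CREATE TABLE column list."""
    body = ddl[ddl.index("(") + 1:]
    columns = set()
    for line in body.splitlines():
        line = line.strip().rstrip(",")
        if not line or line.upper().startswith("PRIMARY KEY"):
            continue
        match = re.match(r"(\w+)\s+\w+", line)
        if match:
            columns.add(match.group(1))
    return columns


fields = message_fields(MESSAGE)
columns = table_columns(TABLE_DDL)
print(sorted(fields - columns))  # prints ['text']
print(sorted(columns - fields))  # prints ['created', 'message']
```

The message carries a `text` field that has no column of the same name, and the table has `created` and `message` columns that the message never supplies. Since `created` is part of the primary key, Cassandra requires a value for it on every insert, so this mismatch looks worth ruling out.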
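Now my connector is running and throws no errors, but when I run a SELECT query from cqlsh, I can see that my data has not been inserted into Cassandra. I have followed the setup instructions, and the worker logs show no problems either. For debugging purposes I passed some malformed data to Kafka, and the connector reported a message format error. So it definitely sees the messages, but for some reason it does not insert them into the database.
I have been stuck on this for hours and have no idea what is wrong... I would really appreciate any help or a hint about what I might have missed.
Here are the logs from the connector. The last few lines about "Committing offsets" keep repeating, but nothing shows up in the database.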
[2017-03-13 15:46:40,540] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-13 15:46:40,540] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-13 15:46:40,547] INFO Created connector cassandra-sink-connector (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-03-13 15:46:40,554] INFO Configured 1 Kafka - Cassandra mappings. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:86)
[2017-03-13 15:46:40,955] INFO Did not find Netty's native epoll transport in the classpath, defaulting to NIO. (com.datastax.driver.core.NettyUtil:83)
[2017-03-13 15:46:42,039] INFO Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (com.datastax.driver.core.policies.DCAwareRoundRobinPolicy:95)
[2017-03-13 15:46:42,040] INFO New Cassandra host localhost/127.0.0.1:9042 added (com.datastax.driver.core.Cluster:1475)
[2017-03-13 15:46:42,041] INFO Connected to Cassandra cluster: Test Cluster (com.tuplejump.kafka.connect.cassandra.CassandraCluster:81)
[2017-03-13 15:46:42,114] INFO com.datastax.driver.core.SessionManager@63cd5271 created. (com.tuplejump.kafka.connect.cassandra.CassandraCluster:84)
[2017-03-13 15:46:42,146] INFO CassandraSinkTask starting with 1 routes. (com.tuplejump.kafka.connect.cassandra.CassandraSinkTask:189)
[2017-03-13 15:46:42,149] INFO Sink task WorkerSinkTask{id=cassandra-sink-connector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2017-03-13 15:46:42,294] INFO Discovered coordinator ismop-virtual-machine:9092 (id: 2147483647 rack: null) for group connect-cassandra-sink-connector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2017-03-13 15:46:42,302] INFO Revoking previously assigned partitions [] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2017-03-13 15:46:42,309] INFO (Re-)joining group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2017-03-13 15:46:42,319] INFO Successfully joined group connect-cassandra-sink-connector with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2017-03-13 15:46:42,320] INFO Setting newly assigned partitions [hello-mqtt-kafka-0] for group connect-cassandra-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2017-03-13 15:46:43,337] INFO Reflections took 4909 ms to scan 64 urls, producing 3915 keys and 28184 values (org.reflections.Reflections:229)
[2017-03-13 15:46:50,462] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:00,460] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:10,455] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2017-03-13 15:47:20,455] INFO WorkerSinkTask{id=cassandra-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
【Discussion】:
Tags: cassandra apache-kafka apache-kafka-connect