【发布时间】:2020-09-07 20:04:51
【问题描述】:
我在 Kafka 中创建了与 SQL Server 的 JDBC 连接,并且数据已成功加载到主题中。
/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-testsql-topic --from-beginning
如何在 Clickhouse 中阅读此主题?
我用引擎 Kafka(关于这个主题)和物化视图创建了一个新表,但没有成功。
CH 脚本:
CREATE TABLE default.test_topic (Id UInt32, Name FixedString(100))
ENGINE = Kafka
SETTINGS
kafka_broker_list = localhost:9092,
kafka_topic_list = my-testsql-topic,
kafka_group_name = test-consumer-group,
kafka_format = JSONEachRow,
kafka_skip_broken_messages = 99999
CREATE TABLE default.test_topic_hist (Id UInt32, Name FixedString(100))
ENGINE = MergeTree ORDER BY Id SETTINGS index_granularity = 8192
CREATE MATERIALIZED VIEW default.load_test_topic_hist TO default.test_topic_hist (Id UInt32, Name FixedString(100)) AS
SELECT Id, Name FROM default.test_topic
描述组:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test-consumer-group my-testsql-topic 0 - 0 -
clickhouse-server.log:
2020.05.21 12:07:35.704680 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already subscribed to topics: [ my-testsql-topic ]
2020.05.21 12:07:35.704697 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already assigned to : [ my-testsql-topic[0:#] ]
2020.05.21 12:22:36.898540 [ 11946 ] {} <Trace> StorageKafka (test_topic): Stalled
2020.05.21 12:22:36.898729 [ 11946 ] {} <Trace> StorageKafka (test_topic): Polled offset INVALID (topic: my-testsql-topic, partition: 0)
2020.05.21 12:22:36.898741 [ 11946 ] {} <Trace> StorageKafka (test_topic): Nothing to commit.
2020.05.21 12:22:36.899433 [ 11946 ] {} <Trace> StorageKafka (test_topic): Committed offset INVALID (topic: my-testsql-topic, partition: 0)
2020.05.21 12:22:36.899504 [ 11946 ] {} <Trace> StorageKafka (test_topic): Execution took 501 ms.
【问题讨论】:
-
这能回答你的问题吗? Using kafka to produce data for clickhouse
-
1) 您能否提供用于 CH 中的 MV 和 Kafka-table 的 SQL 脚本。 2) 检查错误日志 - /var/log/clickhouse-server/clickhouse-server.err.log 并提供相关错误的描述。 3)可能的原因是消费者(使用Kafka引擎)的偏移量设置为最新位置;尝试向 MSQL 插入新行并检查相关的 CH 表
-
CH 脚本:
CREATE TABLE default.test_topic (Id UInt32, Name FixedString(100)) ENGINE = Kafka SETTINGS kafka_broker_list = localhost:9092, kafka_topic_list = my-testsql-topic, kafka_group_name = test-consumer-group, kafka_format = JSONEachRow, kafka_skip_broken_messages = 99999 CREATE TABLE default.test_topic_hist (Id UInt32, Name FixedString(100)) ENGINE = MergeTree ORDER BY Id SETTINGS index_granularity = 8192 CREATE MATERIALIZED VIEW default.load_test_topic_hist TO default.test_topic_hist (Id UInt32, Name FixedString(100)) AS SELECT Id, Name FROM default.test_topic -
描述组:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG test-consumer-group my-testsql-topic 0 - 0 -
标签: sql-server jdbc apache-kafka clickhouse