【问题标题】:Data pipeline 'MSSQL -> Kafka -> CH' not work数据管道“MSSQL -> Kafka -> CH”不起作用
【发布时间】:2020-09-07 20:04:51
【问题描述】:

我在 Kafka 中创建了与 SQL Server 的 JDBC 连接,并且数据已成功加载到主题中。

/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-testsql-topic --from-beginning

如何在 Clickhouse 中阅读此主题?

我用引擎 Kafka(关于这个主题)和物化视图创建了一个新表,但没有成功。

CH 脚本:

CREATE TABLE default.test_topic (Id UInt32, Name FixedString(100)) 
ENGINE = Kafka 
SETTINGS 
    kafka_broker_list = localhost:9092, 
    kafka_topic_list = my-testsql-topic, 
    kafka_group_name = test-consumer-group, 
    kafka_format = JSONEachRow, 
    kafka_skip_broken_messages = 99999 

CREATE TABLE default.test_topic_hist (Id UInt32, Name FixedString(100)) 
ENGINE = MergeTree ORDER BY Id SETTINGS index_granularity = 8192 

CREATE MATERIALIZED VIEW default.load_test_topic_hist TO default.test_topic_hist (Id UInt32, Name FixedString(100)) AS 
SELECT Id, Name FROM default.test_topic

描述组:

GROUP                TOPIC              PARTITION    CURRENT-OFFSET   LOG-END-OFFSET  LAG 
test-consumer-group  my-testsql-topic   0            -                0               - 

clickhouse-server.log:

2020.05.21 12:07:35.704680 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already subscribed to topics: [ my-testsql-topic ] 
2020.05.21 12:07:35.704697 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already assigned to : [ my-testsql-topic[0:#] ]    
2020.05.21 12:22:36.898540 [ 11946 ] {} <Trace> StorageKafka (test_topic): Stalled 
2020.05.21 12:22:36.898729 [ 11946 ] {} <Trace> StorageKafka (test_topic): Polled offset INVALID (topic: my-testsql-topic, partition: 0) 
2020.05.21 12:22:36.898741 [ 11946 ] {} <Trace> StorageKafka (test_topic): Nothing to commit. 
2020.05.21 12:22:36.899433 [ 11946 ] {} <Trace> StorageKafka (test_topic): Committed offset INVALID (topic: my-testsql-topic, partition: 0) 
2020.05.21 12:22:36.899504 [ 11946 ] {} <Trace> StorageKafka (test_topic): Execution took 501 ms.

【问题讨论】:

  • 这能回答你的问题吗? Using kafka to produce data for clickhouse
  • 1) 您能否提供用于 CH 中的 MV 和 Kafka-table 的 SQL 脚本。 2) 检查错误日志 - /var/log/clickhouse-server/clickhouse-server.err.log 并提供相关错误的描述。 3)可能的原因是消费者(使用Kafka引擎)的偏移量设置为最新位置;尝试向 MSQL 插入新行并检查相关的 CH 表
  • 您确定需要 Kafka 吗? CH 支持以ODBC 作为JDBC 数据源的连接。
  • CH 脚本:CREATE TABLE default.test_topic (Id UInt32, Name FixedString(100)) ENGINE = Kafka SETTINGS kafka_broker_list = localhost:9092, kafka_topic_list = my-testsql-topic, kafka_group_name = test-consumer-group, kafka_format = JSONEachRow, kafka_skip_broken_messages = 99999 CREATE TABLE default.test_topic_hist (Id UInt32, Name FixedString(100)) ENGINE = MergeTree ORDER BY Id SETTINGS index_granularity = 8192 CREATE MATERIALIZED VIEW default.load_test_topic_hist TO default.test_topic_hist (Id UInt32, Name FixedString(100)) AS SELECT Id, Name FROM default.test_topic
  • 描述组:GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG test-consumer-group my-testsql-topic 0 - 0 -

标签: sql-server jdbc apache-kafka clickhouse


【解决方案1】:

在 jdbc 配置中需要添加:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-08-26
    • 2022-11-10
    • 2016-12-30
    • 2018-02-02
    • 2017-05-28
    • 2017-02-23
    • 2011-09-09
    相关资源
    最近更新 更多