【Question title】: ClickHouse can't get messages from Kafka with TabSeparated formatting
【Posted】: 2020-11-27 17:40:55
【Question description】:

I send a message to Kafka from my Spring Boot application:

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("uniqTopic123", "testKey", "Test\tTest");
future.addCallback(
        (v) -> System.out.println("SUCCESS: " + v),
        (v) -> System.out.println("FAIL: " + v)
);
kafkaTemplate.flush();

application.properties

spring.kafka.consumer.group-id=app.1
kafka.server=<kafka_host>:9092
kafka.producer.id=kafkaProducerId

Configuration

@Value("${kafka.server}")
private String kafkaServer;

@Value("${kafka.producer.id}")
private String kafkaProducerId;

@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
    return props;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<String, String>(
            new DefaultKafkaProducerFactory<>(producerConfigs()));
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);

    return new DefaultKafkaConsumerFactory<>(props);
}

In the log I can see a message like this:

SUCCESS: SendResult [producerRecord=ProducerRecord(topic=uniqTopic123, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=testKey, value=Test Test, timestamp=null), recordMetadata=uniqTopic123-0@1]

But my listener does not receive any messages:

@KafkaListener(topics="uniqTopic123")
public void msgListener(ConsumerRecord<String, String> record){
    System.out.println("test ======> " + record.value());
}

The table in ClickHouse is empty as well. My ClickHouse tables:

CREATE TABLE IF NOT EXISTS test (key String, message String)
ENGINE = Kafka('<kafka_host>:9092', 'uniqTopic123', 'app.1', 'TabSeparated');

CREATE TABLE IF NOT EXISTS test_table (key String, message String)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO test_table AS
SELECT key, message FROM test;

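What is wrong with my code?

UPD: Kafka Tool shows that the message is in Kafka.

UPD: The error was a missing newline at the end of the message. It has to be "Test\tTest\n".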

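【Comments】:

  • First I would make sure that the topic exists and contains some events. You can check this with Kafka Tool or with the CLI command docker run confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server kafka:9092 --topic foo --from-beginning --max-messages 5.
  • Yes, I am using Kafka Tool. It shows that the message is in Kafka. Please see my update at the end of the question.
  • Try adding a newline at the end of the message, "Test\tTest\n", and test again. Also check the log: cat /var/log/clickhouse-server/clickhouse-server.err.log
  • The newline helped! Thanks!
  • No problem, glad to help ;)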

Tags: java spring-boot apache-kafka clickhouse


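【Solution 1】:

The error was a missing newline at the end of the message. In the TabSeparated format every row, including the last one, must end with a line feed, so the value sent to Kafka has to be "Test\tTest\n" rather than "Test\tTest".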
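As a minimal sketch of how the producer side could build such a value, the helper below joins column values with tabs and always appends the terminating line feed. The class name TsvRows and its methods are hypothetical and not part of the original code or of any library. The escaping of backslash, tab, and line breaks follows the TabSeparated convention and is an addition beyond what the question needed.

```java
// Hypothetical helper (not from the original post): builds one TabSeparated row.
final class TsvRows {
    private TsvRows() {}

    // Escapes characters that would otherwise be read as column or row separators.
    static String escape(String field) {
        return field
                .replace("\\", "\\\\")  // backslash first, so the escapes added below are not doubled
                .replace("\t", "\\t")
                .replace("\n", "\\n")
                .replace("\r", "\\r");
    }

    // Joins the fields with tabs and terminates the row with a line feed.
    static String row(String... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append('\t');
            }
            sb.append(escape(fields[i]));
        }
        return sb.append('\n').toString();
    }
}
```

With this in place, the send call from the question could become kafkaTemplate.send("uniqTopic123", "testKey", TsvRows.row("Test", "Test")), which produces the value "Test\tTest\n" that fills the two columns key and message.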

【Comments】:
