【问题标题】:Kafka (Confluent Platform) input for Logstash - broken message encodingLogstash 的 Kafka(Confluent 平台)输入 - 消息编码损坏
【发布时间】:2018-05-22 18:58:19
【问题描述】:

我有一个 Confluent 平台(版本 4.1.1)。它被配置为从数据库中读取数据。对此的配置是:

name = source-mysql-requests
connection.url = jdbc:mysql://localhost:3306/Requests
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
connection.user = ***
connection.password = ***
mode = incrementing
incrementing.column.name = ID
tasks.max = 5
topic.prefix = requests_
poll.interval.ms = 1000
batch.max.rows = 100
table.poll.interval.ms = 1000

我还有一个 Logstash(6.2.4 版)用于阅读相关的 Kafka 主题。这是它的配置:

kafka { 
    bootstrap_servers => "localhost:9092" 
    topics => ["requests_Operation"] 
    add_field => { "[@metadata][flag]" => "operation" }
}
output { 
 if [@metadata][flag] == "operation" { 
    stdout { 
        codec => rubydebug 
    } 
 } 
}

当我为测试运行“kafka-avro-console-consumer”时,我收到了这种类型的消息:

{"ID":388625154,"ISSUER_ID":"8e427b6b-1176-4d4a-8090-915fedcef870","SERVICE_ID":"mercury-g2b.service:1.4","OPERATION":"prepareOutcomingConsignmentRequest","STATUS":"COMPLETED","RECEIVE_REQUEST_DATE":1525381951000,"PRODUCE_RESULT_DATE":1525381951000}

但是在 Logstash 中我有一些可怕且难以理解的东西:

"\u0000\u0000\u0000\u0000\u0001����\u0002Hfdebfb95-218a-11e2-a69b-b499babae7ea.mercury-g2b.service:1.4DprepareOutcomingConsignmentRequest\u0012COMPLETED���X���X"

会出什么问题?

【问题讨论】:

  • 看起来 logstash 不理解 avro 格式?

标签: jdbc apache-kafka logstash confluent-platform


【解决方案1】:

例如,您可以通过将 value.converterkey.converter 的配置更改为使用 JSON 来将 Kafka Connect 更改为不使用 Avro。

否则,您需要 Logstash 知道如何解释 Schema Registry 编码的 Avro 数据并将其转换为人类可读的格式。

或者,您可以使用 Connect 的 Elasticsearch 或控制台接收器并完全跳过 Logstash,假设这是目标

您也可以使用 Connect SMT 替换 Logstash add_field : operation 配置

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-11-10
    • 2023-03-22
    • 2021-01-05
    • 2011-11-07
    • 2015-12-30
    • 2022-12-03
    • 1970-01-01
    相关资源
    最近更新 更多