【问题标题】:Sending from Logastash to Kafka in with Avro使用 Avro 从 Logastash 发送到 Kafka
【发布时间】:2020-08-05 06:36:54
【问题描述】:

我正在尝试使用 avro 架构将数据从 logstash 发送到 kafka。

我的 logstash 输出如下所示:

kafka{
  codec => avro {
    schema_uri => "/tmp/avro/hadoop.avsc"
  }
  topic_id => "hadoop_log_processed"
}

我的架构文件如下所示:

{"type": "record",
 "name": "hadoop_schema",
 "fields": [
     {"name": "loglevel", "type": "string"},
     {"name": "error_msg",  "type": "string"},
     {"name": "syslog", "type": ["string", "null"]},
     {"name": "javaclass", "type": ["string", "null"]}
 ]
}

kafka-console-consumer 的输出:

CElORk+gAURvd24gdG8gdGhlIGxhc3QgbWVyZ2UtcGCzcywgd2l0aCA3IHNlZ21lbnRzIGxlZnQgb2YgdG90YWwgc256ZTogMjI4NDI0NDM5IGJ5dGVzAAxbbWFpbl0APm9yZy5hcGFjaGUuaGFkb29wLm1hcHJlZC5NZXJnZXI=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9OVGFza0hlYAJ0YmVhdEhhbmRsZXIgdGhyZWFkIGludGVycnVwdGVkAERbVGFza0hlYXJdYmVhdEhhbmRsZXIgUGluZ0NoZWNrZXJdAG5vcmcuYVBhY2hlLmhhZG9vcC5tYXByZWR1Y2UudjIuYXBwLlRhc2tIZWFydGJ3YXRIYW5kbGVy

我的连接器中也出现以下错误:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hadoop_log_processed to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我知道我在 logstash 网站上对数据进行了编码。在kafka的输入过程中我是否必须解码消息,或者我可以在连接器配置中解码/反序列化数据吗?

有没有办法在logstash 网站上禁用编码?我读到了一个 base64_encoding 选项,但似乎没有这个选项。

【问题讨论】:

    标签: apache-kafka logstash avro apache-kafka-connect confluent-platform


    【解决方案1】:

    您遇到的问题是 Logstash 的 Avro 编解码器没有将数据序列化为 Confluent Schema Registry Avro 反序列化器期望的 Avro 形式。

    虽然 Logstash 采用 avsc 并将数据编码为基于此的二进制形式,但 Confluent Schema Registry [de]serializer 直接从注册表(不是avsc 文件)存储和检索模式。

    因此,当您收到 Failed to deserialize data … SerializationException: Unknown magic byte! 时,这是 Avro 反序列化器,它说它无法将数据识别为使用 Schema Registry 序列化器序列化的 Avro。

    我快速搜索了一下 Google,发现 this codec 看起来支持模式注册表(因此支持 Kafka Connect,以及任何其他以这种方式反序列化 Avro 数据的消费者)。

    或者,将您的数据以 JSON 格式写入 Kafka,并使用 Kafka Connect 中的 org.apache.kafka.connect.json.JsonConverter 从主题中读取数据。

    参考:

    【讨论】:

    • 现在看起来不错。关于 Avro 的事情稍微清楚一点。但现在我收到另一个错误: 由:org.apache.kafka.common.errors.SerializationException:检索 id 6 的 Avro 模式版本时出错 由:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:未找到主题。错误代码:40401 我用谷歌搜索了很多,但我找不到答案。我可以使用 avro 消费者读取由 logstash 生成的消息,但我的接收器收到上述错误。我应该为这个问题打开一个新的 Stackoverflow 问题吗?
    • 我醒来,我的数据库已满,我的连接器状态为“正在运行”:S Kafka 只是喜欢让我感到困惑 xD 但最棒的是我有一个工作环境......
    • 难道是我通过控制中心向我用于连接器的主题添加了架构?
    • 您需要确保您的消费者(这里是 Kafka Connect)使用与您的生产者(Logstash)相同的模式注册表。除此之外,打开一个包含更多细节的新问题,我们可以在那里解决:)
    • 谢谢!我会先尝试重建环境,看看会遇到什么障碍……:)
    猜你喜欢
    • 2021-11-25
    • 2020-05-03
    • 1970-01-01
    • 2019-01-10
    • 1970-01-01
    • 1970-01-01
    • 2019-01-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多