【问题标题】:ElasticsearchSinkConnector Failed to deserialize data to AvroElasticsearchSinkConnector 无法将数据反序列化到 Avro
【发布时间】:2018-10-15 09:40:53
【问题描述】:

我创建了最简单的 kafka sink 连接器配置,我使用的是 confluent 4.1.0:

{
  "connector.class": 
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}

在主题中,我将消息保存在 JSON

{ "topics": "resd"}

但结果我得到一个错误:

原因:org.apache.kafka.common.errors.SerializationException: 为 id -1 反序列化 Avro 消息时出错 原因:org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    正如 cricket_007 所说,您需要告诉 Connect 使用 Json 反序列化器,如果这是您的数据格式。将其添加到您的连接器配置中:

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false"
    

    【讨论】:

      【解决方案2】:

      发生该错误是因为它试图读取非 Confluent Schema Registry 编码的 Avro 消息。

      如果主题数据是Avro,则需要使用Schema Registry。

      否则,如果主题数据是 JSON,那么您已经在属性文件中的键或值上使用 AvroConverter 启动了连接集群,您需要使用 JsonConverter 来代替

      【讨论】:

      • 谢谢。你是对的。没有好的文档的confluence问题
      • 你的意思是“融合”? Connect 转换器在 Github 和网站上都有记录。
      猜你喜欢
      • 1970-01-01
      • 2020-03-04
      • 1970-01-01
      • 1970-01-01
      • 2020-12-13
      • 1970-01-01
      • 2021-05-09
      • 2021-03-14
      • 2021-06-09
      相关资源
      最近更新 更多