【问题标题】:kafka connect Transform base64decode a fieldkafka connect 变换base64decode一个字段
【发布时间】:2020-03-30 14:52:30
【问题描述】:

我正在使用 kafka RMQ 源连接器从 RMQ 队列中获取数据。其中一个字段是base64encoded,它有一个json结构。我正在使用提取转换来提取此字段,但我不确定如何解码此字段,我尝试编写自己的 smt 来解码此字段,但当我解码此字段并尝试解码时出现错误java.lang.ClassCastException: [B cannot be cast to java.lang.String将其放入记录的更新值中,因为消息是 Json 结构。非常感谢您的帮助。

以下是我的连接器配置。

{
    "name" : "RabbitMQ_Source",
    "connector.class" : "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "RMQ_Topic",
    "rabbitmq.queue" : "rmqqueue",
    "rabbitmq.username":"username",
    "rabbitmq.virtual.host":"dummy",
    "rabbitmq.password":"password",
    "rabbitmq.host":"x.x.x.x",
    "rabbitmq.port":"5674",
    "transforms": "ExtractField",
    "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field":"body"
  }

下面是我正在尝试消费的队列中的消息,Body是解码后需要发送到Kafka主题的主要字段。如果我只使用提取转换,那么它工作正常,但我只能在 kafka 主题中看到编码消息。

{
   "consumerTag": "abcd",
   "envelope": {
      "deliveryTag": 1,
      "isRedeliver": false,
      "exchange": "rmqqueue",
      "routingKey": "rmqqueue"
   },
   "basicProperties": {
      "contentType": "text/plain",
      "contentEncoding": null,
      "headers": {},
      "deliveryMode": 2,
      "priority": 0,
      "correlationId": null,
      "replyTo": null,
      "expiration": null,
      "messageId": null,
      "timestamp": null,
      "type": null,
      "userId": null,
      "appId": null
   },
   "body": "eyJXSFMiOlt7IkNoYXJhY3RlciBTZXQiOiJVVEYtOCIsImFjdGlvbiI6InJld3JpdGUiLCJVcGRhdGUtRGF0ZS1UaW1lIjoiMjAyMC0wMy0yNSAwOTowMDowMjoxOSJ9XX0="
}

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    您需要使用ByteArrayConverter

    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    

    here 有一个例子。

    【讨论】:

      猜你喜欢
      • 2020-11-27
      • 2020-11-01
      • 2018-01-26
      • 2020-08-14
      • 2020-11-26
      • 2019-05-29
      • 2019-09-21
      • 2020-11-20
      • 2020-08-05
      相关资源
      最近更新 更多