【问题标题】:Kafka RabbitMQ connector - can only get byte arraysKafka RabbitMQ 连接器 - 只能获取字节数组
【发布时间】:2020-01-09 04:29:57
【问题描述】:

我已经设置了一个连接器来从 RabbitMQ 队列中提取并推送到 Kafka 主题中。连接器运行,队列清空。但是当我使用 kafka-console-consumer 或 kafkacat 查看主题时,每个条目看起来都像一个字节数组 - [B@xxxxxxxx

RabbitMQ 消息负载都是 JSON。我需要做什么才能从 Kafka 中取出 JSON?我已经尝试过 value.converter=org.apache.kafka.connect.storage.StringConverter 以及将 ByteArrayDeserializer 与控制台消费者一起使用。

connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/robbie/kafka/plugins

RabbitMQSourceConnector.properties:

name=rabbitmq
tasks.max=1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
rabbitmq.prefetch.count=500
rabbitmq.automatic.recovery.enabled=false
rabbitmq.network.recovery.interval.ms=10000
rabbitmq.topology.recovery.enabled=true
rabbitmq.queue=test1
rabbitmq.username=testuser1
rabbitmq.password=xxxxxxxxxxxxxxx
rabbitmq.host=rmqhost
rabbitmq.port=5672
kafka.topic=rabbitmq.test1

【问题讨论】:

  • 你试过使用 JSONConverter 吗?
  • 是的。 JSONConverter 甚至不起作用。我也找不到解决方案。它导致:org.apache.kafka.connect.errors.DataException:转换错误:必填且没有默认值的字段的空值
  • 你能显示完整的连接器配置属性吗?
  • 已添加连接器配置文件。
  • 这个人似乎没有得到字节数组stackoverflow.com/q/59632068/2308683也许你应该尝试使用AvroConverter

标签: apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

你需要设置

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

我就在今天写了a blog :)

【讨论】:

  • 我会发誓我尝试过,但没有成功。但我又试了一次,现在一切都很棒。我不太明白为什么这是正确的选择,但是我这周开始使用 Kafka,还有很长的路要走。谢谢!
猜你喜欢
  • 2012-07-20
  • 2021-06-03
  • 2019-05-18
  • 1970-01-01
  • 2021-11-07
  • 2021-07-26
  • 2016-01-19
  • 2018-03-05
相关资源
最近更新 更多