【问题标题】:Kafka integration with RabbitMQKafka 与 RabbitMQ 的集成
【发布时间】:2019-08-16 13:45:37
【问题描述】:

我想整合Kafka和RabbitMQ,

我可以将消息发送到 RabbitMQ,但消息没有被使用。通道号状态在 RabbitMQ 控制台中显示 - 0。

将用户更改为管理员,但仍然是同样的问题。

通过 Confluent 部署了 Kafka(而不是单独部署 zookeeper Kafka 等),还通过 Confluent-hub 安装了 Rabbitmq 连接器。

请帮忙。

[2019-03-26 06:39:19,151] ERROR Consumer io.confluent.connect.rabbitmq.ConnectConsumer@454774b5 (amq.ctag-Unaj3jmbQQctolAwNzU2SQ) method handleDelivery for channel AMQChannel(amqp://guest@0:0:0:0:0:0:0:1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@0:0:0:0:0:0:0:1:5672/,1) (com.rabbitmq.client.impl.ForgivingExceptionHandler:124)
java.lang.NullPointerException
at io.confluent.connect.rabbitmq.MessageConverter.basicProperties(MessageConverter.java:127)
at io.confluent.connect.rabbitmq.SourceRecordBuilder.sourceRecord(SourceRecordBuilder.java:40)
at io.confluent.connect.rabbitmq.ConnectConsumer.handleDelivery(ConnectConsumer.java:69)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: apache-kafka rabbitmq confluent-platform


    【解决方案1】:

    我在评估 RabbitMQ Kafka 连接器时遇到了同样的错误。

    AMQP 消息由两部分组成。标头和消息内容。我只提供消息内容而不是标题。

    您必须使用元数据创建一个 BasicProperties 对象。 Java Docs可以查看here

    然后,当您使用 basicPublish 方法发布到 RabbitMQ 时,使用您之前创建的 BasicProperties 对象作为参数之一。

    basicPublish 的 Java 文档可以在 here 找到。

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-31
      • 2017-01-01
      • 1970-01-01
      • 2021-05-15
      • 2012-05-08
      • 1970-01-01
      • 2017-04-02
      • 2018-03-07
      相关资源
      最近更新 更多