【问题标题】:MassTransit: multiple consumers, separate queues/endpoints, messages not deliveredMassTransit:多个消费者、单独的队列/端点、未传递的消息
【发布时间】:2017-02-27 21:32:29
【问题描述】:

我设置了 2 个服务来接收相同的消息,例如ConsumerA : IConsumer<IMessageA>ConsumerB : IConsumer<IMessageA>。每个服务设置一个唯一的端点,例如queue_aqueue_b 并注册其消费者。我在 RabbitMQ 中看到了 IMessageA 的交换,它的类型为扇出并绑定到 queue_aqueue_b。到目前为止,一切都很好。

  • 我运行这两个服务并发布一条消息,但只有服务 A 得到它。
  • 我停止服务 A 并在 RabbitMQ 中手动向 B 发布消息(服务 A 是一个 Web 服务,它发布 IMessageA 以响应使用 IRequestClient<IMessageA, IMessageAResponse> 的 POST,这就是我需要手动发布的原因)现在服务B 获取消息并按预期使用它。

需要明确的是,在服务 A 停止的情况下,消息由 RabbitMQ 路由到 queue_a queue_b。如果服务 A 正在运行,则消息发送到 queue_b,尽管现有的交换绑定表明 queue_b 已绑定到 IMessageA 交换并且绝对应该得到他们。或者至少当我能够通过管理 Web UI 检查 RabbitMQ 时,没有证据表明曾经有消息传递到 queue_b(即 queue_b_errorqueue_b_skipped 中没有任何消息,后者甚至不存在)。

我已将 IReceiveObserver 添加到服务 A 和 B,但没有触发 ReceiveFaultConsumeFault

服务A中的消费者基本在做:

var result = await MethodThatReturnsIMessageAResponse(messageA);
context.Respond(result);

为什么服务 A 会干扰向服务 B 传递消息?我什至从哪里开始寻找?

【问题讨论】:

  • 你检查过绑定吗?这个东西总是有效的,这是一个基本功能。你有没有试过在这个A服务中抛出异常,看看消息是否进入错误队列?
  • 另外,您说如果服务 A 停止并且您发布消息,它们会进入队列。那么当你启动服务时这些消息会发生什么?它们从队列中消失,意味着它们被消耗了吗?
  • 服务 A 停止时,如果我在 RabbitMQ 中手动将消息直接发布到 IMessageA 交换器,它会同时传递到 queue_aqueue_b 并且服务 B 能够接收消息并使用它。如果我在那之后启动服务 A,它会接收消息并处理/使用它。只有在服务 A 运行时,消息才不会到达queue_b。您会在我的问题的第一段中看到我已经检查过并且绑定似乎是正确的,这就是为什么我对问题所在感到茫然。
  • 你能把这个放到 Github 上吗?
  • 很遗憾没有。我将尝试创建一个我可以的独立复制品,尽管我怀疑这会“正常工作”......

标签: rabbitmq masstransit


【解决方案1】:

问题是我正在“发布”消息:

c.Resolve<IBus>().CreateRequestClient<IToDoMessage, IToDoMessageResponse>(new Uri(QueueAddress + QueueName),TimeSpan.FromSeconds(5));

这需要一个特定的端点(例如队列名称)。相反,我需要使用CreatePublishRequestClient

c.Resolve<IBus>().CreatePublishRequestClient<IToDoMessage, IToDoMessageResponse>(TimeSpan.FromSeconds(5));

它使用总线发布,并通过交换,而不是特定的队列。 GitHub sample project 显示前者无济于事......

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-28
    • 2012-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多