【问题标题】:MassTransit fault handling configurationMassTransit 故障处理配置
【发布时间】:2018-07-13 12:04:20
【问题描述】:

在我的应用程序中配置故障使用者时遇到问题。问题是消费的消息被传递到*_error_skipped 队列并且它并没有完全消失。

下面是一个非常简单的例子。客户端应用收到失败的消息并从 test_error 队列中消失,但它仍然存在于 test_error_skipped 队列中。

服务项目

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;

namespace MassTransitTest.Service
{

    public class RequestModel
    {
        public DateTime RequestTime { get; set; }
    }

    class MassTransitService : IDisposable
    {
        private readonly IBusControl _busControl;
        public MassTransitService()
        {
            _busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
            {
                var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });
                configure.ReceiveEndpoint(host, "test", c =>
                {
                    c.UseRetry(r => r.None());
                    c.Consumer<RequestConsumer>();
                });
            });
            TaskUtil.Await(_busControl.StartAsync());
            Console.WriteLine("bus started");
        }

        public void Dispose()
        {
            _busControl?.StopAsync().Wait();
        }
    }

    class RequestConsumer : IConsumer<RequestModel>
    {
        public Task Consume(ConsumeContext<RequestModel> context)
        {
            Console.WriteLine($"request with message id {context.MessageId} received: {context.Message.RequestTime}");
            throw new NotImplementedException();
        }
    }
}

客户项目

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;
using MassTransitTest.Service;

namespace MassTransitTest.Client
{
    class MassTransitClient
    {
        private readonly IBusControl _busControl;

        public MassTransitClient()
        {
            _busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
            {
                var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                }); enter code here
                configure.ReceiveEndpoint(host, "test_error", c =>
                {
                    c.Consumer<ErrorConsumer>();
                });
            });
            TaskUtil.Await(_busControl.StartAsync());
            Console.WriteLine("bus started");
        }

        public async Task Send()
        {
            Console.WriteLine("sending request");

            await (await _busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/mt_test/test"))).Send(new RequestModel()
            {
                RequestTime = DateTime.Now
            });
            Console.WriteLine("request sent");

        }
    }
    class ErrorConsumer : IConsumer<Fault<RequestModel>>
    {
        public Task Consume(ConsumeContext<Fault<RequestModel>> context)
        {
            Console.WriteLine($"request with message id {context.Message.FaultedMessageId} failed. requested time: {context.Message.Message.RequestTime}");
            return Task.CompletedTask;
        }
    }
}

我正在使用 .net core 2.1.1 和 MassTransit 5.1.3

【问题讨论】:

    标签: .net-core rabbitmq masstransit


    【解决方案1】:

    首先要回答你的问题,它有几个部分:

    当消费者抛出异常时,MassTransit 接收端点将消息移动到_error 队列。不建议在 _error 队列上创建接收端点,也不应该这样做。

    如果您只是想观察消费者是否发生故障,您可以创建一个单独的接收端点(例如 fault-queue)并注册您的Fault&lt;T&gt; 消费者。 MassTransit 将发布实现Fault&lt;T&gt; 的消息,代理将通过接收端点将其路由到您的消费者。

    但是,根据您上面的示例,您正在发送请求并希望客户端知道是否发生了故障。为此,我建议使用请求客户端 - 它设置消息标头以将故障返回给请求发起者。它还允许发送响应。如果您不想等待响应,或者等待查看故障是否发生,上述故障观察器是您的最佳选择。

    您可以在documentation看到如何使用请求客户端。

    【讨论】:

    • 我正在寻找一些关于如何使用 MT 正确使用卡在_error 队列中的消息的文档。我已经尝试使用 MT 明确地针对这些队列,但它故意将它们移动到 _error_skipped 队列中。我的观点是成功地从一种不能正确处理错误的拓扑迁移到能够正确处理错误的拓扑,同时优雅地处理卡住的消息。
    • 虽然您可以使用来自错误队列的消息,但并不强烈推荐。最好使用工具将这些消息移动到不同的队列中进行重新处理(有时是原始队列,但应注意避免消息被移动太多次)。此外,如果您收到跳过的消息,则不会使用错误队列中的消息类型。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-29
    • 1970-01-01
    • 1970-01-01
    • 2016-08-30
    • 1970-01-01
    • 2018-01-23
    相关资源
    最近更新 更多