【问题标题】:MassTransit store Message retry attempts in memory?MassTransit 在内存中存储消息重试尝试?
【发布时间】:2019-11-09 21:11:44
【问题描述】:

MassTransit 重试策略允许我们设置重试尝试。在消费者前端,我们可以通过调用

context.GetRetryAttempt()

但是当消费者应用程序重新启动时,它从 0 开始。 我需要它应该在服务器停止之前从它离开的地方开始。

作为 RabbitMQ 死信交换帮助我实现它。此处附有代码示例。我可以在 MassTransit 中做类似的事情吗?

public class Consumer<T> : IConsumer<T>
{
    private readonly ConsumerConfigOptions _consumerConfigOptions;
    private readonly RabbitMqPoc.Interface.IConnectionFactory _connectionFactory;
    public event ReceiveMessage<T> ReceiveMessageEvent;

    private AckType AckType = AckType.Accept;
    private bool IsRequeue = false;
    private const string RetryExchange = "RetryExchange";
    private const string RetryKeyName = "x-retries";

    public Consumer(ConsumerConfigOptions consumerConfigOptions,
        RabbitMqPoc.Interface.IConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
        _consumerConfigOptions = consumerConfigOptions;
    }

    public void Consume(string queue)
    {
        IModel channel = _connectionFactory.GetConnection().CreateModel();

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            var header = ea.BasicProperties.Headers;
            byte[] body = ea.Body;

            try
            {
                //throw exception here
                CheckForAcknowledge(channel,ea);
            }
            catch (Exception exception)
            {
                Requeue(channel, ea, body);
            }
        };
        channel.BasicConsume(queue: queue,
                         autoAck: false,
                         consumer: consumer);
    }

    private void Requeue(IModel channel, BasicDeliverEventArgs ea, byte[] body)
    {
        var header = ea.BasicProperties.Headers;

        if (header.ContainsKey(RetryKeyName))
        {
            SetRetryCount(header, channel, ea, body);
        }
        else
        {
            RePublishOnDlx(channel, ea, body, 0);
        }
    }

    private void SetRetryCount(IDictionary<string, object> header, IModel channel, BasicDeliverEventArgs ea, byte[] body)
    {
        int currentRetryAttempt = (int)header[RetryKeyName] + 1;

        if (currentRetryAttempt >= _consumerConfigOptions.RetryAttempts)
        {
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        else
        {
            RePublishOnDlx(channel, ea, body, currentRetryAttempt);
        }
    }

    private void RePublishOnDlx(IModel channel, BasicDeliverEventArgs eventArgs, byte[] body, int retryAttempt)
    {
        IBasicProperties basicProperties = channel.CreateBasicProperties();

        basicProperties.Headers = new Dictionary<string, object>
        {
            { RetryKeyName, retryAttempt }
        };

        channel.BasicNack(eventArgs.DeliveryTag, false, false);
        channel.BasicPublish(RetryExchange, string.Empty, basicProperties, body);
    }

    public void Acknowledge(AckType ackType = AckType.Accept, bool isRequeue = false)
    {
        AckType = ackType;
        IsRequeue = isRequeue;
    }

    private void CheckForAcknowledge(IModel channel, BasicDeliverEventArgs ea)
    {
        if (AckType == AckType.Accept)
        {
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        }
        else
        {
            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: IsRequeue);
        }
    }

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    你是对的,MassTransit 中的UseMessageRetry 完全在内存中。如果重试次数用尽,消息将被移至 _error 队列。如果在重试挂起时进程退出,则消息将被 nack 并保留在队列中。当这种情况发生并且进程重新启动时,您是对的,重试策略从零开始 - 因为原始消息仍在队列中,并且无法修改标头。

    如果需要,您可以使用延迟交换 RabbitMQ 插件重新传递消息,以使用代理执行重试。一旦您在总线上配置了延迟交换插件和消息调度程序,UseScheduledRedelivery 过滤器就会执行此操作。

    cfg.UseDelayedExchangeMessageScheduler();
    

    【讨论】:

    • 谢谢你,@Charis Patterson。
    • 您好@Charis Patterson,RabbitMQ 延迟交换有限制,如此处所述-github.com/rabbitmq/rabbitmq-delayed-message-exchange。我们正在寻找另一种方法来获得当前的重试尝试,就像我可以使用 RabbitMQ 的死信交换一样?
    • 死信交换会立即写入队列,这意味着消息会立即被死信队列上的消费者读取,并且会立即重试。当您的 MassTransit 流程终止时,消息会在代理处得到确认,如果配置正确,代理会将这些消息移动到 DLQ。但是,MassTransit 无法在这种情况下向消息中添加任何标头,这一切都在 RMQ 的控制之下。
    • 同意,感谢您的快速回复。它对我有很大帮助:)
    猜你喜欢
    • 2013-03-29
    • 1970-01-01
    • 2022-01-03
    • 1970-01-01
    • 2017-01-08
    • 2021-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多