【问题标题】:Masstransit consumer death - Peek()大众运输消费者死亡 - Peek()
【发布时间】:2012-12-08 16:10:22
【问题描述】:

我很好奇 MassTransit 消费者是否可以在实际检索 msg 之前 Peek() MSMQ 队列。

步骤/过程是什么:

1) 消息发送到队列

2) 消费者得到它并且必须进行数据库更新——大约需要 5 秒

3) 如果第一轮成功,消费者必须进行第二轮更新。

我的问题是,如果第一次数据库更新失败,消息留在队列中(即网络问题,无法到达数据库),我该如何处理。

目前,一旦它从队列中读取消息,它就会将其删除,然后如果数据库更新失败,它就会消失..

此外,我该如何处理电源故障 - 我的意思是,如果消费者的“工作”进行到一半,无论是什么(数据库更新或其他)并且电源消失等,我该如何重新运行队列中味精上的进程?可以说这项工作(无论如何在我目前的情况下)正在将新行推送到表中。我的意思是我可以编写代码首先检查该行是否存在,如果存在则删除消息,如果没有则运行任务,但我怎样才能让它首先重新运行整个过程?

我已经读到我可以Peek() 队列然后运行任务,然后真正读取队列消息并将其删除,但我终生无法弄清楚这是否适用于公共交通.. . 有点失落...

此外,我知道 Masstransit 有 .RetryLater,但我是否在此过程中使用它?是Initially --> When --> Then --> .RetryLater 吗??

任何指针都会被应用

最诚挚的问候 罗宾

编辑

PS:我用的是saga....

Define(() =>
            {
                RemoveWhen(saga => saga.CurrentState == Completed);

                Initially(
                    When(NewAC)
                        .Then((saga, message) => saga.ProcessPSM(message),
                            InCaseOf<Exception>()
                               .TransitionTo(Problem)                                  
                                )
                        .Then((saga, message) => saga.PostProcessPSM())
                        .Complete()
                    );
                During(Problem,
                    When(Waiting)
                               // NOTE: THIS DOES NOT WORK!!!!
                        .RetryLater()
                    );
                });

RetryLater 会抛出以下错误: "现有的 saga 无法接受该消息"

我不确定我还能如何访问“RetryLater”。

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    MassTransit 抽象了底层队列的概念。所以 Peek 不是解决方案, 但它确实有其他重试消息的方法。如果您只对处理错误和故障情况感兴趣,那么以下机制就足够了。

    默认情况下,如果消费者抛出异常,消息将被重试 N 次:

    • 其中 N 在总线上配置,默认为 5。可以在 使用 ServiceBusConfigurator 上的 SetDefaultRetryLimit 进行总线初始化
    • 重试意味着消息将被添加到队列的末尾

    如果您想要更细粒度的错误处理方法,您可以实现上下文使用者,捕获可恢复或暂时性异常并手动调用 RetryLater。据我了解,这可以做多少次是没有限制的。

    public class RetryConsumer : Consumes<AwesomeMessage>.Context
    {
    
        public void Consume(IConsumeContext<AwesomeMessage> message)
        {
            try
            {
                Console.WriteLine("This is Attempt " + message.RetryCount);
                // Do Something
            }
            catch (SomeTransientException e)
            {
                message.RetryLater();
            }
        }
    }
    

    【讨论】:

    • 谢谢伙计。我将在今天的测试中尝试一下。然而,在一种情况下,我的问题仍然存在。如果中途断电怎么办?消息不再在队列中。所以它丢失了是吗?因为消费者永远不会到达message.retrylater()... 有没有办法处理挂起的进程或类似的完全失败,以便在这些情况下稍后重试消息?或者我错过了上面的东西?谢谢!
    • 您始终可以将事务队列与 MSMQ 一起使用,这样可以保证消息在断电情况下不会丢失。 URI 上的 ?tx=true 可让您到达那里。
    • @ChrisPatterson 是的,我已经在使用它了。我不认为这涵盖了阅读消息的场景。我认为这只是保证,如果你“添加”到队列中,它可以保证进入队列,然后因为可恢复设置为真,所以持久化。但是事务设置是否也确保当我从队列中读取消息时,读取​​它的进程完成其工作?我认为不是还是...?
    • 不知道MSMQ & MT 怎么玩,但我认为rabbitmq & MT 自动解决了停电问题。如果消费者未能完成,则消息不会被确认,并将保留在队列中
    • 使用 MSMQ 上的 TX 队列,如果事务未提交,则消息仍保留在队列中,即使在断电时也是如此。你很厉害。传输在事务范围内读取消息,因此任何超出范围(包括抛出异常)的内容都将回滚 tx 并将消息留在队列的前面。
    猜你喜欢
    • 1970-01-01
    • 2015-04-01
    • 2016-06-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-06
    • 2011-05-04
    相关资源
    最近更新 更多