【问题标题】:Is there a way to specify the wait time of retrying a message?有没有办法指定重试消息的等待时间?
【发布时间】:2014-08-12 21:57:33
【问题描述】:

有没有办法指定针对特定异常重试消息的等待时间?

例如如果对象处于 SomethingInProgress 状态,则抛出 SomethignInProgressException 并且我希望消息在 40m 后重试。还是引发 SomethingInProgressEvent 并使用 bus.defer 更合适?

【问题讨论】:

    标签: rebus


    【解决方案1】:

    这就是 Rebus 没有 second-level retries 概念的部分原因 - 我根本没有看到任何可以以通用且仍然足够灵活的方式创建此函数的方法。

    简短回答您的问题:不,没有(内置)方法可以改变特定异常重试之间的时间。事实上,没有办法在重试之间配置等待时间根本 - 失败的消息将尽可能快地重试,然后如果它们继续失败则移动到错误队列以避免“阻塞管道”。

    在你的情况下,我建议你这样做:

    public void Handle(MyMessage message) {
        var headers = MessageContext.GetCurrent().Headers;
        var deliveryAttempt = headers.ContainsKey("attempt_no") 
            ? Convert.ToInt(headers["attempt_no"]) 
            : 0;
    
        try {
            DoWhateverWithThe(message);
        } catch(OneKindOfException e) {
            if (deliveryAttempt > 5) {
                bus.Advanced.Routing.ForwardCurrentMessage("error");
                return;
            }
    
            bus.AttachHeader(message, "attempt_no", deliveryAttempt + 1);
            bus.Defer(TimeSpan.FromSeconds(20), message);
        } catch(AnotherKindOfException e) {
            if (deliveryAttempt > 5) {
                bus.Advanced.Routing.ForwardCurrentMessage("error");
                return;
            }
    
            bus.AttachHeader(message, "attempt_no", deliveryAttempt + 1);
            bus.Defer(TimeSpan.FromMinutes(2), message);
        }
    }
    

    我只是在没有 100% 确定它确实可以编译的情况下将其从头顶上写下来......但它的要点是我们跟踪我们在消息的自定义标头中进行了多少次传递尝试, bus.Deferring 消息为每次失败的传递尝试设置适当的时间跨度,当超过我们的最大传递尝试次数时立即将消息转发到错误队列。

    我希望这是有道理的:)

    【讨论】:

    • 自发布此答案以来,已删除 AttachHeader 方法。新版 Rebus 中的 headers 是如何设置的?
    • IBus 上发送消息的所有方法现在都接受可选参数 optionalHeaders - 例如喜欢await bus.Send(message, new Dictionary<string, string>{{"custom-header", "wut"}})
    【解决方案2】:

    最近的一个例子是:

    public async Task Handle(IFailed<MyMessage> message)
    {
        var maxAttempts = 10;
        var optionalHeaders = new Dictionary<string, string>();
        if (message.Headers != null && message.Headers.ContainsKey("attemptNumber"))
        {
            // increment the attempt number
            var attemptNumber = int.Parse(message.Headers["attemptNumber"]);
            attemptNumber++;
            optionalHeaders.Add("attemptNumber", attemptNumber.ToString());
            if (attemptNumber > maxAttempts)
            {
                // log I give up message, message will move to dead queue
                return;
            }
        }
        else
            optionalHeaders.Add("attemptNumber", "1");
    
        // if message failed to process, defer processing for 5 minutes and try again
        await Bus.Defer(TimeSpan.FromMinutes(5), message.Message, optionalHeaders);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-06-13
      • 1970-01-01
      • 1970-01-01
      • 2019-11-30
      • 1970-01-01
      • 2022-11-03
      • 1970-01-01
      • 2020-04-29
      相关资源
      最近更新 更多