【问题标题】:How to avoid receiving messages multiple times from a ServcieBus Queue when using the WebJobs SDK使用 WebJobs SDK 时如何避免多次从 ServcieBus 队列接收消息
【发布时间】:2018-05-07 07:56:32
【问题描述】:

我有一个使用 WebJobs SDK 的带有以下 ServiceBus 处理程序的 WebJob:

[Singleton("{MessageId}")]
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
    using (var scope = Program.Container.BeginLifetimeScope())
    {
        var handler = scope.Resolve<MessageHandlers>();
        logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));

        // To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
        var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }
    }
}

如果未满足处理程序的先决条件,outputMessages 包含一个 BrokeredMessage,与我们当前正在处理的 MessageIdLabel 和有效负载相同,但它包含一个 ScheduledEnqueueTimeUtcin未来。

想法是我们快速完成对当前消息的处理,并通过调度未来的新消息等待重试。

有时,尤其是当队列中的消息多于 SDK peek-locks 时,我看到消息在 ServiceBus 队列中重复。它们具有相同的MessageIdLabel 和有效负载,但具有不同的SequenceNumberEnqueuedTimeUtcScheduledEnqueueTimeUtc。它们的交付计数均为 1。

查看我的处理程序代码,发生这种情况的唯一方法是,如果我多次收到相同的消息,我需要等待并创建一条新消息以供将来处理。处理程序成功完成,因此原始消息完成。

初始消息是唯一的。此外,我将SingletonAttribute 放在消息处理程序上,这样同一MessageId 的消息就不能被不同的处理程序使用。

为什么多个处理程序会使用相同的消息触发,我该如何防止这种情况发生?

我使用的是Microsoft.Azure.WebJobs 版本是v2.1.0

我的处理程序的持续时间最长为 17 秒,平均为 1 秒。锁定持续时间为 1m。仍然我最好的理论是消息(重新)锁定的东西不起作用,所以当我处理处理程序时,锁丢失了,消息返回队列并再次被消耗。如果两个处理程序都看到关键资源仍被占用,它们都会将新消息排入队列。

【问题讨论】:

  • 您的消息处理程序在做什么?对不起,但你做事的方式听起来很奇怪......
  • 我有几个不同的消息处理程序使用该模式。我无法透露他们到底在做什么。你认为究竟是什么听起来很有线?
  • 您会收到一条消息,然后您会在未来将新消息排入队列。不是很明白为什么。您不能在消息到达时对其进行处理吗?
  • 如果未满足处理程序的先决条件,我会将一条消息排入队列以供将来处理。假设我的系统中有两个实体需要在我用消息触发的操作开始之前完成,我需要检查这些先决条件的状态,如果它们没有完成,我需要稍等一下,直到我可以开始处理了。
  • 哦,好吧,现在说得通了 :-) 也许您一次只应该处理一条消息?

标签: azure-webjobs azure-webjobssdk


【解决方案1】:

经过一些试验,我找出了根本原因,并找到了解决方法。

如果一条ServiceBus消息完成了,但是peek锁没有被放弃,锁过期后会回到active状态的队列中。

ServiceBus QueueClient 显然会在收到下一条消息(或一批消息)后放弃锁定。

因此,如果 WebJobs SDK 使用的 QueueClient 意外终止(例如,由于进程结束或 Web App 重新启动),所有已锁定的消息都会重新出现在队列中,即使它们已经完成。

在我的处理程序中,我现在手动完成消息并像这样放弃锁定:

public static async Task ProcessQueueMessageAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
    using (var scope = Program.Container.BeginLifetimeScope())
    {
        var handler = scope.Resolve<MessageHandlers>();
        logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));

        // To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
        var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }

        await message.CompleteAsync().ConfigureAwait(false);
        await message.AbandonAsync().ConfigureAwait(false);
    }
}

这样我就不会在重启场景中将消息返回到队列中。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-16
    • 1970-01-01
    • 1970-01-01
    • 2018-01-28
    • 1970-01-01
    相关资源
    最近更新 更多