【发布时间】:2018-05-07 07:56:32
【问题描述】:
我有一个使用 WebJobs SDK 的带有以下 ServiceBus 处理程序的 WebJob:
[Singleton("{MessageId}")]
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
using (var scope = Program.Container.BeginLifetimeScope())
{
var handler = scope.Resolve<MessageHandlers>();
logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));
// To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);
foreach (var outputMessage in outputMessages)
{
queue.Add(outputMessage);
}
}
}
如果未满足处理程序的先决条件,outputMessages 包含一个 BrokeredMessage,与我们当前正在处理的 MessageId、Label 和有效负载相同,但它包含一个 ScheduledEnqueueTimeUtcin未来。
想法是我们快速完成对当前消息的处理,并通过调度未来的新消息等待重试。
有时,尤其是当队列中的消息多于 SDK peek-locks 时,我看到消息在 ServiceBus 队列中重复。它们具有相同的MessageId、Label 和有效负载,但具有不同的SequenceNumber、EnqueuedTimeUtc 和ScheduledEnqueueTimeUtc。它们的交付计数均为 1。
查看我的处理程序代码,发生这种情况的唯一方法是,如果我多次收到相同的消息,我需要等待并创建一条新消息以供将来处理。处理程序成功完成,因此原始消息完成。
初始消息是唯一的。此外,我将SingletonAttribute 放在消息处理程序上,这样同一MessageId 的消息就不能被不同的处理程序使用。
为什么多个处理程序会使用相同的消息触发,我该如何防止这种情况发生?
我使用的是Microsoft.Azure.WebJobs 版本是v2.1.0
我的处理程序的持续时间最长为 17 秒,平均为 1 秒。锁定持续时间为 1m。仍然我最好的理论是消息(重新)锁定的东西不起作用,所以当我处理处理程序时,锁丢失了,消息返回队列并再次被消耗。如果两个处理程序都看到关键资源仍被占用,它们都会将新消息排入队列。
【问题讨论】:
-
您的消息处理程序在做什么?对不起,但你做事的方式听起来很奇怪......
-
我有几个不同的消息处理程序使用该模式。我无法透露他们到底在做什么。你认为究竟是什么听起来很有线?
-
您会收到一条消息,然后您会在未来将新消息排入队列。不是很明白为什么。您不能在消息到达时对其进行处理吗?
-
如果未满足处理程序的先决条件,我会将一条消息排入队列以供将来处理。假设我的系统中有两个实体需要在我用消息触发的操作开始之前完成,我需要检查这些先决条件的状态,如果它们没有完成,我需要稍等一下,直到我可以开始处理了。
-
哦,好吧,现在说得通了 :-) 也许您一次只应该处理一条消息?
标签: azure-webjobs azure-webjobssdk