【问题标题】:Azure Service Bus Not Detecting DuplicatesAzure 服务总线未检测到重复项
【发布时间】:2016-12-25 06:46:45
【问题描述】:

我有一个从 Azure 服务总线队列读取消息并将该消息转换为要由 Azure 媒体服务编码的视频的进程。我注意到,如果该过程连续快速启动,则同一视频将紧接着被编码。这是我将视频添加到队列的代码

public class VideoManager
{
    string _connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
    string _queueName = ConfigurationManager.AppSettings["ServiceBusQueueName"];
    QueueClient _client;

    public VideoManager()
    {
        var conStringBuilder = new ServiceBusConnectionStringBuilder(_connectionString)
        {
            OperationTimeout = TimeSpan.FromMinutes(120)
        };

        var messagingFactory = MessagingFactory.CreateFromConnectionString(conStringBuilder.ToString());
        _client = messagingFactory.CreateQueueClient(_queueName);
    }

    public void Approve(Video video)
    {
        // Set video to approved. 
        video.ApprovalStatus = ApprovalStatus.Approved;
        var message = new BrokeredMessage(new VideoMessage(video, VideoMessage.MessageTypes.Approve, string.Empty));
        message.MessageId = video.RowKey;
        _client.Send(message);
    }
}

以及从队列中读取的进程

 class Program
{
    static QueueClient client;

    static void Main(string[] args)
    {
        VideoManager videoManager = new VideoManager();

        var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];

        var conStringBuilder = new ServiceBusConnectionStringBuilder(connectionString)
        {
            OperationTimeout = TimeSpan.FromMinutes(120)
        };

        var messagingFactory = MessagingFactory.CreateFromConnectionString(conStringBuilder.ToString());

        client = messagingFactory.CreateQueueClient(ConfigurationManager.AppSettings["ServiceBusQueueName"]);

        Console.WriteLine("Starting: Broadcast Center Continuous Video Processing Job");

        OnMessageOptions options = new OnMessageOptions
        {
            MaxConcurrentCalls = 25,
            AutoComplete = false
        };

        client.OnMessageAsync(async message =>
        {
            bool shouldAbandon = false;

            try
            {
                await HandleMessage(message);
            }
            catch (Exception ex)
            {
                shouldAbandon = true;
                Console.WriteLine(ex.Message);
            }
            if (shouldAbandon)
            {
                await message.AbandonAsync();
            }
        }, options);
        while (true) { }
    }
    async static Task<int> HandleMessage(BrokeredMessage message)
    {

        VideoMessage videoMessage = message.GetBody<VideoMessage>();

        Console.WriteLine(String.Format("Message body: {0}", videoMessage.Video.Title));
        Console.WriteLine(String.Format("Message id: {0}", message.MessageId));

        VideoProcessingService vp = new VideoProcessingService(videoMessage.Video);
        Task task;
        switch (videoMessage.MessageType)
        {
            case VideoMessage.MessageTypes.CreateThumbnail:
                task = new Task(() => vp.ProcessThumbnail(videoMessage.TimeStamp));
                task.Start();

                while (!task.IsCompleted)
                {
                    await Task.Delay(15000);
                    message.RenewLock();
                }
                await task;
                Console.WriteLine(task.Status.ToString());

                Console.WriteLine("Processing Complete");
                Console.WriteLine("Awaiting Message");
                break;
            case VideoMessage.MessageTypes.Approve:

                task = new Task(() => vp.Approve());
                task.Start();

                while (!task.IsCompleted)
                {
                    await Task.Delay(15000);
                    message.RenewLock();
                }
                await task;
                Console.WriteLine(task.Status.ToString());

                Console.WriteLine("Processing Complete");
                Console.WriteLine("Awaiting Message");
                break;
            default:
                break;
        }
        return 0;
    }
}

如果我连续 3 次启动该进程,我在控制台窗口中看到的内容如下

消息 ID:76aca19a-0698-449b-bf58-a24876fc4314

消息 ID:76aca19a-0698-449b-bf58-a24876fc4314

消息 ID:76aca19a-0698-449b-bf58-a24876fc4314

我想也许我的设置不正确,但它们在那里 我在这里真的很茫然,因为我希望这是开箱即用的行为。重复检测是否只有在消息完成后才起作用,所以我不能使用 OnMessageAsync()?

【问题讨论】:

  • 该队列上定义的 LockDuration 是什么?您可能会看到这一点,因为由于上一次尝试的锁定持续时间已过,消息正在被重新处理,但仍未完成。
  • 可能是这样,我已经包含了整个代码,它执行 RenewLock()
  • 要么这样做,要么利用本机 OnMessage API 为您完成。示例:weblogs.asp.net/sfeldman/azure-service-bus-autorenewtimeout

标签: c# azure azureservicebus


【解决方案1】:

问题不在于完成(就像在代码中那样),而是事实上您有多个消费者(25 个并发回调),而且看起来 LockDuration 的消逝速度比处理速度快。结果,消息重新出现并重新处理。因此,您会看到多次记录相同的消息 ID。

可能的解决方案是(正如我在上面的 comment 中概述的那样):

  1. 让 OnMessage API 为您管理超时扩展 (example)
  2. 使用BrokeredMessage.RenewLock 手动更新锁

【讨论】:

  • 只是消息 id 没有被记录多次,处理过程发生了多次。如果我走使用 OnMessage API 的路线,是否会停止将同一消息多次放入队列的情况?例如,如果放入该队列的 API 被调用两次?
  • 多次记录的消息 ID 表示多次处理。您必须在 LockDuration、AutoRenewal 时间和您的处理时间之间取得平衡。如果您多次删除具有相同 ID 的消息,这没有什么神奇之处,您将得到多次处理。您可以使用本机重复数据删除,但我不喜欢该功能,因为它非常脆弱。最好使您的处理程序具有幂等性。还有,native dedup 只是基于 Message ID,如果你需要使用别的东西,它就行不通了。
【解决方案2】:

您的 HandleMessage 代码中缺少一行代码。

async static Task<int> HandleMessage(BrokeredMessage message)
{
  VideoMessage videoMessage = message.GetBody<VideoMessage>();

  message.CompleteAsync(); // This line...

  Console.WriteLine(String.Format("Message id: {0}", message.MessageId));
  // Processes Message
}

所以是的,您必须使用 Complete、Defer 等来标记消息。

另见Answer,还发现this,这可能对重复检测的工作原理有用

【讨论】:

  • 我实际上正在这样做,我将它添加到 OP,它稍后会在流程中完成。
  • 好的,我会参考答案中提供的链接。希望这会对您有所帮助。
  • 谢谢,我看过这些,但它们不太适合我的简单问题。我想知道是否因为我以异步方式获取所有消息,所以它们在第一个消息完成之前得到处理。这是可能的,因为对视频进行编码可能需要一分钟以上的时间。
  • 通过它的工作原理,消息将从队列中删除。只需检查如果在等待调用之前将 message.Complete() 移动到 client.OnMessageAsync 会发生什么。
猜你喜欢
  • 2013-05-27
  • 2016-12-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多