【问题标题】:The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue提供的锁无效。锁已过期,或者消息已从队列中删除
【发布时间】:2015-01-24 15:23:24
【问题描述】:

我正在使用 Microsoft azure 服务总线队列来处理计算,并且我的程序可以正常运行几个小时,但是从那时起我开始处理的每条消息都出现此异常。我不知道从哪里开始,因为前几个小时一切正常。我的代码似乎也是准确的。我将发布我处理 azure 服务总线消息的方法。

public static async Task processCalculations(BrokeredMessage message)
    {
        try
        {
            if (message != null)
            {
                if (connection == null || !connection.IsConnected)
                {
                    connection = await ConnectionMultiplexer.ConnectAsync("connection,SyncTimeout=10000,ConnectTimeout=10000");
                    //connection = ConnectionMultiplexer.Connect("connection,SyncTimeout=10000,ConnectTimeout=10000");
                }

                cache = connection.GetDatabase();

                string sandpKey = message.Properties["sandp"].ToString();
                string dateKey = message.Properties["date"].ToString();
                string symbolclassKey = message.Properties["symbolclass"].ToString();
                string stockdataKey = message.Properties["stockdata"].ToString();
                string stockcomparedataKey = message.Properties["stockcomparedata"].ToString();

                var sandpTask = cache.GetAsync<List<StockData>>(sandpKey);
                var dateTask = cache.GetAsync<DateTime>(dateKey);
                var symbolinfoTask = cache.GetAsync<SymbolInfo>(symbolclassKey);
                var stockdataTask = cache.GetAsync<List<StockData>>(stockdataKey);
                var stockcomparedataTask = cache.GetAsync<List<StockMarketCompare>>(stockcomparedataKey);

                await Task.WhenAll(sandpTask, dateTask, symbolinfoTask,
                    stockdataTask, stockcomparedataTask);

                List<StockData> sandp = sandpTask.Result;
                DateTime date = dateTask.Result;
                SymbolInfo symbolinfo = symbolinfoTask.Result;
                List<StockData> stockdata = stockdataTask.Result;
                List<StockMarketCompare> stockcomparedata = stockcomparedataTask.Result;

                StockRating rating = performCalculations(symbolinfo, date, sandp, stockdata, stockcomparedata);

                if (rating != null)
                {
                    saveToTable(rating);
                    if (message.LockedUntilUtc.Minute <= 1)
                    {
                        await message.RenewLockAsync();
                    }
                    await message.CompleteAsync(); // getting exception here
                }
                else
                {
                    Console.WriteLine("Message " + message.MessageId + " Completed!");
                    await message.CompleteAsync();
                }
            }
        }
        catch (TimeoutException time)
        {
            Console.WriteLine(time.Message);
        }
        catch (MessageLockLostException locks)
        {
            Console.WriteLine(locks.Message);
        }
        catch (RedisConnectionException redis)
        {
            Console.WriteLine("Start the redis server service!");
        }
        catch (MessagingCommunicationException communication)
        {
            Console.WriteLine(communication.Message);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
        }
    }

更新:我检查锁到期前的时间,如果需要,我会调用锁更新,但它会更新锁而没有错误,但我仍然收到此异常。

timeLeft = message.LockedUntilUtc - DateTime.UtcNow;
  if (timeLeft.TotalMinutes <= 2)
                    {
                        //Console.WriteLine("Renewed lock! " + ((TimeSpan)(message.LockedUntilUtc - DateTime.UtcNow)).TotalMinutes);
                        message.RenewLock();
                    }

catch (MessageLockLostException locks)
        {
            Console.WriteLine("Delivery Count: " + message.DeliveryCount);
            Console.WriteLine("Enqueued Time: " + message.EnqueuedTimeUtc);
            Console.WriteLine("Expires Time: " + message.ExpiresAtUtc);
            Console.WriteLine("Locked Until Time: " + message.LockedUntilUtc);
            Console.WriteLine("Scheduled Enqueue Time: " + message.ScheduledEnqueueTimeUtc);
            Console.WriteLine("Current Time: " + DateTime.UtcNow);
            Console.WriteLine("Time Left: " + timeLeft);
        }

到目前为止,我所知道的是我的代码运行良好一段时间并且更新锁被调用并工作,但我仍然收到锁异常并且在该异常中,我输出 timeleft 并且它不断增加时间差代码运行让我相信直到锁定到期的时间没有以某种方式改变?

【问题讨论】:

  • 你解决过这个问题吗?那是什么?
  • @FyodorSoikin 不,我最终不得不放弃并采用不同的方法来做同样的事情。据我所知,这是 api 中的一个错误,但微软没有人回复我的帖子
  • 您的任何个人消息的处理时间是否可能超过 60 秒?
  • @DalSoft 我会在密集计算完成后和处理消息之前更新锁
  • @user3610374 您是否尝试过将 LockDuration 设置为 5 分钟(这是最大设置),默认为 60 秒,如果在锁定更新前超过 60 秒,则会引发此异常。

标签: c# azure distributed azureservicebus


【解决方案1】:

我花了几个小时试图弄明白为什么我收到了MessageLockLostException。我的原因是AutoComplete 默认为真。

如果您要调用messsage.Complete()(或CompleteAsync()),那么您应该实例化一个OnMessageOptions 对象,将AutoComplete 设置为false,并将其传递给您的OnMessage 调用。

var options = new OnMessageOptions();
options.AutoComplete = false;

client.OnMessage(processCalculations, options);

【讨论】:

  • 谢谢杰弗里,这有帮助;你能说出原因吗?
  • @mebjas 你不应该多次“完成”一条消息。要么让 AutoComplete 完成工作,要么自己处理(Complete() / CompleteAsync()),但避免两者都做。
  • 会在host.json文件中设置“autoComplete”:false吗?
【解决方案2】:

我遇到了类似的问题。消息已成功处理,但当它们完成时,服务总线不再具有有效的锁。原来我的 TopicClient.PrefetchCount 太高了。

似乎所有预取的消息一被提取就开始锁定。如果您的累积消息处理时间超过锁定超时,则所有其他预取消息都将无法完成。它将返回到服务总线。

【讨论】:

  • 预取计数应该是多少?
  • 这就是为我做的!如果您将服务总线触发器用于长时间(ish)运行的作业,则需要考虑更改预取计数设置。
  • @MarcusW 嘿,我也遇到了同样的问题,您将预取计数设置为多少?此外,它还有一种清除服务总线队列的方法,我使用的是 azure web 作业 sdk,并且无法直接访问队列客户端。
  • @inquisitive 我不记得了,但尝试从一个开始,然后根据您的性能要求增加。一种试错游戏。对于监控和管理总线,我可以推荐这个工具:github.com/paolosalvatori/ServiceBusExplorer/releases
【解决方案3】:

我花了 2 天时间解决了类似的问题 - 相同的异常。
此异常可能有多种原因,我将描述几个可能对您陌生的配置选项...

ServiceBus 队列或主题订阅配置:

  • 队列/主题订阅上的消息锁定持续时间太低,将其设置为大约。消息处理时间

ServiceBusClient 选项配置:

  • tryTimeout 太短,将其设置为 ~10 秒以进行诊断

ServiceBusProcessor 选项配置:

  • AutoCompleteMessages默认设置为true,设置为false
  • PrefetchCount 太高,诊断时将其设置为 0
  • ReceiveMode 将其设置为 ServiceBusReceiveMode.PeekLock
  • MaxConcurrentCalls 用于诊断,将其设置为 1

在找到正确的值(针对给定系统进行了优化)后,我不再观察到任何问题。

【讨论】:

  • TY - 这极大地帮助了我,我能够进行大量本地主机检查以支持调试。
  • 将 prefetchCount 设置为 0 解决了我的问题。
【解决方案4】:

创建客户端订阅时,不要手动更新锁,而是尝试使用客户端的 OnMessageOptions() 自动更新它,如下所示:

        OnMessageOptions options = new OnMessageOptions();

        options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

        try
        {
            client = Subscription.CreateClient();

            client.OnMessageAsync(MessageReceivedComplete, options);
        }
        catch (Exception ex)
        {
            throw new Exception (ex);
        }

【讨论】:

  • 您使用的是什么版本的 ServiceBus?我拥有的版本是 v2.2.0.0 并且 OnMessageOptions 类中没有属性调用 AutoRenewTimeout
  • 嗨罗加拉。我使用的是 2.7.0.0 版。
【解决方案5】:

就我而言,我只是在本地计算机上开发 V2,而 V1 已经部署并运行。

由于 V1 部署在 Azure 中(更接近同一个队列)并在发布模式下编译(而我的本地版本在调试模式下),因此部署的版本总是“赢得”队列资源的并发性。

这就是消息不再在队列中的原因:它被我的代码的部署版本使用了。我知道它有点笨。

【讨论】:

    猜你喜欢
    • 2020-07-12
    • 2020-12-13
    • 1970-01-01
    • 2020-08-30
    • 2023-03-03
    • 1970-01-01
    • 2022-09-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多