【问题标题】:Azure Function v3 .NET Core 3.1 ServiceBusTrigger "Message processing error (Action=RenewLock)" even though autoComplete = falseAzure Function v3 .NET Core 3.1 ServiceBusTrigger“消息处理错误(Action=RenewLock)”,即使 autoComplete = false
【发布时间】:2021-05-06 01:25:26
【问题描述】:

我有一个运行长时间运行任务的队列触发器。遇到锁定持续时间和更新的各种问题,所以我决定在整个过程开始时手动完成消息。如果作业遇到错误,请将消息发送回队列。

在下面的日志数据中,您可以看到它收到消息,完成它,启动任务,记录一个似乎没有任何影响的lock supplied is invalid 错误,然后完成任务。我想停止这个错误。很烦人。

[FunctionName("QueueTrigger")]
public async Task RunQueue([ServiceBusTrigger("queuename", Connection = "cn")] Message message, MessageReceiver messageReceiver, [ServiceBus("queuename", Connection = "cn")] IAsyncCollector<Message> queue)
{
    var msg = JsonConvert.DeserializeObject<RequestObj>(Encoding.UTF8.GetString(message.Body));
    logger.LogInfo("*** Before complete");
    await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
    logger.LogInfo("*** After complete");
    try
    {
        if(msg.Whatever > 1)   throw new Exception("Bogus exception thrown for testing.");

        var result = await RunTheTask(msg);
        if (!result.IsSuccessful)
        {
            logger.LogError(result.Failure);
            throw new ApplicationException(result.Failure);
        };
    }
    catch (Exception ex)
    {
        logger.LogError(ex.Message);
        logger.LogInfo("*** About to requeue");
        await queue.AddAsync(new Message(message.Body));
        logger.LogSuccess("*** Queued the message again.");
    }
}

host.json 文件有 serviceBus 这样的配置(关闭 autoComplete

"serviceBus": {
      "prefetchCount": 1,
      "messageHandlerOptions": {
        "autoComplete": false,
        "maxConcurrentCalls": 1
      },
      "SessionHandlerOptions": {
        "autoComplete": false
      },
      "BatchOptions": {
        "AutoComplete": false
      }

在这两种情况下,代码都按预期工作,但在这两种情况下,我总是看到“无效锁”错误

一些日志内容:

[2021-02-01T20:46:27.248Z] Executing 'QueueTrigger' (Reason='(null)', Id=520a670c-e636-49b8-96b5-7502f6b479ac)
[2021-02-01T20:46:27.251Z] Trigger Details: MessageId: d7af56ed083947e99ce0fc468f111e4d, SequenceNumber: 37, DeliveryCount: 1, EnqueuedTimeUtc: 2021-02-01T20:40:44.5390000Z, LockedUntilUtc: 2021-02-01T20:41:44.5550000Z, SessionId: (null)
[2021-02-01T20:46:28.331Z] *** Before complete
[2021-02-01T20:46:28.471Z] *** After complete
[2021-02-01T20:46:28.498Z] Running the Task
[2021-02-01T20:46:28.826Z] Message processing error (Action=RenewLock, ClientId=whatever, EntityPath=queueName, Endpoint=blah.servicebus.windows.net)
[2021-02-01T20:46:28.830Z] Microsoft.Azure.ServiceBus: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:30780d70-2887-4923-8725-329ab94dd4b2, TrackingId:e3e85f42-1c62-44da-b858-1c856f972a1f_B14, SystemTracker:x:Queue:queueName, Timestamp:2021-02-01T20:40:46.
[2021-02-01T20:46:38.855Z] Finished the Task
[2021-02-01T20:46:42.221Z] Executed 'QueueTrigger' (Succeeded, Id=520a670c-e636-49b8-96b5-7502f6b479ac, Duration=14992ms)

【问题讨论】:

  • 您的处理时间是否超过了在处理之前完成邮件所需的 5 分钟?它有点违背 Peek-Lock 模式的想法,也不是很安全。
  • 通常不会超过 5 分钟,但可以。这就是为什么我一收到消息就将其标记为完成,然后我才不管需要多长时间。
  • 您是否尝试过先读取数据再完成消息?或者,克隆它而不是使用它的主体。
  • 如果我需要,获取消息、阅读内容、完成消息、运行任务或重新排队消息都没有问题。问题是,即使我手动调用CompleteAsync,某处的底层管道仍在尝试访问该消息。它可能不会影响任何事情,但我不希望发生错误。
  • 我会在github.com/Azure/azure-functions-servicebus-extension 中提出一个问题,看看这是否是一个错误。

标签: azure-functions azureservicebus queuetrigger


【解决方案1】:

我认为你应该把'complete'方法移到最后,该功能会自动刷新锁的过期时间以保持它在正常情况下可用。但是一旦你使用了'complete'方法,函数会生成最后一个锁,如果你的后续逻辑超过了锁时间,就会抛出锁过期异常。大家可以看看下面的代码,对比一下使用完整和不使用完整的区别:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace FunctionApp71
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task Run([ServiceBusTrigger("queuename", Connection = "cn")] Message message, MessageReceiver messageReceiver, [ServiceBus("queuename", Connection = "cn")] IAsyncCollector<Message> queue, ILogger logger)
        {
            logger.LogInformation("When the message comes in, the expire time is: "+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is 2/4/2021 8:29:00 AM

            Thread.Sleep(6000);

            logger.LogInformation("After 6 seconds, the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//The time of the lock is renewed, so now the time is 2/4/2021 8:29:04 AM

            //When you complete the message, there is no way to renew the lock because the lock is not exist(Azure function stop renew the lock.).
            await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
            logger.LogInformation("After complete the message, the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());//Now the time is still 2/4/2021 8:29:04 AM

            Thread.Sleep(6 * 1000);//Your try catch is here.

            logger.LogInformation("After complete message and 6 seconds later, the expired time is :"+message.SystemProperties.LockedUntilUtc.ToString());
        }
    }
}

【讨论】:

  • 我收到消息,反序列化正文,然后完成消息。我不关心除此之外的消息,也不与它或MessageReceiver 交互。通过我的代码进行的任何交互都不会导致错误。
  • @DevonS 我已经更改了答案,我认为您可以将“完整”方法而不是函数放在末尾。问题应该是由于消息已完成,函数不更新锁。
【解决方案2】:

记录了一个问题:https://github.com/Azure/azure-functions-servicebus-extension/issues/137 问题有一个 repo 链接到 repro。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-08-02
    • 2020-05-02
    • 1970-01-01
    • 2019-09-30
    • 2022-01-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多