【问题标题】:Azure.Messaging.ServiceBus Error when completing or abandoning message完成或放弃消息时出现 Azure.Messaging.ServiceBus 错误
【发布时间】:2022-02-21 21:16:38
【问题描述】:

我目前正在考虑将我们非常简单的服务总线服务更新到最新版本 (Asure.Messaging.Servicebus),我在这里遇到了一个较小的问题。

问题是我想通过将消息委托给我的服务类中的方法来手动完成或放弃接收或峰值消息来处理工作。

到目前为止,这是我的简单类,由接口公开。

using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;

namespace myProject.Services
{
    /// <summary>
    /// Service bus service controller
    /// </summary>
    public class ServiceBusService: IServiceBusService
    {
        private IConfigurationUtility _configurationUtility;
        static string connectionString;
        static ServiceBusClient client;
        static ServiceBusSender sender;
        static ServiceBusReceiver receiver;

        public ServiceBusService(IConfigurationUtility configurationUtility)
        {
            _configurationUtility = configurationUtility;
            connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
            client = new ServiceBusClient(connectionString);
        }

        /// <summary>
        /// Sending message.
        /// </summary>
        /// <param name="messageContent"></param>
        /// <param name="queueName"></param>
        public void SendMessage(string messageContent, string queueName)
        {
            sender = client.CreateSender(queueName);
            ServiceBusMessage message = new ServiceBusMessage(messageContent);
            sender.SendMessageAsync(message).Wait();
        }

        /// <summary>
        /// Receive message.
        /// </summary>
        /// <returns></returns>
        /// <param name="queueName"></param>
        public ServiceBusReceivedMessage ReceiveMessage(string queueName)
        {

            receiver = client.CreateReceiver(queueName);
            ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
            return receivedMessage;
        }

        /// <summary>
        /// Peek message.
        /// </summary>
        /// <returns></returns>
        public ServiceBusReceivedMessage PeekMessage(string queueName)
        {
            receiver = client.CreateReceiver(queueName);
            ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
            return peekedMessage;
        }

        /// <summary>
        /// Complete message.
        /// </summary>
        /// <param name="message"></param>
        public void CompleteMessage(ServiceBusReceivedMessage message)
        {
            receiver.CompleteMessageAsync(message).Wait();
        }

        /// <summary>
        /// Abandon message.
        /// </summary>
        /// <param name="message"></param>
        public void AbandonMessage(ServiceBusReceivedMessage message)
        {
            receiver.AbandonMessageAsync(message).Wait();
        }
    }
}

当我一次处理一条消息时,一切正常,但如果我一次处理两条消息,我会收到此错误:

“提供的锁无效。锁已过期,或者消息已从队列中删除,或者已被不同的接收器实例接收”

我认为 "or was received by a different receiver instance" 部分是我的问题。

我是不是想错了?我不应该能够处理例如:一次放弃多个单个收到的消息吗?

提前致谢。

更新:

所以我设法让 Jesse Squire 考虑到微软提供的新 Servicebus 文档的建议工作。

我创建了一个名为 ServicebusFactory 的新类,在下面添加了 Jesse 提供的代码,以对其进行测试,并将“= new();”部分更改为初始化的“ConcurrentDictionary's”在示例顶部,因为像这样初始化它们会导致以下错误:

CS8400 功能“目标类型对象创建”在 C# 8.0 中不可用。请使用 9.0 或更高版本的语言。

结果:

    public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
    {
        private readonly ServiceBusClient _client;
        private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
        private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();

        public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
        public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);

        public ServiceBusSender GetSender(string entity) =>
            _senders.GetOrAdd(entity, entity => _client.CreateSender(entity));

        public ServiceBusReceiver GetReceiver(string entity) =>
            _receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));

        public async ValueTask DisposeAsync()
        {
            await _client.DisposeAsync().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }
    }
}

然后我为我的服务总线工厂创建了一个接口,并将它作为一个单例添加到我的“Startup.cs”类中的“ConfigurationService”方法的底部,用我的服务总线连接字符串初始化它。

界面:

public interface IServiceBusFactory
{
   public ServiceBusSender GetSender(string entity);

   public ServiceBusReceiver GetReceiver(string entity);

   public ValueTask DisposeAsync();
}

启动:

var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));

最后这是一个依赖注入 servicebusFactory 接口到我的控制器构造函数的问题,然后使用它来获取“sender”并使用它向我的名为“”的队列发送消息添加”。

构造函数:

private readonly IServiceBusFactory _serviceBusFactory;

public ServicebusController(IServiceBusFactory serviceBusFactory) 
{
    _serviceBusFactory = serviceBusFactory; 
}

控制器方法/动作实现:

var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));

var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);

【问题讨论】:

    标签: asp.net-core message-queue azureservicebus messaging azure-servicebus-queues


    【解决方案1】:

    服务总线将消息锁与接收消息的 AMQP 链接相关联。对于 SDK,这意味着您必须使用与接收消息相同的 ServiceBusReceiver 实例来处理消息。

    在您的代码中,您正在为 ReceiveMessage 调用创建一个新接收者 - 因此,当您尝试完成或放弃该消息时,您使用的是一个链接,如果对 @ 进行任何其他调用,则该消息对其无效987654324@已经发生了。

    通常,您希望避免创建短期服务总线客户端对象的模式。它们是intended to be long-lived,并在应用程序的整个生命周期内重复使用。在您的代码中,您还隐含地放弃了发送者/接收者而不关闭它们。这将孤立 AMQP 链接,直到服务在 20 分钟后因空闲而强制关闭它。

    我建议将您的发件人/收件人汇集在一起​​,并将每个人作为关联队列的单例。对于给定队列,对SendMessageReceiveMessage 的每次调用都应使用相同的发送方/接收方实例。

    当您的应用程序关闭时,请务必关闭或处置ServiceBusClient,这将确保其所有子发送者/接收者也被适当地清理。

    我还强烈建议将您的类重构为异步的。您正在使用的 sync-over-async 模式将对线程池施加额外的压力,并可能导致线程饥饿和/或负载下的死锁。

    更新

    要添加一些额外的上下文,我建议不要包装服务总线操作,而是拥有一个专注于管理客户端并让调用者直接与它们交互的工厂。

    这确保了客户端被池化并正确管理它们的生命周期,同时还为调用者提供了灵活性,以保持发送者/接收者引用并用于多个操作,而不是支付检索它的成本。

    例如,以下是一个简单的工厂类,您可以在应用程序中创建和管理它作为单例。调用者可以为特定队列/主题/订阅请求发送者/接收者,它们将根据需要创建,然后汇集以供重复使用。

    // This class is intended to be treated as a singleton for most 
    // scenarios.  Each instance created will open an independent connection
    // to the Service Bus namespace, shared by senders and receivers spawned from it.
    
    public class ServiceBusFactory : IAsyncDisposable
    {
        // If throughput needs scale beyond a single connection, the factory can
        // manage multiple clients and ensure that child entities are evenly distributed
        // among them.
        
        private readonly ServiceBusClient _client;
        private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
        private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();
    
        public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
        public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
    
        public ServiceBusSender GetSender(string entity) =>
             _senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
    
        public ServiceBusReceiver GetReceiver(string entity) =>
            _receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
    
        public async ValueTask DisposeAsync()
        {
            await _client.DisposeAsync().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }
    }
    

    【讨论】:

    • 好的,所以在阅读了您的答案几次并且不熟悉所有流行语(对这一切仍然感到新鲜)之后,我想我开始理解了......但是拨打解释一下您所说的我应该做的基本上是在您链接的同一 Microsoft 指南中为每个队列创建“发送和接收消息”中显示的部分?反过来,如我的示例所示动态更改队列是错误的思维方式吗?您是否可以根据将其作为服务的想法向我展示一个正确的示例,如上所示?
    • 当然,我已经更新了一些额外的上下文和示例。我没有更新您的课程,而是说明了我们推荐的最佳实践方法。
    • 好的,所以......我已经更新了“问题帖子”,我认为我已经理解了关于正确实施它以反映你的解释杰西。至少有一条消息确实到达了队列。虽然我在想关于 DisposeAsync 的最后一种方法。而且我不太确定我是否理解那部分。如果我按照您提到的那样关闭我的应用程序,应用程序关闭句柄不应该将其全部清除吗?或者我是否必须在每个控制器运行之后处理客户端?这不会否定将客户作为单身人士吗?再次感谢杰西! :)
    • 如果您将客户端注册为 DI 的一部分,那么您不需要显式处置它;宿主环境将在正确的时间隐式处理。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-17
    • 1970-01-01
    • 2021-12-22
    • 1970-01-01
    • 2011-10-25
    相关资源
    最近更新 更多