【发布时间】:2022-02-21 21:16:38
【问题描述】:
我目前正在考虑将我们非常简单的服务总线服务更新到最新版本 (Asure.Messaging.Servicebus),我在这里遇到了一个较小的问题。
问题是我想通过将消息委托给我的服务类中的方法来手动完成或放弃接收或峰值消息来处理工作。
到目前为止,这是我的简单类,由接口公开。
using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;
namespace myProject.Services
{
/// <summary>
/// Service bus service controller
/// </summary>
public class ServiceBusService: IServiceBusService
{
private IConfigurationUtility _configurationUtility;
static string connectionString;
static ServiceBusClient client;
static ServiceBusSender sender;
static ServiceBusReceiver receiver;
public ServiceBusService(IConfigurationUtility configurationUtility)
{
_configurationUtility = configurationUtility;
connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
client = new ServiceBusClient(connectionString);
}
/// <summary>
/// Sending message.
/// </summary>
/// <param name="messageContent"></param>
/// <param name="queueName"></param>
public void SendMessage(string messageContent, string queueName)
{
sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage(messageContent);
sender.SendMessageAsync(message).Wait();
}
/// <summary>
/// Receive message.
/// </summary>
/// <returns></returns>
/// <param name="queueName"></param>
public ServiceBusReceivedMessage ReceiveMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
return receivedMessage;
}
/// <summary>
/// Peek message.
/// </summary>
/// <returns></returns>
public ServiceBusReceivedMessage PeekMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
return peekedMessage;
}
/// <summary>
/// Complete message.
/// </summary>
/// <param name="message"></param>
public void CompleteMessage(ServiceBusReceivedMessage message)
{
receiver.CompleteMessageAsync(message).Wait();
}
/// <summary>
/// Abandon message.
/// </summary>
/// <param name="message"></param>
public void AbandonMessage(ServiceBusReceivedMessage message)
{
receiver.AbandonMessageAsync(message).Wait();
}
}
}
当我一次处理一条消息时,一切正常,但如果我一次处理两条消息,我会收到此错误:
“提供的锁无效。锁已过期,或者消息已从队列中删除,或者已被不同的接收器实例接收”
我认为 "or was received by a different receiver instance" 部分是我的问题。
我是不是想错了?我不应该能够处理例如:一次放弃多个单个收到的消息吗?
提前致谢。
更新:
所以我设法让 Jesse Squire 考虑到微软提供的新 Servicebus 文档的建议工作。
我创建了一个名为 ServicebusFactory 的新类,在下面添加了 Jesse 提供的代码,以对其进行测试,并将“= new();”部分更改为初始化的“ConcurrentDictionary's”在示例顶部,因为像这样初始化它们会导致以下错误:
CS8400 功能“目标类型对象创建”在 C# 8.0 中不可用。请使用 9.0 或更高版本的语言。
结果:
public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
{
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
}
然后我为我的服务总线工厂创建了一个接口,并将它作为一个单例添加到我的“Startup.cs”类中的“ConfigurationService”方法的底部,用我的服务总线连接字符串初始化它。
界面:
public interface IServiceBusFactory
{
public ServiceBusSender GetSender(string entity);
public ServiceBusReceiver GetReceiver(string entity);
public ValueTask DisposeAsync();
}
启动:
var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));
最后这是一个依赖注入 servicebusFactory 接口到我的控制器构造函数的问题,然后使用它来获取“sender”并使用它向我的名为“”的队列发送消息添加”。
构造函数:
private readonly IServiceBusFactory _serviceBusFactory;
public ServicebusController(IServiceBusFactory serviceBusFactory)
{
_serviceBusFactory = serviceBusFactory;
}
控制器方法/动作实现:
var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));
var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);
【问题讨论】:
标签: asp.net-core message-queue azureservicebus messaging azure-servicebus-queues