【发布时间】:2021-09-22 11:53:48
【问题描述】:
我是 Azure 服务总线的新手,并试图了解启用会话的队列是如何工作的。
我有 3 个控制台应用程序:1 个消息发送器应用程序,用于将消息发送到队列和 2 个消息接收器应用程序。 Azure 服务总线队列已启用会话,我正在尝试实现并发会话。
每个运行消息发送者控制台应用程序的用户都会向队列发送一条消息,并且该消息必须被两个接收者控制台应用程序接收。 并且接收方应用程序必须按顺序接收/处理每条消息,首先是 Receiver1,然后是 Receiver2。为此,我标记了每条消息 在发送时使用接收方应用程序名称,然后在接收方代码中检查接收方应用程序要接收的标签名称。
以下是我用于发送和接收消息的代码。
消息发送者代码:
public static int Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
public async Task MainAsync()
{
await SendSessionMessagesAsync();
}
public async Task SendSessionMessagesAsync()
{
// Below userList coming from db
foreach (var user in userList)
{
List<ReceiverApplicationName> receiverApplicationNameList = new List<ReceiverApplicationName>();
var receiverApplicationNames = //getting the ReceiverApplicationName list from appsettings.json, [ "Receiver1", "Receiver2" ]
foreach (var item in receiverApplicationNames)
{
ReceiverApplicationName receiverApplicationName = new ReceiverApplicationName();
receiverApplicationName.UserId = user.Id;
receiverApplicationName.ApplicationName = item;
receiverApplicationNameList.Add(receiverApplicationName);
}
foreach (var queueMessageItem in receiverApplicationNameList)
{
var messageBody = JsonConvert.SerializeObject(queueMessageItem);
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
message.SessionId = "Session " + queueMessageItem.UserId;
Console.WriteLine($"Sending message for UserId: {queueMessageItem.UserId}, SessionId: {message.SessionId}, Message: {messageBody}");
await queueClient.SendAsync(message); // 2 messages(Receiver1 and Receiver2) for each user
}
}
}
消息接收方代码(Receiver1):
public static int Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
public async Task MainAsync()
{
RegisterOnSessionHandlerAndReceiveSessionMessages();
}
public void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentSessions = 2,
MessageWaitTimeout = TimeSpan.FromSeconds(5),
AutoComplete = false,
};
queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
}
public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
return Task.CompletedTask;
}
public async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(Encoding.UTF8.GetString(message.Body));
if (result.ReceiverApplicationName == "Receiver1")
{
Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, Message: SequenceNumber: {message.SystemProperties.SequenceNumber}, Body:{Encoding.UTF8.GetString(message.Body)}, at: {DateTime.Now}");
await session.CompleteAsync(message.SystemProperties.LockToken);
}
}
消息接收方代码(Receiver2):
public static int Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
public async Task MainAsync()
{
RegisterOnSessionHandlerAndReceiveSessionMessages();
}
public void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentSessions = 2,
MessageWaitTimeout = TimeSpan.FromSeconds(5),
AutoComplete = false,
};
queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
}
public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
return Task.CompletedTask;
}
public async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(Encoding.UTF8.GetString(message.Body));
if (result.ReceiverApplicationName == "Receiver2")
{
Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, Message: SequenceNumber: {message.SystemProperties.SequenceNumber}, Body:{Encoding.UTF8.GetString(message.Body)}, at: {DateTime.Now}");
await session.CompleteAsync(message.SystemProperties.LockToken);
}
}
结果: 我无法在会话 ID 中按顺序接收消息。 Receiver1 和 Receiver2 收到消息的时间戳不按顺序排列。例如,在 UserId = 2 时,Receiver1 应该在 Receiver2 应用程序接收到消息之前接收到消息。在会话中,应按添加顺序接收消息,否则我不正确。
消息发送方输出: MessageSender
接收器1输出: Receiver1
接收器2输出: Receiver2
【问题讨论】:
-
由于多个接收器同时运行,您不能期望消息以任何特定顺序到达。如果您的设计依赖于按顺序处理的消息,您应该重新审视您的设计。
标签: c# .net-core azureservicebus