【问题标题】:Azure: circular (ring) queue with unique keyAzure:具有唯一键的循环(环形)队列
【发布时间】:2015-08-14 19:53:20
【问题描述】:

对于我的应用,我需要组织一个循环(环形)队列。这意味着任何处理过的消息都会立即进入队列的末尾继续处理。

例如:

  1. 队列:ABC
  2. 接收方进程A
  3. 队列:BCA

2 和 3 应该以原子方式执行。所以我们永远不会丢失A 或任何其他消息。

另一个要求是忽略重复项。所以队列中应该总是有一个A。即使发件人推送另一个 A 项目。 A 在这里指的是消息的一些唯一(主)键。

我寻找使用 Azure 服务总线,但我找不到如何同时满足这两个要求。是否可以使用 Service Bus 实现该场景?如果没有,最好的选择是什么?

【问题讨论】:

    标签: azure azureservicebus


    【解决方案1】:

    这种队列可以通过服务总线会话来实现。会话提供“分组依据”机制,因此我们可以将唯一键分配给消息的SessionId,然后分组接收消息,忽略组中除第一个消息之外的所有消息。

    实施

    1) 创建一个队列,将RequiresSession 设置为true:

    var queueDescription = new QueueDescription("CircularQueue")
    {
        RequiresSession = true,
    };
    await namespaceManager.CreateQueueAsync(queueDescription);
    

    2) 向队列发送消息时,将SessionId 设置为您的唯一键值:

    var message = new BrokeredMessage($"Message body")
    {
        MessageId = "MESSAGE_UNIQUE_KEY",
        SessionId = "MESSAGE_UNIQUE_KEY"
    };
    await queueClient.SendAsync(message);
    

    3) 使用会话接收消息:

    while (true)
    {
        var session = await queueClient.AcceptMessageSessionAsync(TimeSpan.FromSeconds(10));
        if (session == null)
            continue;
    
        try
        {
            var messages = (await session.ReceiveBatchAsync(100)).ToList();
    
            if (messages.Count == 0)
                continue;
    
            var message = messages[0];
    
            ProcessMessage(message);
    
            await queueClient.SendAsync(message.Clone());
            await session.CompleteBatchAsync(messages.Select(msg => msg.LockToken));
        }
        finally
        {
            await session.CloseAsync();
        }
    }
    

    【讨论】:

      【解决方案2】:

      根据我对 Azure 服务总线的了解,我相信这两个要求都可以单独满足,但我不确定如何同时满足这两个要求。

      消息循环

      据我了解,Azure 服务总线支持First-In-First-Out (FIFO) 行为。您可以做的是在Receive and Delete 模式下从队列顶部获取消息(例如A),然后将消息重新插入队列中。由于您正在创建一条新消息,它将被发布到队列的末尾。

      避免重复消息

      Service Bus Queues 有一个名为RequiresDuplicateDetection 的布尔属性,相应地设置此值将防止插入重复消息。简单搜索Azure Service Bus Duplicate Detection 即可找到许多示例。

      【讨论】:

      • 此实现存在一些问题: 1) 如果在接收和重新插入消息之间发生某些事情,使用“接收和删除”可能会导致消息丢失(可以通过峰值锁定模式修复)。 2) 服务总线中的重复检测实现会忽略重复,即使之前的消息已经被处理过。所以我们在这里丢失了消息。 3) 重复检测也有 7 天的限制,所以如果我们超过这个限制,它将无法工作,我们甚至都不知道。
      • 我同意你上面提到的所有观点。 “接收和删除”肯定会带来潜在的“消息丢失”问题。但我不确定“Peek & Lock”如何与“重复检测”一起使用,因为消息仍在队列中。我很想知道你最终想出了什么解决方案:)。
      • 是的,重复消息是一个真正的问题。使用服务总线似乎无法正确实现它。寻找其他选择:SQL Azure、Redis。
      • 看起来重复检测可以使用服务总线中的会话来实现。唯一键应设置为 SessionId 属性。然后接受会话所有重复的消息也与原始消息一起检索。看起来很奇怪,但应该在这里工作。
      • 已发布解决方案作为答案。
      猜你喜欢
      • 1970-01-01
      • 2012-08-05
      • 2012-07-06
      • 1970-01-01
      • 1970-01-01
      • 2021-02-12
      • 2013-11-15
      • 1970-01-01
      • 2014-01-04
      相关资源
      最近更新 更多