【问题标题】:locking during nservicebus handler在 nservicebus 处理程序期间锁定
【发布时间】:2017-03-02 09:26:39
【问题描述】:

我有一个场景,其中我需要一个 nservicebus 消息处理程序来防止同一 saga 的多个消息同时执行。

为了参数的处理程序会做这样的事情(在这个例子中过于简单了(

消息:

public class MyMessage : IMessage {
  public int OrderId {get;set;}
  public int NewQuantityLevel {get;set;}
}

传奇:

public void Handle(MyMessage message)
{
    // call remote service to get current order quantity
    // do some logic and update remote service with difference between original and new quantity

    Bus.Send(new MyOtherMessage())
}

现在我是我的流程,我可以随时收到 2 条或更多这样的消息,我不希望他们检索可能已经在其他地方更新或修改的订单数量。

我考虑了几个解决方案:

  1. 为订单获取一个互斥锁(目前我们只有一个worker实例在一台机器上运行,但未来有可能拥有多个,在这种情况下我们可能会使用redis锁或类似的东西)
  2. 在服务中使用 sql 锁定对行/数据进行序列化锁定(但不确定这是否可行)

这些都不是最理想的,而且感觉我正在反对框架

【问题讨论】:

    标签: message-queue nservicebus


    【解决方案1】:

    是否可以创建次要传奇,因为更新本身是一个长期运行的过程?当更新 saga 完成工作时,它可以发出信号继续原来的 saga。

    至于 NServiceBus 如何处理并发 sagas 有两种情况:

    • 当多个 saga start 消息进来时,只有一个会提交。其他消息将失败并通过重试获取。在第二次尝试时,传奇已经存在,并且没有创建第二个实例。这将确保只创建一个 saga。

    • 当您同时访问 saga(例如更新状态)时,持久性存储并发设置生效。如果使用 RavenDB,NServiceBus 会打开optimistic concurrency support

    这一切都在 NServiceBus 文档的this page 中有更详细的记录。

    如果您需要确保每批仅存在一个 saga 实例(例如,如果您可以将 saga 与您想要锁定的产品 ID 相关联),您可以将其用作关联 ID,因此每个批次仅存在一个 saga 批次

    如果您只需要一个 saga 实例(更像是一个单例 saga),您可以使用无操作关联逻辑以及自定义 saga finder。这样,您仍然可以扩展端点,其他处理程序/sagas 不会受到影响。该技术显示为here

    【讨论】:

      【解决方案2】:

      传奇就是锁。

      正如@Hadi 提到的,NServiceBus 将使用乐观并发来确保一次只有一条消息可以更新 saga 实例。

      不要直接在 saga 中执行更新,而是存储您正在执行更新的事实并将执行远程服务调用的消息发送到不同端点中的单独消息处理程序。将事实存储在 saga 中并发送消息以执行它,要么完成要么根本不完成。如果两条消息尝试同时执行此操作,则只有一条消息会成功完成。另一条消息会得到并发异常,回滚到队列并最终被重试。

      那时它会看到已经有一个数量更新操作发生。然后,您可以丢弃第二条消息或在 saga 上存储一些状态,以确保在第一次完成后发生第二次数量更新。

      将远程服务调用与全双工请求/响应消息一起移到 saga 之外,确保了作为流程管理器的 saga 和作为集成点的消息处理程序之间的良好分离。

      伪代码

      public class MySaga
      {
          public void Handle(MyMessage message)
          {
              if(Data.CurrentlyUpdatingQuantity)
                  return; //or schedule for later
      
              Data.CurrentlyUpdatingQuantity = true;
      
              Bus.Send(new PerformQuantityUpdateMessage(message.OrderId));    
          }
      
          public void Handle(QuantityUpdateResponse message)
          {
              Data.CurrentlyUpdatingQuantity = false;
              Bus.Send(new MyOtherMessage());
          }
      }
      

      单独的消息处理程序(不是 SAGA 的一部分)

      public void Handle(PerformQuantityUpdateMessage message)
      {
          // call remote service to get current order quantity
          // do some logic and update remote service with difference between original and new quantity
      
          Bus.Reply(new QuantityUpdateResponse(message.OrderId));    
      }
      

      【讨论】:

      • 我应该使用延迟传递来重新安排邮件还是自己跟踪邮件并在发件人中检查?
      • 对于这种情况,我可能会通过在 saga 数据中存储“我需要执行另一个更新数量”操作来跟踪它。然后首先在Handle(QuantityUpdateResponse) 中检查该标志,并在需要时发送另一个PerformQuantityUpdateMessage。通过这种方式,您可以将多个额外的MyMessage 批处理到单个数量更新操作中,并确保在初始操作完成后立即执行。
      • 如果同时发生的消息是针对同一个唯一键的,难道它们不能更新 saga 数据吗
      • 应该可以的。只要 orderId 相同,它总是会映射到同一个 saga 实例。 (至少在你明确完成这个传奇之前)
      【解决方案3】:

      你可能不应该在你的传奇中这样做:

      //调用远程服务获取当前订单数量

      相反,应该将其移到单独的端点,作为流程管理器的 saga 以全双工请求/响应消息传递方式与之交互。

      因此,当 saga 获得启动它的第一个触发消息时,它会向 RemoteServiceInvocationEndpoint 发送请求消息,并更新其状态以指示它正在等待响应。

      RemoteServiceInvocationEndpoint 在得到远程服务的响应后会向 saga 发送响应消息。

      当 saga 收到响应消息时,它将知道该过程已完成,然后执行所需的任何最终操作 - 例如发送其他消息。

      如果 saga 收到另一个触发消息,它可以检查它的状态并查看它已经发送了一个请求消息并且知道不再发送另一个。

      正如@Hadi 在他的回复中所说,NServiceBus 中的并发控制机制将保证 saga 一次只能成功处理一条消息。

      【讨论】:

        猜你喜欢
        • 2013-09-06
        • 1970-01-01
        • 2023-03-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多