【问题标题】:MassTransit In-Memory Outbox in Saga佐贺的 MassTransit In-Memory Outbox
【发布时间】:2020-02-24 03:19:51
【问题描述】:

我们对带有 In-Memory 发件箱的 Saga 进行了一些负载测试。在这些测试中,我们模拟了不同类型的故障:应用程序重启、基础设施重启、消息代理重启等。

我们注意到,一些 saga 实例没有完成,并且出现了一堆错误: Automatonymous.NotAcceptedStateMachineException: ... {SomeEvent}: Not Accepted in state {SomeState}

经过一些调试,我们隔离了问题。我将尝试使用此示例代码来描述它:

public class OrderStateMachine : MassTransitStateMachine<Order>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        During(Initial, 
            When(Create).TransitionTo(New));

        During(New,
            When(AddItem)
                .Then(x => x.Instance.Items.Add(x.Data.Name)),

            When(Submit)
                .ThenAsync(async x =>
                {
                    // do something
                    await x.Publish(new SendEmail {Text = $"Order submitted. {x.Instance.Summary}"});
                })
                .TransitionTo(Submitted));

        During(Submitted,
            When(Accept)
                .ThenAsync(async x =>
                {
                    // do something
                    await x.Publish(new SendEmail {Text = $"Order accepted. {x.Instance.Summary}"});
                })
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public State New { get; private set; }
    public State Submitted { get; private set; }

    public Event<Create> Create { get; private set; }
    public Event<AddItem> AddItem { get; private set; }
    public Event<Submit> Submit { get; private set; }
    public Event<Accept> Accept { get; private set; }
}

public class Order : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public IList<string> Items { get; set; } = new List<string>();

    public string Summary => $"Items: {string.Join(", ", Items)}";
}

public class Create : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
}

public class AddItem : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
    public string Name { get; set; }
}

public class Submit : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
}

public class Accept : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
}

public class SendEmail
{
    public string Text { get; set; }
}

会发生这样的事情:

  1. Order 状态 New 期间,我们处理 Submit 事件,做一些 更改顺序,发布 SendEmail 事件并转换到 提交状态。
  2. 订单状态已成功持久化到数据库
  3. 应用程序在发送消息之前崩溃(即强制重启) 发件箱。
  4. 应用程序重新启动,提交第二次交付,订单处于已提交状态,我们得到一个异常 Automatonymous.NotAcceptedStateMachineException: 。 .. 提交:在提交状态不接受

如果在 Accept 事件处理期间状态 Submitted 发生这种情况怎么办?我的假设:

  1. Order 状态 Submitted 期间,我们处理 Accept 事件,做一些 更改顺序,发布 SendEmail 事件并完成 Order
  2. Order 状态已从 DB 中移除,因为它已完成并配置为 CompletedWhenFinalized
  3. 应用程序在发送消息之前崩溃(即强制重启) 发件箱。
  4. 应用程序重新启动,Accept 第二次交付,Order 不再在 DB 中,我们丢失了有关它的所有信息......现在会发生什么?李>

处理此类情况的最佳解决方案是什么?我已经阅读了 Chris 关于 In-Memory Outbox 的精彩 article,但不明白当 Saga 处于不再处理该消息的状态时,如何在重新传递期间处理该消息。当然我们可以用一些棘手的逻辑来处理下一个状态下的redelivered事件,但是看起来很麻烦。我们的 Saga 比提供的示例复杂得多。

也许在发件箱中的所有消息都已发送后提交的事务将是一个解决方案吗? Transaction Outbox 可以以某种方式配置 Saga 吗?

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    既然您已经阅读了有关使用发件箱的文章,并且您意识到您需要将Submit 的处理程序添加到Submitted 状态,那就是真正的答案。但是,与更新 saga 状态并被持久化的原始处理程序不同,您只需要重新生成已发送/发布的事件。这处理了问题的第一部分,已提交

    第二部分是一个不同的答案,实际上很简单。您没有在接受中完成订单。您创建了一个附加状态,Accepted,订单在被接受后转换到该状态。并且您在一段时间(一周、一个月等)后删除订单实例。这样,当 Accept 消息传递到 Accepted 实例时,您可以重新生成已发布的事件。

    现在,您可以使用 Quartz 安排未来的消息来完成 saga,这不会执行任何业务逻辑,而只会删除 saga 实例。并且您可以设置 Initially(When(RemoveOrder).Ignore()) 处理程序,如果 saga 不存在,该处理程序将丢弃删除订单消息。这使它成为自动的。但在过去的系统中,我们只是归档文件组的日期范围分区(在 SQL 服务器中)或在 30 或 90 天或其他任何时间后删除较旧的记录。

    【讨论】:

    • 非常感谢。现在一切都清楚了。第二种情况实际上对我们来说不是问题。我们从未计划自动完成最终的会话。我们有一个外部机制来归档旧记录并直接在数据库上执行。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-27
    相关资源
    最近更新 更多