【问题标题】:MassTransit 错误 "The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state for the StateMachine state machine"(MassTransit:StateMachine 状态机在 ProcessingStartedState 状态下未处理 Step1FinishedEvent 事件)
【发布时间】:2021-12-15 02:41:29
【问题描述】:

我正在尝试使用 MassTransit 状态机编写一个完整可运行的示例,用来编排完全解耦的服务,但遇到了异常:The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state for the ArcStateMachine state machine。在调试过程中,看起来由消费者处理的消息所触发的事件到达状态机的时间太晚了。

我的定义:

/// <summary>
/// Persisted instance of <c>ArcStateMachine</c>: one document per processing run.
/// Extra fields found in a stored document are ignored on deserialization
/// (<c>[BsonIgnoreExtraElements]</c>).
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Saga correlation key (the message contracts correlate on ActivationId).</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the current state; bound via <c>InstanceState(x => x.CurrentState)</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>
    /// Instance version required by <c>ISagaVersion</c>.
    /// NOTE(review): presumably used by the MongoDB saga repository for optimistic concurrency — confirm.
    /// </summary>
    public int Version { get; set; }

    /// <summary>Activation id copied from each incoming message by the state machine.</summary>
    public Guid ActivationId { get; set; }
}

// Contracts correlations definitions for message exchange of the state machine
public static class MessageContracts
{
    static bool _initialized;

    public static void Initialize()
    {
        if (_initialized)
            return;

        GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<StartStep1Message>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<Step1FinishedMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<StartStep2Message>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<Step2FinishedMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(x => x.ActivationId);

        _initialized = true;
    }
}

/// <summary>
/// Endpoint definition for <see cref="Step1Consumer"/>. It discards faulted messages
/// so that they are not routed to the RabbitMQ <c>_error</c> queue.
/// </summary>
public class Step1ConsumerDefinition : ConsumerDefinition<Step1Consumer>
{
    public Step1ConsumerDefinition()
    {
        // Nothing to configure at construction time.
    }

    /// <summary>Applies <c>DiscardFaultedMessages</c> to the consumer's receive endpoint.</summary>
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<Step1Consumer> consumerConfigurator) =>
        endpointConfigurator.DiscardFaultedMessages();
}

我的 2 个实际处理消费者:

/// <summary>
/// Performs step 1 for an activation, simulated by a fixed delay.
/// When the step completes it publishes <see cref="Step1FinishedMessage"/>.
/// </summary>
public class Step1Consumer : IConsumer<StartStep1Message>
{
    readonly ILogger<Step1Consumer> _Logger;

    // Duration of the simulated work, in seconds.
    private readonly int _DelaySeconds = 5;

    public Step1Consumer(ILogger<Step1Consumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>
    /// Logs the start of the step, waits for the simulated duration,
    /// logs the completion and then publishes <see cref="Step1FinishedMessage"/>.
    /// </summary>
    public async Task Consume(ConsumeContext<StartStep1Message> context)
    {
        var activationId = context.Message.ActivationId;

        // Structured templates keep ActivationId as a queryable log property.
        _Logger.LogInformation("Step 1 started: {ActivationId}", activationId);

        await Task.Delay(TimeSpan.FromSeconds(_DelaySeconds));

        _Logger.LogInformation("Step 1 finished: {ActivationId}", activationId);

        await context.Publish<Step1FinishedMessage>(new { ActivationId = activationId });
    }
}

/// <summary>
/// Runs step 2 for an activation, using a one-second delay as a stand-in for real work.
/// It announces completion by publishing <see cref="Step2FinishedMessage"/>.
/// </summary>
public class Step2Consumer : IConsumer<StartStep2Message>
{
    private readonly ILogger<Step2Consumer> _logger;

    // Length of the simulated work, in seconds.
    private readonly int _delaySeconds = 1;

    public Step2Consumer(ILogger<Step2Consumer> logger) => _logger = logger;

    /// <summary>Logs start, waits, logs finish, then publishes the "finished" event.</summary>
    public async Task Consume(ConsumeContext<StartStep2Message> context)
    {
        var id = context.Message.ActivationId;
        _logger.LogInformation($"Step 2 started {id}");

        var delayMilliseconds = _delaySeconds * 1000;
        await Task.Delay(delayMilliseconds);

        _logger.LogInformation($"Step 2 finished {id}");
        await context.Publish<Step2FinishedMessage>(new { ActivationId = id });
    }
}

我还有 2 个辅助消费者来协调不同服务之间的消息转换,以将它们解耦并检测所有处理的完成情况:

/// <summary>
/// Coordinates the hand-offs between the decoupled step consumers. Each "started" or
/// "finished" message is turned into the command for the next stage:
/// StartProcessingMessage → StartStep1Message,
/// Step1FinishedMessage → StartStep2Message,
/// Step2FinishedMessage → ProcessingFinishedMessage.
/// </summary>
public class TransitionConsumer :
    IConsumer<StartProcessingMessage>,
    IConsumer<Step1FinishedMessage>,
    IConsumer<Step2FinishedMessage>
{
    readonly ILogger<TransitionConsumer> _Logger;

    public TransitionConsumer(ILogger<TransitionConsumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Starts step 1 once processing has been requested.</summary>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation("Transition from Started to Step 1: {ActivationId}", activationId);
        await context.Publish<StartStep1Message>(new { ActivationId = activationId });
    }

    /// <summary>Starts step 2 once step 1 has finished.</summary>
    public async Task Consume(ConsumeContext<Step1FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation("Transition from Step 1 to Step 2: {ActivationId}", activationId);
        await context.Publish<StartStep2Message>(new { ActivationId = activationId });
    }

    /// <summary>Signals overall completion once step 2 has finished.</summary>
    public async Task Consume(ConsumeContext<Step2FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation("Transition from Step 2 to Completion: {ActivationId}", activationId);
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}

/// <summary>
/// Terminal consumer. It only logs that processing for an activation has finished.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    private readonly ILogger<ProcessingFinishedConsumer> _logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger) => _logger = logger;

    /// <summary>Writes the "Finish" log line for the activation.</summary>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        var message = context.Message;
        _logger.LogInformation($"Finish {message.ActivationId}");

        // No asynchronous work; awaiting a completed task keeps the method async.
        await Task.CompletedTask;
    }
}

还有一个 Fault<> 消费者,负责处理可能来自 Step1Consumer 和 Step2Consumer 的所有故障:

/// <summary>
/// Logs the faults raised when <see cref="Step1Consumer"/> or <see cref="Step2Consumer"/>
/// fail to process their start message.
/// </summary>
public class FaultConsumer :
    IConsumer<Fault<StartStep1Message>>,
    IConsumer<Fault<StartStep2Message>>
{
    readonly ILogger<FaultConsumer> _Logger;

    public FaultConsumer(ILogger<FaultConsumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Logs a failed step 1 for the faulted message's activation.</summary>
    public Task Consume(ConsumeContext<Fault<StartStep1Message>> context)
    {
        LogFault("Step 1", context.Message.Message.ActivationId, context.Message.Exceptions);
        return Task.CompletedTask;
    }

    /// <summary>Logs a failed step 2 for the faulted message's activation.</summary>
    public Task Consume(ConsumeContext<Fault<StartStep2Message>> context)
    {
        LogFault("Step 2", context.Message.Message.ActivationId, context.Message.Exceptions);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Writes one error-level entry per fault, joining all exception messages with ", ".
    /// </summary>
    /// <param name="step">Human-readable step name used in the log line.</param>
    /// <param name="activationId">Activation whose step failed.</param>
    /// <param name="exceptions">Exceptions carried by the fault; a null array is treated as empty.</param>
    private void LogFault(string step, Guid activationId, ExceptionInfo[] exceptions)
    {
        var errorMessages = exceptions is null
            ? string.Empty
            : string.Join(", ", exceptions.Select(e => e.Message));

        // Failures are errors, not informational events.
        _Logger.LogError("{Step} failed for {ActivationId}, cause: {ErrorMessages}", step, activationId, errorMessages);
    }
}

这是状态机定义:

/// <summary>
/// Saga state machine that follows one activation through the states
/// ProcessingStarted → Step1Started → Step1Finished → Step2Started → Step2Finished → Final.
/// </summary>
/// <remarks>
/// Each state reacts to exactly one event. An event that arrives while the instance is in
/// any other state is not handled by this machine.
/// NOTE(review): the saga receives the same messages as the transition consumer, so arrival
/// order is not guaranteed. The receive endpoint presumably needs message retry (plus an
/// in-memory outbox) to absorb that; confirm against the bus configuration.
/// </remarks>
public class ArcStateMachine : MassTransitStateMachine<ArcProcess>
{
    // Registers ActivationId-based correlation for the message contracts before the first instance is built.
    static ArcStateMachine() => MessageContracts.Initialize();

    public ArcStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // Initial → ProcessingStarted
        Initially(
            When(StartProcessingEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        // ProcessingStarted → Step1Started
        During(ProcessingStartedState,
            When(Step1StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1StartedState));

        // Step1Started → Step1Finished
        During(Step1StartedState,
            When(Step1FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1FinishedState));

        // Step1Finished → Step2Started
        During(Step1FinishedState,
            When(Step2StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2StartedState));

        // Step2Started → Step2Finished
        During(Step2StartedState,
            When(Step2FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2FinishedState));

        // Step2Finished → Final
        During(Step2FinishedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }

    // States
    public State ProcessingStartedState { get; }
    public State Step1StartedState { get; }
    public State Step1FinishedState { get; }
    public State Step2StartedState { get; }
    public State Step2FinishedState { get; }

    // Events, one per message contract
    public Event<StartProcessingMessage> StartProcessingEvent { get; }
    public Event<StartStep1Message> Step1StartedEvent { get; }
    public Event<Step1FinishedMessage> Step1FinishedEvent { get; }
    public Event<StartStep2Message> Step2StartedEvent { get; }
    public Event<Step2FinishedMessage> Step2FinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

以及MassTransit的设置:

        var rabbitHost = Configuration["RABBIT_MQ_HOST"];

        // MassTransit (and, as written, Swagger) is only registered when a RabbitMQ host is configured.
        if (rabbitHost.IsNotEmpty())
        {
            services.AddMassTransit(cnf =>
            {
                var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

                // The saga consumes the same messages as TransitionConsumer on its own queue.
                // An event (e.g. StartStep1Message) can therefore reach the saga before the saga
                // has applied the previous one. The instance is then in the wrong state, which
                // produces "event is not handled during ... state".
                // Retrying redelivers such an early message after the instance has caught up.
                // The in-memory outbox holds anything the saga publishes until its consume completes.
                cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>(saga =>
                    {
                        saga.UseMessageRetry(r => r.Interval(3, 1000));
                        saga.UseInMemoryOutbox();
                    })
                    .Endpoint(e => e.Name = BusConstants.SagaQueue)
                    .MongoDbRepository(connectionString, r =>
                    {
                        r.DatabaseName = "mongo";
                        r.CollectionName = "WorkflowState";
                    });

                cnf.AddConsumer(typeof(TransitionConsumer));
                cnf.AddConsumer(typeof(Step1Consumer), typeof(Step1ConsumerDefinition));
                cnf.AddConsumer(typeof(Step2Consumer));
                cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
                cnf.AddConsumer(typeof(FaultConsumer));

                //cnf.AddMessageScheduler(schedulerEndpoint);

                cnf.UsingRabbitMq((context, cfg) =>
                {
                    // Credentials come from configuration instead of source code.
                    // "guest" remains the fallback, so existing setups behave as before.
                    var rabbitUser = Configuration["RABBIT_MQ_USERNAME"] ?? "guest";
                    var rabbitPassword = Configuration["RABBIT_MQ_PASSWORD"] ?? "guest";

                    cfg.Host(new Uri(rabbitHost), hst =>
                    {
                        hst.Username(rabbitUser);
                        hst.Password(rabbitPassword);
                    });

                    //cfg.UseMessageScheduler(schedulerEndpoint);

                    cfg.ConfigureEndpoints(context);
                });
            });

            services.AddMassTransitHostedService();

            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
            });
        }

所以实际上消息流应该是这样的:

  1. 控制器发布 StartProcessingMessage,TransitionConsumer 消费该消息,然后发布 StartStep1Message
  2. Step1Consumer 收到消息,完成其工作并发布 Step1FinishedMessage
  3. TransitionConsumer 获取消息并发布StartStep2Message
  4. Step2Consumer 收到消息,完成其工作并发布 Step2FinishedMessage
  5. TransitionConsumer 获取消息并发布ProcessingFinishedMessage,由ProcessingFinishedConsumer 使用。

在这种情况下,Step1Consumer 和 Step2Consumer 互不知道对方的存在,步骤之间的转换完全由 TransitionConsumer 负责协调。与此同时,状态机会跟踪每条消息,并依次经过各自对应的状态。

问题从一开始就出现了:TransitionConsumer 在 ArcStateMachine 开始处理 StartProcessingEvent(我原以为它会先被触发)之前,就已经发布了 StartStep1Message。这导致状态机卡在 ProcessingStartedState 中。结果是,当 Step1FinishedEvent 被发布时,状态机并不处于 Step1StartedState,因为本应由 StartStep1Message 触发的 Step1StartedEvent 没有生效。

我该如何解决这个问题?

【问题讨论】:

    标签: c# exception state-machine masstransit orchestration


    【解决方案1】:

    您应该为您的状态机创建一个Saga Definition,以便您可以配置消息重试和内存中的发件箱。

    在该定义中,将重试/发件箱直接添加到接收端点,如下所示。

    // Inside the saga definition, applied directly to the saga's receive endpoint.
    // Retry failed deliveries; Interval(3, 1000) presumably means 3 retries 1000 ms apart (MassTransit retry API).
    endpointConfigurator.UseMessageRetry(r => r.Interval(3,1000));
    // Hold messages published during a consume until that consume completes.
    endpointConfigurator.UseInMemoryOutbox();
    

    这应该能处理 saga 中的任何并发问题(消费者可能正在接收消息并产生事件,而该事件在 saga 处理完"触发该消费者命令的那个事件"之前,就已经被分派到 saga。是的,就是这么快)。

    【讨论】:

    • 你是国王,@Chris。很快我将发布完整的工作解决方案。我还使用了您文档示例中的endpointConfigurator.CreatePartitioner(16),因为我听说它对 mongoDB 性能有好处。
    • 只剩下一个小问题。我仍然在 RMQ 的 Step2_skipped 队列中收到 1 条消息。我应该担心吗?这正常吗?这能消除吗?
    • 消息是什么(您可以在 RMQ 控制台中查看内容)?跳过通常意味着“不被消费者消费”。
    • 它显示:"sourceAddress": "rabbitmq://localhost/Step1","destinationAddress": "rabbitmq://localhost/....Messages:Step1FinishedMessage",原因是 dead-letter(死信)。但 TransitionConsumer 确实会消费这种类型的消息,而且我在消费者和状态机中都命中了断点。这可能是什么原因?
    猜你喜欢
    • 2016-01-16
    • 1970-01-01
    • 2012-07-31
    • 1970-01-01
    • 2020-06-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-07-16
    相关资源
    最近更新 更多