【发布时间】:2021-12-15 02:41:29
【问题描述】:
我正在尝试使用 MassTransit 状态机制作一个完全可运行的示例,用来协调完全解耦的服务,但遇到了如下异常:The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state for the ArcStateMachine state machine。在调试过程中发现,(由消费者消费的)消息触发状态机所处理事件的时机似乎太晚了。
我的定义:
/// <summary>
/// Saga instance persisted for a single processing run tracked by
/// <c>ArcStateMachine</c>.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess :
    SagaStateMachineInstance,
    ISagaVersion
{
    /// <summary>Identifier that ties incoming messages to this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the state the instance is currently in.</summary>
    public string CurrentState { get; set; }

    /// <summary>Version number exposed through <see cref="ISagaVersion"/>.</summary>
    public int Version { get; set; }

    /// <summary>Activation identifier copied from the messages handled by the state machine.</summary>
    public Guid ActivationId { get; set; }
}
/// <summary>
/// Declares, for every message contract exchanged by the state machine, that the
/// message's <c>ActivationId</c> is the value used as its correlation id when sent.
/// </summary>
public static class MessageContracts
{
    // Set once the correlation mappings have been registered.
    private static bool _initialized;

    /// <summary>
    /// Registers the correlation-id mappings on the global send topology.
    /// Calls made after a successful registration do nothing.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<StartStep1Message>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<Step1FinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<StartStep2Message>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<Step2FinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            _initialized = true;
        }
    }
}
/// <summary>
/// Consumer definition for <see cref="Step1Consumer"/>. It makes the receive endpoint
/// discard faulted messages so they are not routed to the RabbitMQ <c>_error</c> queue.
/// </summary>
public class Step1ConsumerDefinition :
    ConsumerDefinition<Step1Consumer>
{
    /// <summary>
    /// Configures the endpoint hosting <see cref="Step1Consumer"/>.
    /// </summary>
    /// <param name="endpointConfigurator">Receive endpoint the consumer is attached to.</param>
    /// <param name="consumerConfigurator">Consumer-level configurator; not used here.</param>
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<Step1Consumer> consumerConfigurator)
        => endpointConfigurator.DiscardFaultedMessages();
}
我的 2 个实际处理消费者:
/// <summary>
/// Performs the (simulated) work of step 1: waits for a fixed delay and then
/// publishes a <c>Step1FinishedMessage</c> carrying the same activation id.
/// </summary>
public class Step1Consumer : IConsumer<StartStep1Message>
{
    // Simulated duration of the step 1 work.
    private static readonly TimeSpan WorkDuration = TimeSpan.FromSeconds(5);

    private readonly ILogger<Step1Consumer> _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger used to trace the start and end of the step.</param>
    public Step1Consumer(ILogger<Step1Consumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles a <c>StartStep1Message</c> and announces completion of step 1.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    public async Task Consume(ConsumeContext<StartStep1Message> context)
    {
        var activationId = context.Message.ActivationId;

        _logger.LogInformation($"Step 1 started: {activationId}");
        await Task.Delay(WorkDuration);

        // The original logged "Step 2 finished" here, which mislabelled this step's completion.
        _logger.LogInformation($"Step 1 finished: {activationId}");
        await context.Publish<Step1FinishedMessage>(new { ActivationId = activationId });
    }
}
/// <summary>
/// Performs the (simulated) work of step 2: waits for a fixed delay and then
/// publishes a <c>Step2FinishedMessage</c> carrying the same activation id.
/// </summary>
public class Step2Consumer : IConsumer<StartStep2Message>
{
    // Simulated duration of the step 2 work.
    private static readonly TimeSpan WorkDuration = TimeSpan.FromSeconds(1);

    private readonly ILogger<Step2Consumer> _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger used to trace the start and end of the step.</param>
    public Step2Consumer(ILogger<Step2Consumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles a <c>StartStep2Message</c> and announces completion of step 2.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    public async Task Consume(ConsumeContext<StartStep2Message> context)
    {
        var id = context.Message.ActivationId;

        _logger.LogInformation($"Step 2 started {id}");
        await Task.Delay(WorkDuration);
        _logger.LogInformation($"Step 2 finished {id}");

        var finished = new { ActivationId = id };
        await context.Publish<Step2FinishedMessage>(finished);
    }
}
我还有 2 个辅助消费者来协调不同服务之间的消息转换,以将它们解耦并检测所有处理的完成情况:
/// <summary>
/// Links the processing steps together: for each message that ends one stage it
/// publishes the message that starts the next, always forwarding the activation id.
/// </summary>
/// <remarks>
/// NOTE(review): each publish happens as soon as the triggering message is consumed;
/// nothing here waits for any other subscriber of that message to handle it first.
/// </remarks>
public class TransitionConsumer :
    IConsumer<StartProcessingMessage>,
    IConsumer<Step1FinishedMessage>,
    IConsumer<Step2FinishedMessage>
{
    // The unused private field `_DelaySeconds` was removed: no member ever read it.
    private readonly ILogger<TransitionConsumer> _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger used to trace each transition.</param>
    public TransitionConsumer(ILogger<TransitionConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>Processing started: publishes <c>StartStep1Message</c>.</summary>
    /// <param name="context">Consume context of the incoming message.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _logger.LogInformation($"Transition from Started to Step 1: {activationId}");
        await context.Publish<StartStep1Message>(new { ActivationId = activationId });
    }

    /// <summary>Step 1 finished: publishes <c>StartStep2Message</c>.</summary>
    /// <param name="context">Consume context of the incoming message.</param>
    public async Task Consume(ConsumeContext<Step1FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _logger.LogInformation($"Transition from Step 1 to Step 2: {activationId}");
        await context.Publish<StartStep2Message>(new { ActivationId = activationId });
    }

    /// <summary>Step 2 finished: publishes <c>ProcessingFinishedMessage</c>.</summary>
    /// <param name="context">Consume context of the incoming message.</param>
    public async Task Consume(ConsumeContext<Step2FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _logger.LogInformation($"Transition from Step 2 to Completion: {activationId}");
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}
/// <summary>
/// Terminal consumer of the flow: logs that processing finished for an activation id.
/// </summary>
public class ProcessingFinishedConsumer :
    IConsumer<ProcessingFinishedMessage>
{
    private readonly ILogger<ProcessingFinishedConsumer> _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger that receives the completion entry.</param>
    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles a <c>ProcessingFinishedMessage</c> by writing a log entry; no message is published.
    /// </summary>
    /// <param name="context">Consume context of the incoming message.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        var finishedMessage = context.Message;
        _logger.LogInformation($"Finish {finishedMessage.ActivationId}");

        // Nothing asynchronous to do; the await keeps the method shape uniform with the other consumers.
        await Task.CompletedTask;
    }
}
还有一个Fault<> 消费者,负责处理可能来自Step1Consumer 和Step2Consumer 的所有故障:
/// <summary>
/// Consumes the <c>Fault&lt;T&gt;</c> messages for <c>StartStep1Message</c> and
/// <c>StartStep2Message</c> and logs which step failed and why.
/// </summary>
public class FaultConsumer :
    IConsumer<Fault<StartStep1Message>>,
    IConsumer<Fault<StartStep2Message>>
{
    private readonly ILogger<FaultConsumer> _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger that receives the failure entries.</param>
    public FaultConsumer(ILogger<FaultConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>Logs a failure of step 1.</summary>
    /// <param name="context">Consume context carrying the fault and the original message.</param>
    public async Task Consume(ConsumeContext<Fault<StartStep1Message>> context)
    {
        await LogError("Step 1", context.Message.Message.ActivationId, context.Message.Exceptions);
    }

    /// <summary>Logs a failure of step 2.</summary>
    /// <param name="context">Consume context carrying the fault and the original message.</param>
    public async Task Consume(ConsumeContext<Fault<StartStep2Message>> context)
    {
        await LogError("Step 2", context.Message.Message.ActivationId, context.Message.Exceptions);
    }

    /// <summary>
    /// Writes one error-level log entry listing the messages of all reported exceptions.
    /// </summary>
    /// <param name="step">Display name of the failed step.</param>
    /// <param name="activationId">Activation id taken from the faulted message.</param>
    /// <param name="exceptions">Exception details attached to the fault.</param>
    /// <returns>An already completed task; the method performs no asynchronous work.</returns>
    private Task LogError(string step, Guid activationId, ExceptionInfo[] exceptions)
    {
        var errorMessages = string.Join(", ", exceptions.Select(e => e.Message));

        // A failed step is an error condition, so it is logged at Error level
        // (the original used Information). The rendered text is unchanged.
        _logger.LogError($"{step} failed for {activationId}, cause: {errorMessages}");

        // Not declared async: there is nothing to await (avoids compiler warning CS1998).
        return Task.CompletedTask;
    }
}
这是状态机定义:
/// <summary>
/// State machine that follows an <see cref="ArcProcess"/> instance through the flow
/// Initial → ProcessingStarted → Step1Started → Step1Finished → Step2Started →
/// Step2Finished → Final, storing the current state in <c>CurrentState</c>.
/// </summary>
/// <remarks>
/// Every state declares a handler for exactly one event, and each handler copies the
/// message's <c>ActivationId</c> onto the instance before moving on.
/// NOTE(review): an event that arrives while the instance is in any other state has no
/// handler here, so the flow depends on the messages being processed in order — confirm
/// that the publishers and endpoint configuration guarantee this.
/// </remarks>
public class ArcStateMachine :
    MassTransitStateMachine<ArcProcess>
{
    // Make sure the correlation-id mappings exist before any instance of the machine is built.
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    /// <summary>Declares the state property and the per-state event handlers.</summary>
    public ArcStateMachine()
    {
        InstanceState(instance => instance.CurrentState);

        Initially(
            When(StartProcessingEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        During(ProcessingStartedState,
            When(Step1StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1StartedState));

        During(Step1StartedState,
            When(Step1FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1FinishedState));

        During(Step1FinishedState,
            When(Step2StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2StartedState));

        During(Step2StartedState,
            When(Step2FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2FinishedState));

        During(Step2FinishedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }

    // States, in the order the flow passes through them.
    public State ProcessingStartedState { get; }
    public State Step1StartedState { get; }
    public State Step1FinishedState { get; }
    public State Step2StartedState { get; }
    public State Step2FinishedState { get; }

    // Events, one per message contract.
    public Event<StartProcessingMessage> StartProcessingEvent { get; }
    public Event<StartStep1Message> Step1StartedEvent { get; }
    public Event<Step1FinishedMessage> Step1FinishedEvent { get; }
    public Event<StartStep2Message> Step2StartedEvent { get; }
    public Event<Step2FinishedMessage> Step2FinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}
以及MassTransit的设置:
// Registers MassTransit (saga, consumers, RabbitMQ transport) when a RabbitMQ host is configured.
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
    // Broker credentials are read from configuration instead of being hard-coded.
    // When a key is absent the previous literal "guest" is used, so existing setups keep working.
    var rabbitUser = Configuration["RABBIT_MQ_USERNAME"] ?? "guest";
    var rabbitPassword = Configuration["RABBIT_MQ_PASSWORD"] ?? "guest";

    services.AddMassTransit(cnf =>
    {
        var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

        // Saga state is persisted in MongoDB and served from a dedicated queue.
        cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
            .Endpoint(e => e.Name = BusConstants.SagaQueue)
            .MongoDbRepository(connectionString, r =>
            {
                r.DatabaseName = "mongo";
                r.CollectionName = "WorkflowState";
            });

        cnf.AddConsumer<TransitionConsumer>();
        // Step1Consumer uses its definition so faulted messages are discarded on its endpoint.
        cnf.AddConsumer<Step1Consumer>(typeof(Step1ConsumerDefinition));
        cnf.AddConsumer<Step2Consumer>();
        cnf.AddConsumer<ProcessingFinishedConsumer>();
        cnf.AddConsumer<FaultConsumer>();
        //cnf.AddMessageScheduler(schedulerEndpoint);

        cnf.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(new Uri(rabbitHost), hst =>
            {
                hst.Username(rabbitUser);
                hst.Password(rabbitPassword);
            });
            //cfg.UseMessageScheduler(schedulerEndpoint);
            cfg.ConfigureEndpoints(context);
        });
    });
    services.AddMassTransitHostedService();

    // NOTE(review): Swagger is only registered when a RabbitMQ host is configured — confirm this is intended.
    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
    });
}
所以实际上消息流应该是这样的:
- 控制器发布 StartProcessingMessage,TransitionConsumer 消费该消息,随后发布 StartStep1Message。
- Step1Consumer 收到消息,完成其工作并发布 Step1FinishedMessage。
- TransitionConsumer 获取该消息并发布 StartStep2Message。
- Step2Consumer 收到消息,完成其工作并发布 Step2FinishedMessage。
- TransitionConsumer 获取该消息并发布 ProcessingFinishedMessage,由 ProcessingFinishedConsumer 消费。
在这种情况下,Step1Consumer 和 Step2Consumer 都不知道对方的存在,步骤之间的转换完全由 TransitionConsumer 负责协调。与此同时,状态机会跟踪每条消息,并依次经过所有相应的状态。
问题从一开始就出现了:TransitionConsumer 在 ArcStateMachine 开始处理 StartProcessingEvent(我原以为它会先被触发)之前,就已经发布了 StartStep1Message。这会导致状态机卡在 ProcessingStartedState 中。结果是,在 Step1FinishedEvent 发布时,状态机并不处于 Step1StartedState,而该状态本应由 StartStep1Message 触发的 Step1StartedEvent 使其进入。
我该如何解决这个问题?
【问题讨论】:
标签: c# exception state-machine masstransit orchestration