【发布时间】:2018-03-11 23:46:17
【问题描述】:
来自文档here:
如果端点的配置发生变化,或者如果消息是 错误地发送到端点,消息类型可能是 收到的没有任何连接的消费者。如果发生这种情况, 消息被移动到 _skipped 队列(以原始 队列名称)。保留原消息内容,并附加 添加标头以指示移动消息的主机。
当一些消息试图传递到 saga 但尚未创建 saga 实例时,我是否应该期望 saga 具有相同的行为? 换句话说,如果想在 saga 中处理消息,但该消息不是创建 saga 的触发器。我希望在 _skipped 队列中看到这些消息,但实际上我在 _skipped 或 _error 队列中都看不到它们。我还可以看到,按摩成功地传递到了传奇队列,并且以某种方式成功地被消耗掉了,没有任何警告或错误。谁消费了消息?
更新
状态机:
public class TestState : SagaStateMachineInstance, IVersionedSaga
{
public Guid CorrelationId { get; set; }
public Guid Id { get; set; }
public string CurrentState { get; set; }
public int Version { get; set; }
public DateTime TimeStart { get; set; }
public int Progress { get; set; }
}
public class TestStateMachine : MassTransitStateMachine<TestState>
{
public TestStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => ProcessingStarted, x => x.CorrelateById(context => context.Message.Id));
Event(() => ProgressUpdated, x => x.CorrelateById(context => context.Message.Id));
Event(() => ProcessingFinished, x => x.CorrelateById(context => context.Message.Id));
Initially(
When(ProcessingStarted)
.Then(context =>
{
context.Instance.TimeStart = DateTime.Now;
})
.TransitionTo(Processing)
);
During(Processing,
When(ProgressUpdated)
.Then(context =>
{
context.Instance.Progress = context.Data.Progress;
}),
When(ProcessingFinished)
.TransitionTo(Processed)
.ThenAsync(async context =>
{
await context.Raise(AllDone);
})
);
During(Processed,
When(AllDone)
.Then(context =>
{
Log.Information($"Saga {context.Instance.Id} finished");
})
.Finalize());
SetCompletedWhenFinalized();
}
public State Processing { get; set; }
public State Processed { get; set; }
public Event AllDone { get; set; }
public Event<ProcessingStarted> ProcessingStarted { get; set; }
public Event<ProgressUpdated> ProgressUpdated { get; set; }
public Event<ProcessingFinished> ProcessingFinished { get; set; }
}
public interface ProcessingStarted
{
Guid Id { get; }
}
public interface ProgressUpdated
{
Guid Id { get; }
int Progress { get; }
}
public interface ProcessingFinished
{
Guid Id { get; }
}
配置
var repository = new MongoDbSagaRepository<TestState>(Environment.ExpandEnvironmentVariables(configuration["MongoDb:ConnectionString"]), "sagas");
var services = new ServiceCollection();
services.AddSingleton(context => Bus.Factory.CreateUsingRabbitMq(x =>
{
IRabbitMqHost host = x.Host(new Uri(Environment.ExpandEnvironmentVariables(configuration["MassTransit:ConnectionString"])), h => { });
x.ReceiveEndpoint(host, "receiver_saga_queue", e =>
{
e.StateMachineSaga(new TestStateMachine(), repository);
});
x.UseRetry(r =>
{
r.Incremental(10, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(50));
r.Handle<MongoDbConcurrencyException>();
r.Handle<UnhandledEventException>();
});
x.UseSerilog();
}));
var container = services.BuildServiceProvider();
var busControl = container.GetRequiredService<IBusControl>();
busControl.Start();
后来,当我发布ProgressUpdated事件时
busControl.Publish<ProgressUpdated>(new
{
Id = id,
Progress = 50
});
我希望它会引发 UnhandledEventException 并将消息移动到 _error 队列,但实际上我可以看到该消息出现在队列中,以某种方式被消耗,我在 _error 或 _skipped 队列中都看不到它。 MongoDB Saga 存储也没有创建任何新的 saga 实例。
我做错了什么?我需要将 saga 配置为,如果在未创建 saga 实例时收到 ProgressUpdated 事件,稍后将在实例准备好接受它时重试。
解决方案
将事件ProgressUpdated和ProcessingFinished重新配置为
Event(() => ProgressUpdated, x =>
{
x.CorrelateById(context => context.Message.Id);
x.OnMissingInstance(m => m.Fault());
});
Event(() => ProcessingFinished, x =>
{
x.CorrelateById(context => context.Message.Id);
x.OnMissingInstance(m => m.Fault());
});
稍后在UseRetry 中捕获SagaException
r.Handle<SagaException>();
干活了!
感谢@Alexey Zimarev 分享知识!
【问题讨论】:
-
这不应该是这样的。如果消息应该由 saga 处理但没有匹配的实例,则调用
OnMissingInstance回调。由于您的问题中没有提供任何一行代码,因此很难说有什么问题。 -
嗨,Alexey,刚刚用状态机和配置示例更新了帖子。有时间请看一下。
-
什么消息被忽略了?
-
你为什么期待
UnhandledEventException?尝试配置OnMissingInstance,看看它是否被击中。 -
是的,这是我的第一条评论 :) 我将您的解决方案放在答案中,最好这样做,而不是在问题中发布答案。
标签: masstransit