【问题标题】:Saga handling undeliverable messagesSaga 处理无法投递的消息
【发布时间】:2018-03-11 23:46:17
【问题描述】:

来自文档here

如果端点的配置发生变化,或者如果消息是 错误地发送到端点,消息类型可能是 收到的没有任何连接的消费者。如果发生这种情况, 消息被移动到 _skipped 队列(以原始 队列名称)。保留原消息内容,并附加 添加标头以指示移动消息的主机。

当一些消息试图传递到 saga 但尚未创建 saga 实例时,我是否应该期望 saga 具有相同的行为? 换句话说,如果想在 saga 中处理消息,但该消息不是创建 saga 的触发器。我希望在 _skipped 队列中看到这些消息,但实际上我在 _skipped 或 _error 队列中都看不到它们。我还可以看到,按摩成功地传递到了传奇队列,并且以某种方式成功地被消耗掉了,没有任何警告或错误。谁消费了消息?

更新

状态机:

public class TestState : SagaStateMachineInstance, IVersionedSaga
{
    public Guid CorrelationId { get; set; }
    public Guid Id { get; set; }
    public string CurrentState { get; set; }
    public int Version { get; set; }
    public DateTime TimeStart { get; set; }
    public int Progress { get; set; }
}

public class TestStateMachine : MassTransitStateMachine<TestState>
{
    public TestStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => ProcessingStarted, x => x.CorrelateById(context => context.Message.Id));
        Event(() => ProgressUpdated, x => x.CorrelateById(context => context.Message.Id));
        Event(() => ProcessingFinished, x => x.CorrelateById(context => context.Message.Id));

        Initially(
            When(ProcessingStarted)
                .Then(context =>
                {
                    context.Instance.TimeStart = DateTime.Now;
                })
                .TransitionTo(Processing)
        );

        During(Processing,
            When(ProgressUpdated)
                .Then(context =>
                {
                    context.Instance.Progress = context.Data.Progress;
                }),
            When(ProcessingFinished)
                .TransitionTo(Processed)
                .ThenAsync(async context =>
                {
                    await context.Raise(AllDone);
                })
        );

        During(Processed,
            When(AllDone)
                .Then(context =>
                {
                    Log.Information($"Saga {context.Instance.Id} finished");
                })
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public State Processing { get; set; }
    public State Processed { get; set; }

    public Event AllDone { get; set; }

    public Event<ProcessingStarted> ProcessingStarted { get; set; }
    public Event<ProgressUpdated> ProgressUpdated { get; set; }
    public Event<ProcessingFinished> ProcessingFinished { get; set; }
}

public interface ProcessingStarted
{
    Guid Id { get; }
}

public interface ProgressUpdated
{
    Guid Id { get; }
    int Progress { get; }
}

public interface ProcessingFinished
{
    Guid Id { get; }
}

配置

var repository = new MongoDbSagaRepository<TestState>(Environment.ExpandEnvironmentVariables(configuration["MongoDb:ConnectionString"]), "sagas");

var services = new ServiceCollection();

services.AddSingleton(context => Bus.Factory.CreateUsingRabbitMq(x =>
{
    IRabbitMqHost host = x.Host(new Uri(Environment.ExpandEnvironmentVariables(configuration["MassTransit:ConnectionString"])), h => { });

    x.ReceiveEndpoint(host, "receiver_saga_queue", e =>
    {
        e.StateMachineSaga(new TestStateMachine(), repository);
    });

    x.UseRetry(r =>
    {
        r.Incremental(10, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(50));
        r.Handle<MongoDbConcurrencyException>();
        r.Handle<UnhandledEventException>();
    });

    x.UseSerilog();
}));

var container = services.BuildServiceProvider();

var busControl = container.GetRequiredService<IBusControl>();

busControl.Start();

后来,当我发布ProgressUpdated事件时

busControl.Publish<ProgressUpdated>(new
{
    Id = id,
    Progress = 50
});

我希望它会引发 UnhandledEventException 并将消息移动到 _error 队列,但实际上我可以看到该消息出现在队列中,以某种方式被消耗,我在 _error 或 _skipped 队列中都看不到它。 MongoDB Saga 存储也没有创建任何新的 saga 实例。

我做错了什么?我需要将 saga 配置为,如果在未创建 saga 实例时收到 ProgressUpdated 事件,稍后将在实例准备好接受它时重试。

解决方案

将事件ProgressUpdatedProcessingFinished重新配置为

Event(() => ProgressUpdated, x =>
{
    x.CorrelateById(context => context.Message.Id);

    x.OnMissingInstance(m => m.Fault());
});
Event(() => ProcessingFinished, x =>
{
    x.CorrelateById(context => context.Message.Id);

    x.OnMissingInstance(m => m.Fault());
});

稍后在UseRetry 中捕获SagaException

r.Handle<SagaException>();

干活了!

感谢@Alexey Zimarev 分享知识!

【问题讨论】:

  • 这不应该是这样的。如果消息应该由 saga 处理但没有匹配的实例,则调用 OnMissingInstance 回调。由于您的问题中没有提供任何一行代码,因此很难说有什么问题。
  • 嗨,Alexey,刚刚用状态机和配置示例更新了帖子。有时间请看一下。
  • 什么消息被忽略了?
  • 你为什么期待UnhandledEventException?尝试配置OnMissingInstance,看看它是否被击中。
  • 是的,这是我的第一条评论 :) 我将您的解决方案放在答案中,最好这样做,而不是在问题中发布答案。

标签: masstransit


【解决方案1】:

如果消息应该由 saga 处理但没有匹配的实例,则调用 OnMissingInstance 回调。

重新配置ProgressUpdatedProcessingFinished

Event(() => ProgressUpdated, x =>
{
    x.CorrelateById(context => context.Message.Id);
    x.OnMissingInstance(m => m.Fault());
});
Event(() => ProcessingFinished, x =>
{
    x.CorrelateById(context => context.Message.Id);
    x.OnMissingInstance(m => m.Fault());
});

并在重试过滤器中捕获SagaException

r.Handle<SagaException>();

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-09
    • 1970-01-01
    相关资源
    最近更新 更多