【发布时间】:2018-07-12 04:25:37
【问题描述】:
我试图通过遵循与在 NServiceBus 中实现 Sagas 的方式类似的模式,在不使用自动状态机(我开始使用但发现很难正确进行单元测试)的情况下手动启动 Saga。
但是,我遇到了一个问题,即在初始消息创建实例之前,消息正在触发 saga,这会导致 MassTransit 默默地吞下消息,而不会引发异常或将消息移动到错误队列中。
在尝试解决这个问题时,很多人建议使用OnMissingInstance 来错误消息,然后依靠重试框架有效地延迟它,直到 saga 被正确初始化。
See Here.
我想知道是否有一种方法可以在不使用 Automatonymous 框架的情况下做到这一点,很可能是通过利用一些中间件来抢先检查以确保在消息之前存在 Saga(抛出异常并稍后重试)试图处理它?如果不是,这听起来像是有用/可能的吗?
更多信息:
-使用 Azure 服务总线
- 用于 Saga Persistence 的 MongoDB。
一些代码简化了我想要的代码sn-ps:
ec.Saga(new MongoDbSagaRepository<CodeProviderSaga>(sagaDatabase, new MongoDbSagaConsumeContextFactory(), nameof(CodeProviderSaga)), config =>
{
config.UseApplicationInsights(telemetryClient);
config.UseRetry(retryConfig => retryConfig.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(1)));
});
public class CodeProviderSaga :
InitiatedBy<FirstEvent>,
Orchestrates<SecondEvent>,
IVersionedSaga
{
[BsonId]
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public bool SecondEventRecieved { get; set; }
public DateTimeOffset? LastUpdated { get; set; }
public Task Consume(ConsumeContext<FirstEvent> context)
{
this.LastUpdated = DateTimeOffset.UtcNow;
return Task.Completed;
}
// An exception should be thrown if this is received first so it can be retried but it is silently dropped atm
public Task Consume(ConsumeContext<SecondEvent> context)
{
this.SecondEventRecieved = true;
this.LastUpdated = DateTimeOffset.UtcNow;
return Task.Completed;
}
}
【问题讨论】:
-
当前代码似乎不可能,因为无法为
Orchestrates接口指定缺失的管道。可能值得创建一个问题以添加配置它的能力。 -
@ChrisPatterson 感谢您回复我 - 我在 GitHub 上提出了这个 issue - 稍后我将在存储库中进行潜水,看看我是否也能想出一些东西
标签: middleware masstransit saga automatonymous