【发布时间】:2020-11-13 10:15:43
【问题描述】:
我有一系列由 Saga
协调的活动从控制器中,我将数据保存在 db 中,然后将事件发布到我的 saga:
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator _mediator;
public MyController(IProductService productService, IMediator mediator)
{
_productService = productService;
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
Saga 消耗第一个事件然后发送新命令:
public class ProductSaga:
ISaga,
InitiatedBy<ProductSubmittedEvent>,
Orchestrates<AcecptedFilterEvent>,
Orchestrates<RefusedFilterEvent>
{
public Guid CorrelationId { get; set; }
public string State { get; private set; } = "Not Started";
public readonly Uri filterEndpoint = new Uri(
$"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(FilterCommand))}");
public async Task Consume(ConsumeContext<ApiSubmittedEvent> context)
{
//Send new command to filter step
var sendEndpoint = await context.GetSendEndpoint(filterEndpoint);
await sendEndpoint.Send<FilterCommand>(new { CorrelationId, context.Message.Label});
}
// I have two other events to consume according to command result
public Task Consume(ConsumeContext<AcecptedFilterEvent> context)
{
//if OK send new command to next step
//...
return Task.CompletedTask;
}
public Task Consume(ConsumeContext<RefusedFilterEvent> context)
{
//if FilterCommand refused do it again
//...
return Task.CompletedTask;
}
}
这里是FilterCommand 的消费者代码:
public class FilterCommandConsumer : IConsumer<FilterCommand>
{
private readonly ILogger<FilterCommand> _logger;
public FilterCommandConsumer (ILogger<FilterCommand> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<FilterCommand> context)
{
_logger?.LogInformation($"Consuming FilterCommand- {context.Message.CorrelationId}");
try
{
//call a service to handle the filtering
//...
//here the problem: when publish thread exit and app still run to infinity
await context.Publish<AcecptedFilterEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "AcecptedFilter"
});
}
catch (Exception ex)
{
await context.Publish<RefusedFilterEvent>(new
{
CorrelationId = context.CorrelationId.Value,
State = "RefusedFilter",
Error = ex.Message
});
throw;
}
}
}
问题:
使用我的命令时,我想发布新事件AcecptedFilterEvent & RefusedFilterEvent,我无法发布:
- 应用仍在运行,没有任何结果
- 线程退出
我想发布一个事件并在Saga 上使用它以开始下一步。
我尝试在我的消费者类中注入 IMediator 以使用 _mediator.Publish() 发布消息,但我遇到了同样奇怪的行为。
这是我的startup.cs 配置:
//configure MassTransit
services.AddMediator(cfg =>
{
cfg.AddConsumersFromNamespaceContaining<FilterCommandConsumer>();
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
如果你有任何推荐想法感谢分享和挑战我????
【问题讨论】:
标签: publish-subscribe masstransit mediator