【问题标题】:Missing configuration to well publish event from MassTransit consumer to MassTransit Saga (using MassTransit with Mediator)缺少将事件从 MassTransit 消费者发布到 MassTransit Saga 的配置(将 MassTransit 与 Mediator 一起使用)
【发布时间】:2020-11-13 10:15:43
【问题描述】:

我有一系列由 Saga

协调的活动

从控制器中,我将数据保存在 db 中,然后将事件发布到我的 saga:

[ApiController]
public class MyController : ControllerBase
{    
    private readonly IProductService _productService ;
    private readonly IMediator _mediator;

    public MyController(IProductService productService, IMediator mediator)
    {
        _productService = productService;
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] ProductContract productContract)
    {            
        try
        {
            var result = await _productService.DoSomeThingAsync(productContract);            
            await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });

            return Ok();
        }
        catch (Exception ex)
        {
            return BadRequest(ex.Message);
        }
    }
}

Saga 消耗第一个事件然后发送新命令:


public class ProductSaga:
    ISaga,
    InitiatedBy<ProductSubmittedEvent>,
    Orchestrates<AcecptedFilterEvent>,
    Orchestrates<RefusedFilterEvent>
{
    public Guid CorrelationId { get; set; }
    public string State { get; private set; } = "Not Started";

    public readonly Uri filterEndpoint = new Uri(
        $"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(FilterCommand))}");

    public async Task Consume(ConsumeContext<ApiSubmittedEvent> context)
    {
        //Send new command to filter step
        var sendEndpoint = await context.GetSendEndpoint(filterEndpoint);
        await sendEndpoint.Send<FilterCommand>(new { CorrelationId, context.Message.Label});
    }

    // I have two other events to consume according to command result    
    public Task Consume(ConsumeContext<AcecptedFilterEvent> context)
    {   
        //if OK send new command to next step
        //...
        return Task.CompletedTask;
    }
    
    public Task Consume(ConsumeContext<RefusedFilterEvent> context)
    {
        //if FilterCommand refused do it again
        //... 
        return Task.CompletedTask;
    }
}

这里是FilterCommand 的消费者代码:


public class FilterCommandConsumer : IConsumer<FilterCommand>
{
    private readonly ILogger<FilterCommand> _logger;

    public FilterCommandConsumer (ILogger<FilterCommand> logger)
    {
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<FilterCommand> context)
    {
        _logger?.LogInformation($"Consuming FilterCommand- {context.Message.CorrelationId}");
       
        try
        {
            //call a service to handle the filtering 
            //...
            
            //here the problem: when publish thread exit and app still run to infinity
            await context.Publish<AcecptedFilterEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "AcecptedFilter"
            });
        }
        catch (Exception ex)
        {
            await context.Publish<RefusedFilterEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "RefusedFilter",
                Error = ex.Message
            });
            throw;
        }
    }
}

问题:

使用我的命令时,我想发布新事件AcecptedFilterEvent & RefusedFilterEvent,我无法发布:

  • 应用仍在运行,没有任何结果
  • 线程退出

我想发布一个事件并在Saga 上使用它以开始下一步。

我尝试在我的消费者类中注入 IMediator 以使用 _mediator.Publish() 发布消息,但我遇到了同样奇怪的行为。

这是我的startup.cs 配置:


        //configure MassTransit
        services.AddMediator(cfg =>
        {
            cfg.AddConsumersFromNamespaceContaining<FilterCommandConsumer>();                
            cfg.AddSaga<ProductSaga>().InMemoryRepository();
        });

如果你有任何推荐想法感谢分享和挑战我????

【问题讨论】:

    标签: publish-subscribe masstransit mediator


    【解决方案1】:

    因为您使用的是调解器,它使用调用者的异步调用上下文将消息传递给消费者,所以您基本上是在创建一个非常讨厌的调用堆栈并且在 saga 实例上死锁。

    (controller) => (saga) => (consumer) => (saga, oops, 它被之前的 saga 锁定了)

    【讨论】:

    • 好的,谢谢,我明白你的意思了,在这种情况下,你有什么建议来处理这种情况?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多