【问题标题】:MassTransit 5.2, SignalR: How can i get the IHubContext inside my Consumer?MassTransit 5.2,SignalR:如何在我的消费者中获取 IHubContext?
【发布时间】:2019-01-14 12:18:51
【问题描述】:

我的主要问题是获取正确的 SignalR 集线器实例。

上下文:我正在构建一个与几个外部系统通信的 Web 应用程序。我的应用程序中的 CRUD 操作会导致更新外部系统的数据库。

在这个例子中,我运行了 3 个服务:

外部系统 |状态机 | .NET CORE WebAPI

当我发布“创建员工”表单时,RabbitMQ 消息将从 WebAPI 发送到状态机。然后状态机向我的外部系统服务发送几条创建消息来更新数据库。此后,它更新状态机以跟踪创建操作。

表单 -> API -> StateMachine -> ExternalSystem -> StateMachine -> API

到目前为止一切顺利。现在我想使用 SignalR 将状态更新发送给客户端。所以我在 API 中实现了这个消费者:

public class UpdatesConsumer :
    IConsumer<IExternalSystemUpdateMessage>
{
    private readonly IHubContext<UpdatesHub> _updaterHubContext;

    public UpdatesConsumer(IHubContext<UpdatesHub> hubContext)
    {
        _updaterHubContext = hubContext;
    }

    public Task Consume(ConsumeContext<IExternalSystemUpdateMessage> context)
    {
        //return _updaterHubContext.Clients.Group(context.Message.CorrelationId.ToString()).SendAsync("SEND_UPDATE", context.Message.Message);// this.SendUpdate(context.Message.CorrelationId, context.Message.Message);
        return _updaterHubContext.Clients.All.SendAsync("SEND_UPDATE", context.Message.Message);
    }
}

这是我的 SignalR 集线器:

public class UpdatesHub :
    Hub
{
    public Task SendUpdate(Guid correlationId, string message)
    {
        return Clients.Group(correlationId.ToString()).SendAsync("SEND_UPDATE", message);
    }
}

这就是总线和消费者的实例化方式:

    public void ConfigureServices(IServiceCollection services)
    {
        _services = services;

        services.AddMvc();
        services.AddSignalR();            
        //services.AddSingleton<IHubContext<UpdatesHub>>();

        WebAPI.CreateBus();
    }

    public static IServiceCollection _services;

    static IBusControl _busControl;
    public static IBusControl Bus
    {
        get
        {
            return _busControl;
        }
    }

    public static void CreateBus()
    {
        IRMQConnection rmqSettings = Config.GetRMQConnectionConfig("rmq-settings.json", "connection");

        _busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
        {
            var host = x.Host(BusInitializer.GetUri("", rmqSettings), h =>
            {
                h.Username(rmqSettings.UserName);
                h.Password(rmqSettings.Password);
            });

            x.ReceiveEndpoint(host, "externalsystems.update",
                e => { e.Consumer(() => new UpdatesConsumer((IHubContext<UpdatesHub>)Startup.__serviceProvider.GetService(typeof(IHubContext<UpdatesHub>)))); });
        });

        TaskUtil.Await(() => _busControl.StartAsync());
    }

================================================ ============================

所以问题是我的 Consumer 类中的 _updaterHubContext.Clients 总是为空。我已经测试了在控制器中访问集线器,并且客户端确实出现了:

public class TestController : Controller
{
    private readonly IHubContext<UpdatesHub> _hubContext;
    public TestController(IHubContext<UpdatesHub> hubContext)
    {
        _hubContext = hubContext;
    }

    [HttpGet]
    [Route("api/Test/")]
    public IActionResult Index()
    {
        return View();
    }
}

如何在 Consumer 类中获得正确的集线器实例?或者我如何访问 .net 正在使用的 IServiceCollection?

提前谢谢!

【问题讨论】:

    标签: asp.net-core signalr masstransit


    【解决方案1】:

    您可以注册您的消费者,以便 MassTransit 使用 MassTransit.Extensions.DependencyInjection 包中提供的支持从 IServiceProvider 解决它。

    x.ReceiveEndpoint(host, "externalsystems.update", e => 
    {
        e.Consumer<UpdatesConsumer>(_serviceProvider);
    });
    

    请务必在容器中注册您的UpdatesConsumer。这应该为端点上收到的每条消息解析一个新的消费者实例。

    【讨论】:

    • 谢谢!这就是答案。
    【解决方案2】:

    为什么不使用Microsoft Dependency Injection 注册总线。它应该可以解决您的问题,它将使用 IServiceProvider 解决您的消费者

    【讨论】:

    • 感谢示例页面。它有助于我的解决方案的优雅。尤其是链接中的示例项目
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多