【问题标题】:Get Tenant Id from RabbitMq Message for Db Connection从 RabbitMq 消息中获取租户 ID 以进行 Db 连接
【发布时间】:2019-05-30 05:35:49
【问题描述】:

我有一个微服务架构,其中包含 ASP.Net Core 应用程序和 RabbitMq 作为微服务之间的事件总线。
我也想支持多租户。
因此,我在Startup.cs 中定义了以下依赖注入服务,以根据用户的租户 ID 在每个请求上打开与数据库的连接。

services.AddScoped<IDocumentSession>(ds =>
            {
                var store = ds.GetRequiredService<IDocumentStore>();
                var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
                var tenant = httpContextAccessor?.HttpContext?.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
                return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
            });

问题在于服务何时处理事件总线消息(如 UserUpdatedEvent)。
在那种情况下,当它尝试打开 Db 连接时,它显然没有来自 http 上下文的用户信息。

在注入作用域服务并使用 RabbitMq 处理事件时,如何发送/访问相应用户的租户 ID?

或者改写我的问题: 执行依赖注入代码时,有什么方法可以访问 RabbitMQ 消息(例如它的标头)?

【问题讨论】:

    标签: events asp.net-core dependency-injection rabbitmq multi-tenant


    【解决方案1】:

    由于没有HttpContext,因为RabbitMq请求不是Http请求,正如@istepaniuk的回答中指出的那样,我创建了自己的上下文并将其命名为AmqpContext

    public interface IAmqpContext
        {
            void ClearHeaders();
            void AddHeaders(IDictionary<string, object> headers);
            string GetHeaderByKey(string headerKey);
        }
    
        public class AmqpContext : IAmqpContext
        {
            private readonly Dictionary<string, object> _headers;
    
            public AmqpContext()
            {
                _headers = new Dictionary<string, object>();
            }
    
            public void ClearHeaders()
            {
                _headers.Clear();
            }
    
            public void AddHeaders(IDictionary<string, object> headers)
            {
                foreach (var header in headers)
                    _headers.Add(header.Key, header.Value);
            }
    
            public string GetHeaderByKey(string headerKey) 
            {
                if (_headers.TryGetValue(headerKey, out object headerValue))
                {
                    return Encoding.Default.GetString((byte[])headerValue);
                }
                return null;
            }
        }
    

    在发送 RabbitMq 消息时,我通过如下标头发送租户 ID:

                        var properties = channel.CreateBasicProperties();
                        if (tenantId != null)
                        {
                            var headers = new Dictionary<string, object>
                            {
                                { "tid", tenantId }
                            };
                            properties.Headers = headers;
                        }
    
                        channel.BasicPublish(exchange: BROKER_NAME,
                                         routingKey: eventName,
                                         mandatory: true,
                                         basicProperties: properties,
                                         body: body);
    

    然后,在接收服务上,我将AmqpContext 注册为Startup.cs 中的范围服务:

    services.AddScoped<IAmqpContext, AmqpContext>();
    

    当接收到 RabbitMq 消息时,在消费者通道中,一个范围和 Amqp 上下文被创建:

    consumer.Received += async (model, ea) =>
                {
                    var eventName = ea.RoutingKey;
                    var message = Encoding.UTF8.GetString(ea.Body);
                    var properties = ea.BasicProperties;
    
                    using (var scope = _serviceProvider.CreateScope())
                            {
                                var amqpContext = scope.ServiceProvider.GetService<IAmqpContext>();
                                if (amqpContext != null)
                                {
                                    amqpContext.ClearHeaders();
                                    if (properties.Headers != null && amqpContext != null)
                                    {
                                        amqpContext.AddHeaders(properties.Headers);
                                    }
                                }
                                var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
                                if (handler == null) continue;
                                var eventType = _subsManager.GetEventTypeByName(eventName);
                                var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                                var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                                await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                            }
    
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                };
    

    然后在创建作用域 Db 连接服务时(请参阅我的问题),我可以从消息头访问租户 ID:

        services.AddScoped<IDocumentSession>(ds =>
        {
            var store = ds.GetRequiredService<IDocumentStore>();
            string tenant = null;
            var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
            if (httpContextAccessor.HttpContext != null)
            {
                tenant = httpContextAccessor.HttpContext.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
            }
            else
            {
                var amqpContext = ds.GetRequiredService<IAmqpContext>();
                tenant = amqpContext.GetHeaderByKey("tid");
            }
            return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
        });
    

    【讨论】:

      【解决方案2】:

      你不能

      或者也许,但如果您的设计依赖于 HTTP 上下文,则不是。正如 .NET documentation on service lifetime 所述:

      范围内的生命周期服务为每个客户端请求创建一次 (连接)。

      所以从您的 (HTTP) 服务的角度来看,请求是一个入口点,它使用容器魔法通过全局 HTTP 上下文,每个请求设置您的数据库,在您的任何业务逻辑之前。这似乎不是最好的设计选择,尤其是如果您计划在 HTTP 请求之外使用相同的逻辑。

      相比之下,您的消息消费者服务是长期运行的;在这个生命周期中,如果您的连接设置需要来自每条消息(租户 ID)的信息,那么您不能仅依赖依赖注入。

      “正确”的方法是不依赖 HTTP 上下文中的全局状态来建立数据库连接。改为设置适用于所有租户的数据库上下文。

      【讨论】:

      • 我的设计不依赖于 HTTP 上下文。即使在依赖注入期间访问 rabbitmq 消息也会有所帮助。
      • @Palmi 准确地说,注入不会在每个消息中发生(因为它会在每个请求中发生)。您不能为此使用范围服务。也许这个其他问题会更清楚一些:stackoverflow.com/questions/23535569/…
      • 是的。我一直认为消息基本上是一个请求。不知何故,ASP Net Core 将消息视为请求,因为每次处理消息时都会创建作用域服务...
      • @Palmi,我想看看它是如何连接的,我想你必须手动 CreateScope() 才能做到这一点。就像这里的答案一样(也可能对你有用)stackoverflow.com/questions/49728884/…
      • 等等。是的。你说的对。处理 RabbitMQ 消息时有一个 CreateScope()。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-18
      • 1970-01-01
      • 2016-02-06
      • 1970-01-01
      相关资源
      最近更新 更多