【问题标题】:Rebus multiple Queues based on content根据内容重排多个队列
【发布时间】:2017-04-13 06:46:14
【问题描述】:

设置:在 asp.net mvc 项目中使用 SimpleInjector 进行 Rebus。

我需要创建两个接收消息的处理程序,每个处理程序都来自一个特定的队列。按照我在SO answer 上找到的内容,我创建了类似的代码。

在一个类库中,我有一个实现 SimpleInjector IPackage 的类,其代码如下:

public void RegisterServices( Container container ) {
    container.Register<IHandleMessages<MyMessage>, MyMessageHandler>( Lifestyle.Scoped );
    IContainerAdapter adapter = new SimpleInjectorContainerAdapter( container );
    Configure.With( adapter )
             .Transport( t => t.UseAzureServiceBus( connectionString, "A_QUEUE_NAME", AzureServiceBusMode.Standard ) )
             .Options( oc => {
                 oc.SetNumberOfWorkers( 1 );
                 oc.SimpleRetryStrategy( errorQueueAddress: "A_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
             } )
             .Start();

    Configure.With(adapter
             .Transport(t => t.UseAzureServiceBus(connectionString, "B_QUEUE_NAME")
             .Options( oc => {
                 oc.SetNumberOfWorkers( 1 );
                 oc.SimpleRetryStrategy( errorQueueAddress: "B_ERROR_QUEUE_NAME", maxDeliveryAttempts: 3 );
             } )
             .Start();
}

但是,当调试器到达第二个 Configure.With( ... ) 调用时,i 会以错误提示终止:

类型 IBus 已注册。如果您的意图是解析 IBus 实现的集合,请使用 RegisterCollection 重载。更多信息:https://simpleinjector.org/coll1。如果您打算用此新注册替换现有注册,则可以通过将 Container.Options.AllowOverridingRegistrations 设置为 true 来允许覆盖当前注册。更多信息:https://simpleinjector.org/ovrrd

堆栈跟踪:

[InvalidOperationException: Type IBus has already been registered. If your intention is to resolve a collection of IBus implementations, use the RegisterCollection overloads. More info: https://simpleinjector.org/coll1. If your intention is to replace the existing registration with this new registration, you can allow overriding the current registration by setting Container.Options.AllowOverridingRegistrations to true. More info: https://simpleinjector.org/ovrrd.]
   SimpleInjector.Internals.NonGenericRegistrationEntry.ThrowWhenTypeAlreadyRegistered(InstanceProducer producer) +102
   SimpleInjector.Internals.NonGenericRegistrationEntry.Add(InstanceProducer producer) +59
   SimpleInjector.Container.AddInstanceProducer(InstanceProducer producer) +105
   SimpleInjector.Container.AddRegistrationInternal(Type serviceType, Registration registration) +69
   SimpleInjector.Container.AddRegistration(Type serviceType, Registration registration) +131
   SimpleInjector.Container.RegisterSingleton(TService instance) +183
   Rebus.SimpleInjector.SimpleInjectorContainerAdapter.SetBus(IBus bus) +55
   Rebus.Config.RebusConfigurer.Start() +2356
   MyModule.RegisterServices(Container container) +497
   SimpleInjector.PackageExtensions.RegisterPackages(Container container, IEnumerable`1 assemblies) +50
   Myproject.SimpleInjectorInitializer.InitializeContainer(Container container) +35
   Myproject.SimpleInjectorInitializer.Initialize() +68
   Myproject.Startup.Configuration(IAppBuilder app) +28

编辑

然后我删除了第二个Configure.With( ... ) 代码块,现在当我执行_bus.Send( message ) 时,我在消费者进程中遇到另一个错误,上面写着

处理 ID 为 fef3acca-97f4-4495-b09d-96e6c9f66c4d 的消息时未处理的异常 1:SimpleInjector.ActivationException:找不到类型 IEnumerable> 的注册。但是,有 IHandleMessages 的注册;您是要调用 GetInstance>() 还是依赖 IHandleMessages?还是您的意思是使用 RegisterCollection 注册类型的集合?

堆栈跟踪:

2017-04-13 10:21:03,805 [77] WARN  Rebus.Retry.ErrorTracking.InMemErrorTracker - 
   at SimpleInjector.Container.ThrowMissingInstanceProducerException(Type serviceType)
   at SimpleInjector.Container.GetInstanceForRootType[TService]()
   at SimpleInjector.Container.GetInstance[TService]()
   at SimpleInjector.Container.GetAllInstances[TService]()
   at Rebus.SimpleInjector.SimpleInjectorContainerAdapter.<GetHandlers>d__3`1.MoveNext()

【问题讨论】:

    标签: asp.net-mvc simple-injector azure-servicebus-queues rebus


    【解决方案1】:

    我通常建议每个容器实例只保留一个 IBus,因为总线本身可以被视为“应用程序”,这恰好与 IoC 容器是可以“托管”在其生命周期内应用。

    Rebus 不提供 Conforming Container 抽象,因为我同意 Mark Seemann 的观点,即这是一个注定要失败的项目。事实上,正如the wiki page mentions,Rebus 曾经提供处理程序的自动注册,但结果证明是有问题的。

    相反,Rebus 鼓励您提供“容器适配器”(IContainerAdapter 的实现),其职责是:

    • 查找处理程序
    • 提供一种以正确方式注册IBusIMessageContext 的方法

    为 Autofac、Castle Windsor、SimpleInjector 等提供开箱即用的容器适配器。但是,不需要提供容器适配器——Configure.With(...) rant 很高兴只收到一个“处理程序激活器”(实现IHandlerActivator),因此如果您只想使用您的 IoC 容器来查找处理程序并自行注册 IBus,您也可以通过实现 IHandlerActivator 并在您的容器中查找处理程序来做到这一点。

    TL;DR:Rebus 方式是将 IoC 容器的一个实例视为一个单独的应用程序,因此只在其中注册一个 IBus 是有意义的。

    在单个进程中新建多个容器实例是非常好的,您希望托管多个应用程序(甚至是具有不同消息 SLA 的应用程序的多个实例)。

    【讨论】:

    • 感谢您的回答@mookid8000。但是在我看来,如果我错了,请纠正我,这回答了另一个答案的 cmets,而不是我的问题本身。这是否意味着我需要创建两个不同的容器,然后注册两个 Bus 以使用两个不同的队列?
    • 我认为这是一个错误,因为 Rebus.SimpleInjector 只是调用 GetAllInstances 而例如,Rebus.Autofac 以不同的方式处理它。
    • 嗨@mookid,很高兴听到 Rebus 不支持 Conforming Container,我很抱歉认为 Rebus 支持。如果您在为 Simple Injector 创建集成包时遇到任何问题,请随时私下(通过邮件)联系我或在Simple Injector forum 上发布问题。我会尽力帮助你的。
    • 让我们解决这个问题 :) 我创建了this issue,我在其中询问如何使用 SimpleInjector 满足 Rebus 的要求。
    【解决方案2】:

    异常状态:“类型 IBus 已注册”。根据您的堆栈跟踪,第二次添加 IBus 是在 SimpleInjectorContainerAdapter 内。您必须知道它是何时首次注册的。这很容易做到;在创建Container 后,刚刚注册了一个虚拟IBus 作为第一个注册,并查看它爆炸的堆栈跟踪。

    【讨论】:

    • 感谢您的回答。我想每次我打电话给Configure.With(...) 时,它都会注册总线。这似乎是here 的正常做法。我不认为是与容器有关的问题,而是与 Rebus 的工作方式有关。我不明白为什么我需要不同的总线来简单地处理另一个队列。
    • 这是我无法回答的问题。我没有看过 SimpleInjectorContainerAdapter 并且我不熟悉 Rubus 在容器周围的 Conforming Container 抽象,我不知道为什么适配器和 Rebus 都添加了自己的 IBus。可能是同一个总线实例吗?您可能想联系SimpleInjectorContainerAdapter 的创建者。
    • 你能看看我的问题编辑和新的例外吗?请注意,我使用container.Register&lt;IHandleMessages&lt;MyMessage&gt;, MyMessageHandler&gt;( Lifestyle.Scoped ); 正确注册了代码中的类型
    【解决方案3】:

    您引用了我的 SO Answer(问题),所以我将与您分享我是如何实现它的。 正如您将看到的,我使用特定的接口将命令与事件分开。

    然后,就在队列的消费部分,我进行了这种注册:

        public static void Register()
        {
            var assemblies = AppDomain.CurrentDomain.GetAssemblies()
                .Where(i => i.FullName.StartsWith("MySolutionPrefix"))
                ;
    
            var container = new Container();
    
            // http://simpleinjector.readthedocs.io/en/latest/lifetimes.html#perexecutioncontextscope
            container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();
    
    
            var serviceType = typeof(IHandleMessages<>).Name;
            // this is extension method to get specific handlers
            IEnumerable<Type> commandHandlers = assemblies.GetHandlers(serviceType, typeof(ICommand));
            container.Register(typeof(IHandleMessages<>), commandHandlers.Concat(new List<Type> { typeof(HistorizeCommandHanlder) }));
    
            // NOTE Just command Handlers
            container.RegisterCollection(typeof(IHandleMessages<>), commandHandlers);
    
    
            var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
                //.... logging, transport (I created my own transport on mongoDb), routing, sagas and so on
                .Options(o =>
                {
                    // This simply my personal transport as Register<IOneWayClientTransport> 
                    o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
                    // this is more complicated because i want that automatically the message is copied on
                    // a separate topic for each handler
                    o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
                })
                .Start();
    
            container.Verify();
    
            // It is necessary because otherwise it sends published messages to no-one 
            commandHandlers.GetHandledSubTypes(serviceType, typeof(IVersionedEvent))
                .ToList()
                .ForEach(i => bus.Subscribe(i).Wait());
    
            // NOTE just events handlers
            IEnumerable<Type> eventHandlers = assemblies
                .GetHandlers(serviceType, typeof(IVersionedEvent))
                .Except(commandHandlers)
                //.Except(new List<Type> { typeof(TempHandlerLogDecorator) })
                ;
            foreach (var handler in eventHandlers)
                ConfigureServiceBus(mongoDbConnectionProvider, db, handler.FullName, new[] { handler });
        }
    
        private static IBus ConfigureServiceBus(MongoDbConnectionProvider mongoDbConnectionProvider, IMongoDatabase db, string inputQueueName, IEnumerable<Type> handlers)
        {
            var container = new Container();
    
            container.Options.DefaultScopedLifestyle = new ExecutionContextScopeLifestyle();
    
            container.RegisterCollection(typeof(IHandleMessages<>), handlers);
    
            var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
                .... logging, Subscriptions
                // this is a consumer only for inputQueueName
                .Transport(t => t.UseMongoDb(db, settings.TopicBasedMessageQueueName, inputQueueName))
                .Options(o =>
                {
                    o.ConfigureDecouplingDatabase(db, settings.TopicBasedMessageQueueName);
                    o.EnableHandlerDecoupling(settings.DecouplingHandlersRegistration);
                })
                .Start();
    
            container.Verify();
    
            handlers.GetHandledSubTypes(typeof(IHandleMessages<>).Name, typeof(IVersionedEvent))
                .ToList()
                .ForEach(i => bus.Advanced.Topics.Subscribe(i.GetDecoupledTopic(settings.DecouplingHandlersRegistration)).Wait());
    
            return bus;
        }
    

    我仍在使用 Rebus 3.0.1 和 SimpleInjector 3.2.3。

    【讨论】:

    • 非常感谢 :)
    猜你喜欢
    • 2023-03-23
    • 2022-08-11
    • 2017-08-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-05
    • 1970-01-01
    • 2013-06-07
    相关资源
    最近更新 更多