【问题标题】:MassTransit: publisher no longer work. Why?MassTransit:发布者不再工作。为什么?
【发布时间】:2022-01-24 06:46:17
【问题描述】:

在之前的项目中,以下代码正在运行。 但现在我需要使用 MultiBus: - EventsConsumer 位于当前项目中。 - 支付消费者位于外部项目中。 为什么这种方法不再有效?

=================== 错误:

内部异常 1: InvalidOperationException:验证服务描述符“ServiceType:Publisher.Services.Abstract.IBusPublisher Lifetime:Singleton ImplementationType:Publisher.Services.Concrete.BusPublisher”时出错:尝试激活“Publisher”时无法解析“MassTransit.IBus”类型的服务。 Services.Concrete.BusPublisher'。

内部异常 2: InvalidOperationException:尝试激活“Publisher.Services.Concrete.BusPublisher”时无法解析“MassTransit.IBus”类型的服务。

// =================== Startup

using System;
using MassTransit;
using MassTransit.MultiBus;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Microsoft.OpenApi.Models;
using CommonTypes.Options;
using Publisher.Consumers;
using Publisher.Services.Abstract;
using Publisher.Services.Concrete;

namespace Publisher
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public interface IEventsBus : IBus { }
        public interface IPaymentBus : IBus { }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "Publisher", Version = "v1" });
            });

            #region MassTransit 

            services.AddSingleton<IBusPublisher, BusPublisher>();

            services.Configure<EventsBusOptions>(Configuration.GetSection("EventsBusOptions"));
            services.Configure<PaymentBusOptions>(Configuration.GetSection("PaymentBusOptions"));

            services.AddScoped<EventsConsumer>();

            // EventsConsumer located into current project
            services.AddMassTransit<IEventsBus>(x =>
            {
                x.AddConsumer<EventsConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;

                    cfg.Host(new Uri(_options.HostUri), h =>
                    {
                        h.Username(_options.UserName);
                        h.Password(_options.Password);
                    });

                    cfg.ReceiveEndpoint(_options.QueueName, ep =>
                    {
                        ep.PrefetchCount = _options.PrefetchCount ?? 15;
                        ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;

                        ep.ConfigureConsumer<EventsConsumer>(context);
                    });

                    cfg.ConfigureEndpoints(context);
                });
            });

            // Consumer for Payment is located in outter project
            // Did I properly describe Bus for outter consumer?
            services.AddMassTransit<IPaymentBus>(x =>
            {
                x.UsingRabbitMq((context, cfg) =>
                {
                    var _options = context.GetRequiredService<IOptions<PaymentBusOptions>>().Value;

                    cfg.Host(new Uri(_options.HostUri), h =>
                    {
                        h.Username(_options.UserName);
                        h.Password(_options.Password);
                    });


                    cfg.ReceiveEndpoint(_options.QueueName, ep =>
                    {
                        ep.PrefetchCount = _options.PrefetchCount ?? 15;
                        ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
                    });

                    cfg.ConfigureEndpoints(context);
                });

            });

            services.AddMassTransitHostedService();
            services.AddGenericRequestClient();

            #endregion
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Publisher v1"));
            }

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}


// =================== interface 


using System.Threading.Tasks;

namespace Publisher.Services.Abstract
{
    public interface IBusPublisher
    {
        Task Publish<Tin>(Tin request) where Tin : class;

        Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class;
    }
}

// =================== class

using System;
using MassTransit;
using System.Threading.Tasks;
using Publisher.Services.Abstract;
using Publisher.Contracts;
using Microsoft.Extensions.DependencyInjection;

namespace Publisher.Services.Concrete
{
    public class BusPublisher : IBusPublisher
    {
        readonly IServiceProvider _provider;
        readonly IBus _bus;

        public BusPublisher(IServiceProvider provider, IBus bus)
        {
            _provider = provider;
            _bus = bus;
        }

        public async Task Publish<Tin>(Tin request) where Tin : class
        {
            try
            {
                await _bus.Publish(request);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                throw;
            }
        }


        public async Task<Tout> GetResponse<Tin, Tout>(Tin request)
            where Tin : class
            where Tout : class
        {
            try
            {
                using (var _scope = _provider.CreateScope())
                {
                    var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
                    var response = await client.GetResponse<Tout>(request);
                    return response.Message;
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                throw;
            }
        }
    }
}

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    我已经解决了我的问题。现在我可以向 EventsConsumer 发布消息了。 但是我在第二个总线队列(IPaymentBus)中看不到消息。 也许我犯了一些错误。请帮我在两个队列中发布消息。

    //// ==================================== BusPublisher 
    namespace Publisher.Services.Concrete
    {
        public class BusPublisher : IBusPublisher
        {
            readonly IServiceProvider _provider;
            readonly IPublishEndpoint _bus;
            
    
            public BusPublisher(IServiceProvider provider, IPublishEndpoint bus)
            {
                _provider = provider;
                _bus = bus;
            }
    
            public async Task Publish<Tin>(Tin request) where Tin : class
            {
                try
                {
                    await _bus.Publish(request);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    throw;
                }
            }
    
    
            public async Task<Tout> GetResponse<Tin, Tout>(Tin request)
                where Tin : class
                where Tout : class
            {
                try
                {
                    using (var _scope = _provider.CreateScope())
                    {
                        var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
    
                        var response = await client.GetResponse<Tout>(request);
    
                        return response.Message;
                    }
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    throw;
                }
            }
    
    
        }
    }
    
    //// ==================================== Startup
    namespace Publisher
    {
        public class Startup
        {
            public Startup(IConfiguration configuration)
            {
                Configuration = configuration;
            }
            public IConfiguration Configuration { get; }
    
            public interface IPaymentBus : IBus { }
    
            public void ConfigureServices(IServiceCollection services)
            {
            ....
    
                #region MassTransit 
    
                services.Configure<EventsBusOptions>(Configuration.GetSection("EventsBusOptions"));
                services.Configure<PaymentBusOptions>(Configuration.GetSection("PaymentBusOptions"));
    
                services.AddScoped<EventsConsumer>();
                services.AddScoped<IBusPublisher, BusPublisher>();
    
                // EventsConsumer located into current project
                //services.AddMassTransit<IEventsBus>(x =>
                services.AddMassTransit(x =>
                {
                    x.AddConsumer<EventsConsumer>();
    
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        var _options = context.GetRequiredService<IOptions<EventsBusOptions>>().Value;
    
                        cfg.Host(new Uri(_options.HostUri), h =>
                        {
                            h.Username(_options.UserName);
                            h.Password(_options.Password);
                        });
    
                        cfg.ReceiveEndpoint(_options.QueueName, ep =>
                        {
                            ep.PrefetchCount = _options.PrefetchCount ?? 15;
                            ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
    
                            ep.ConfigureConsumer<EventsConsumer>(context);
                        });
    
                        cfg.ConfigureEndpoints(context);
                    });
                });
    
                // Consumer for Payment is located in outter project
                // Did I properly describe Bus for outter consumer?
                // I need just publish message to queue with _options.QueueName 
                services.AddMassTransit<IPaymentBus>(x =>
                {
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        var _options = context.GetRequiredService<IOptions<PaymentBusOptions>>().Value;
    
                        cfg.Host(new Uri(_options.HostUri), h =>
                        {
                            h.Username(_options.UserName);
                            h.Password(_options.Password);
                        });
    
    
                        cfg.ReceiveEndpoint(_options.QueueName, ep =>
                        {
                            ep.PrefetchCount = _options.PrefetchCount ?? 15;
                            ep.ConcurrentMessageLimit = _options.UseConcurrencyLimit ?? 16;
                        });
    
                        cfg.ConfigureEndpoints(context);
                    });
    
                });
    
                services.AddMassTransitHostedService();
                services.AddGenericRequestClient();
    
                #endregion
            }
    
        }
    }
    

    【讨论】:

    • 不要使用答案类型来提出新问题 - 没有人会在其中寻找您的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-08-10
    • 1970-01-01
    • 1970-01-01
    • 2016-06-21
    • 2020-06-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多