【问题标题】:skipped queue in MassTransit with RabbitMQ in dot net core application在点网核心应用程序中使用 RabbitMQ 在 MassTransit 中跳过队列
【发布时间】:2021-01-01 06:25:54
【问题描述】:

我有三个项目。一是Dot net core MVC,二是API项目。 MVC 正在调用一个 API 来获取用户详细信息。当询问用户详细信息时,我通过 MassTransit 向队列发送消息。我看到跳过队列。第三个项目是API项目的消费者。

我尝试为具有相同配置的演示制作另一个解决方案。它运行良好。

下面是 MVC Razor 页面代码..

public async Task<IActionResult> OnPostAsync(string returnUrl = null)
    {
        ReturnUrl = returnUrl;

        if (ModelState.IsValid)
        {
            var user = await AuthenticateUser(Input.Email);

            if (user == null)
            {
                ModelState.AddModelError(string.Empty, "Invalid login attempt.");
                return Page();
            }

            #region snippet1
            var claims = new List<Claim>
            {
                new Claim(ClaimTypes.Name, user.Email),
                new Claim("FullName", user.FullName),
                new Claim(ClaimTypes.Role, "Administrator"),
            };

            var claimsIdentity = new ClaimsIdentity(
                claims, CookieAuthenticationDefaults.AuthenticationScheme);

            var authProperties = new AuthenticationProperties
            {
                ExpiresUtc = DateTimeOffset.UtcNow.AddMinutes(15),
                IsPersistent = true,
            };

            await HttpContext.SignInAsync(
                CookieAuthenticationDefaults.AuthenticationScheme,
                new ClaimsPrincipal(claimsIdentity),
                authProperties);
            #endregion

            _logger.LogInformation("User {Email} logged in at {Time}.",
                user.Email, DateTime.UtcNow);

            return LocalRedirect(Url.GetLocalUrl(returnUrl));
        }

        return Page();
    }

    private async Task<ApplicationUser> AuthenticateUser(string email)
    {
        if (!string.IsNullOrEmpty(email))
        {
            using (var client = new System.Net.Http.HttpClient())
            {
                var request = new System.Net.Http.HttpRequestMessage();
                request.RequestUri = new Uri("http://localhost:52043/api/user?uName=" + email); // ASP.NET 3 (VS 2019 only)
                var response = await client.SendAsync(request);
                var customer = Newtonsoft.Json.JsonConvert.DeserializeObject<Customers>(response.Content.ReadAsStringAsync().Result);

                
                return new ApplicationUser()
                {
                    Email = email,
                    FullName = customer.FullName
                };
            }
        }
        else
        {
            return null;
        }
    }

MVC 启动:

services.AddMassTransit(x =>
        {
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // configure health checks for this bus instance
                cfg.UseHealthCheck(provider);

                cfg.Host("rabbitmq://localhost");

            }));
        });

        services.AddMassTransitHostedService();

用户 API 代码 - 52043:

[HttpGet]
    public async Task<IActionResult> Get(string uName)
    {
        var customer = _userRepository.GetCustomerByUserName(uName);

        Uri uri = new Uri("rabbitmq://localhost/loginqueue");
        var endpoint = await _bus.GetSendEndpoint(uri);
        await endpoint.Send(new LoginObj() { NoteString = customer.FullName + " has logged in at " + DateTime.Now.ToString() });
        
        return Json(customer);
    }

日志记录 API - 消费者代码:

public class LoginConsumer : IConsumer<LoginObj>
{
    private readonly ILogger<object> _logger;

    public LoginConsumer(ILogger<object> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<LoginObj> context)
    {
        var data = context.Message;
        _logger.LogInformation(data.ToString());
    }
}

登录 API 启动:

services.AddMassTransit(x =>
        {
            x.AddConsumer<LoginConsumer>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // configure health checks for this bus instance
                cfg.UseHealthCheck(provider);

                cfg.Host("rabbitmq://localhost");

                cfg.ReceiveEndpoint("loginqueue", ep =>
                {
                    ep.PrefetchCount = 16;
                    ep.UseMessageRetry(r => r.Interval(2, 100));

                    ep.ConfigureConsumer<LoginConsumer>(provider);
                });
            }));
        });

        services.AddMassTransitHostedService();

【问题讨论】:

    标签: asp.net-mvc .net-core rabbitmq masstransit


    【解决方案1】:

    根据the documentation

    MassTransit 使用完整的类型名称,包括命名空间,用于消息契约。在两个单独的项目中创建相同的消息类型时,命名空间必须匹配,否则消息将不会被消费。

    确保您的消息类型在每个项目中具有相同的命名空间/类型。

    【讨论】:

    • 我的控制器在Users.API.Controller命名空间下,消费者在Analytics.API命名空间下。你指的是这个吗?
    • 消息类型必须相同,包括命名空间。
    猜你喜欢
    • 2019-06-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多