【问题标题】:Publish command result of command handler through ConsumeContext --Masstransit通过 ConsumeContext --Masstransit 发布命令处理程序的命令结果
【发布时间】:2016-04-04 21:44:15
【问题描述】:

我已经为 masstransit 配置了一个 webapi 控制器。像这样的。

_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
            {
                x.Host(new Uri("rabbitmq://localhost/"), h =>{ });
            });
            _busControl.Start();

控制台应用程序是我的服务器,它正在监听我的命令/事件 --

 var BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
             {
                 var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
                {
                    //h.Username("guest");
                    //h.Password("guest");
                });

                 cfg.ReceiveEndpoint(host, "create_product_queue", config => config.Consumer<CreateInventoryProductItemHandler>());
                 cfg.ReceiveEndpoint(host, "ProductItemCreatedEvent", config => config.Handler<ProductItemCreatedEvent>(async context => await Console.Out.WriteLineAsync($"Product Item Id -----{context.Message.ProductItemId} "))); <---- this never gets executed . 
             });
            BusControl.Start();

控制器动作类似于 -

[HttpPost("product")]
        public async Task<IActionResult> Post([FromBody] Product Product)
        {
            var endpoint = await Startup.Bus.GetSendEndpoint(new Uri("rabbitmq://localhost/create_product_queue"));
            await endpoint.Send<CreateInventoryProductItem>(new
            {
                ProductName = Product.ProductName,
                PartNumber = Product.PartNumber,
                ManufactureDate = Product.ManufacturedDate
            });
            return new HttpStatusCodeResult(StatusCodes.Status202Accepted);
        }

处理程序是--

public async Task Consume(ConsumeContext<CreateInventoryProductItem> context)
        {
            CreateInventoryProductItem Command = context.Message;
            Product Product = await Context.SaveAsync(new Product(Command.ProductName
                , Command.PartNumber
                , Command.ManufacturedDate));
            await context.Publish<ProductItemCreatedEvent>(Product.Id); <--- problem area

        }

现在我的情况是。

  1. 创建一个命令对象。

  2. 从新创建的Entity框架Context.SaveAsync()方法中获取产品ID。

  3. 将 ProductId 发布到总线,以便其他侦听器可以侦听此产品 Id 并采取相应的行动。

    我正在尝试找出在这种情况下应用的最佳模式。

现在问题从 -- await context.Publish&lt;ProductItemCreatedEvent&gt;(Product.Id); 这一行开始。它尝试一次又一次地无限发布到同一个队列。结果,相同的 Product 对象被插入了数百次(我知道这很蹩脚,导致没有唯一约束,但这不是这里的问题),发生的情况是我的命令处理程序尽管被强类型化,但它会无限期地执行。如何摆脱这种情况。我对公共交通概念还很陌生。

所以有人可以解释一下为什么会发生这种情况。

【问题讨论】:

    标签: c# rabbitmq masstransit rabbitmq-exchange


    【解决方案1】:

    所以问题是您需要将一个对象传递给您正在使用的Publish 重载:

    await context.Publish<ProductItemCreatedEvent>(Product.Id);
    

    如果ProductItemCreatedEvent 包含两个属性IdDescription,您将创建一个匿名对象来初始化接口:

    await context.Publish<ProductItemCreatedEvent>(new
    {
        Id = Product.Id,
        Description = Product.Description,
    });
    

    这样,传递一个具有属性的对象,该对象可以为事件接口启动动态生成的支持类。

    【讨论】:

    • 我意识到它有点晚了。感谢您对此的回复。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-01
    • 2021-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多